NASA keeps a digital twin of every spacecraft. When Apollo 13 ran into trouble, the solution was not worked out on the ship itself but on its twin on the ground: engineers simulated the options, found a way out, and brought the astronauts home.
That was 1970, on primitive computers. But the idea worked.
Today, AI makes digital twins come alive. They no longer just mirror their physical counterparts: they predict, optimize, and learn, from aircraft turbines to entire cities. According to McKinsey, the digital twin market will reach $48 billion by 2025. Siemens, GE, and Rolls-Royce are already saving billions through predictive maintenance. And this is only the beginning.
What Is a Digital Twin: A Formal Definition
A Digital Twin is a virtual replica of a physical object, process, or system that is updated in real time from sensor data and maintains a bidirectional link between the physical and virtual worlds.
```
 Physical Asset                      Digital Twin
       │                                  │
       │       sensors (real-time)        │
       │ ───────────────────────────────► │
       │                                  │
       │       actuation commands         │
       │ ◄─────────────────────────────── │
       │                                  │
       ▼                                  ▼
  Real World                       Simulation + AI
```
Key architectural components:
- Physical Entity: the thing being modeled (a machine, a turbine, a city)
- Sensors & IoT: sources of state data
- Data Pipeline: real-time transmission and processing
- Virtual Model: physics simulation plus ML models
- Analytics Engine: AI for insights, predictions, and optimization
- Actuation Layer: feedback into the physical world
The Evolution of Digital Twins: From CAD to Autonomous AI
Level 1: Static Model (Descriptive)
- CAD/BIM model
- No real-time data
- Documentation and visualization only
- Example: a 3D model of a building
Level 2: Connected Twin (Informative)
- Real-time sensor data integration
- Monitoring of the current state
- Dashboards, alerts, notifications
- Example: Tesla vehicle monitoring
Level 3: Predictive Twin (Analytical)
- Machine learning for forecasting
- Predictive maintenance
- What-if scenario analysis
- Example: GE wind turbine twins
Level 4: Prescriptive Twin (Intelligent)
- AI recommends optimal actions
- Automated decision support
- Self-optimization suggestions
- Example: Siemens factory optimization
Level 5: Autonomous Twin (Cognitive)
- AI makes decisions autonomously
- Closed-loop control
- Self-healing, self-optimizing
- Example: autonomous grid management
```python
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional

class TwinMaturityLevel(Enum):
    STATIC = 1        # CAD model only
    CONNECTED = 2     # Real-time monitoring
    PREDICTIVE = 3    # ML predictions
    PRESCRIPTIVE = 4  # AI recommendations
    AUTONOMOUS = 5    # Closed-loop control

@dataclass
class DigitalTwinConfig:
    """Configuration of a Digital Twin system."""
    name: str
    physical_asset_id: str
    maturity_level: TwinMaturityLevel
    sensor_config: Dict[str, dict]
    update_frequency_hz: float = 10.0
    prediction_horizon_seconds: float = 3600.0
    enable_actuation: bool = False
    physics_model: Optional[str] = None
    ml_models: List[str] = field(default_factory=list)

class DigitalTwinCore:
    """Core of the Digital Twin system.

    RingBuffer, FeatureExtractor, RealTimeSyncManager and the private
    _load_model/_run_predictions/_generate_recommendations/
    _execute_control_loop/_init_control_loop helpers are assumed to be
    provided elsewhere in the codebase.
    """

    def __init__(self, config: DigitalTwinConfig):
        self.config = config
        self.state = {}
        self.history = []
        self.models = {}
        self.actuators = {}

        # Initialize components based on maturity level
        self._init_data_pipeline()
        if config.maturity_level.value >= TwinMaturityLevel.CONNECTED.value:
            self._init_real_time_sync()
        if config.maturity_level.value >= TwinMaturityLevel.PREDICTIVE.value:
            self._init_ml_models()
        if config.maturity_level.value >= TwinMaturityLevel.AUTONOMOUS.value:
            self._init_control_loop()

    def _init_data_pipeline(self):
        """Initialize the data ingestion pipeline."""
        self.data_buffer = RingBuffer(capacity=10000)
        self.feature_extractor = FeatureExtractor(self.config.sensor_config)

    def _init_real_time_sync(self):
        """Initialize real-time synchronization."""
        self.sync_manager = RealTimeSyncManager(
            update_rate=self.config.update_frequency_hz
        )

    def _init_ml_models(self):
        """Load ML models."""
        for model_name in self.config.ml_models:
            self.models[model_name] = self._load_model(model_name)

    def update(self, sensor_data: Dict[str, float]) -> Dict:
        """Update the Digital Twin state."""
        # Update current state
        self.state.update(sensor_data)
        self.history.append({
            'timestamp': time.time(),
            'state': sensor_data.copy()
        })

        # Run predictions if available
        predictions = {}
        if self.config.maturity_level.value >= TwinMaturityLevel.PREDICTIVE.value:
            predictions = self._run_predictions(sensor_data)

        # Generate recommendations
        recommendations = {}
        if self.config.maturity_level.value >= TwinMaturityLevel.PRESCRIPTIVE.value:
            recommendations = self._generate_recommendations(
                sensor_data, predictions
            )

        # Execute autonomous actions
        actions = {}
        if self.config.maturity_level.value >= TwinMaturityLevel.AUTONOMOUS.value:
            actions = self._execute_control_loop(
                sensor_data, predictions, recommendations
            )

        return {
            'state': self.state,
            'predictions': predictions,
            'recommendations': recommendations,
            'actions': actions
        }
```
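`DigitalTwinCore` references `RingBuffer` and `FeatureExtractor` without defining them. A minimal sketch of what they might look like (the rolling-statistics features and method names are assumptions, not a fixed API):

```python
from collections import deque
from typing import Dict, List

import numpy as np

class RingBuffer:
    """Fixed-capacity buffer for recent sensor readings."""
    def __init__(self, capacity: int):
        self._buf = deque(maxlen=capacity)

    def push(self, item: Dict[str, float]):
        self._buf.append(item)

    def latest(self, n: int) -> List[Dict[str, float]]:
        return list(self._buf)[-n:]

    def __len__(self) -> int:
        return len(self._buf)

class FeatureExtractor:
    """Rolling statistics over a window of buffered readings."""
    def __init__(self, sensor_config: Dict[str, dict]):
        self.sensors = list(sensor_config)

    def extract(self, window: List[Dict[str, float]]) -> Dict[str, float]:
        feats = {}
        for name in self.sensors:
            vals = np.array([w[name] for w in window if name in w])
            if len(vals):
                feats[f"{name}_mean"] = float(vals.mean())
                feats[f"{name}_std"] = float(vals.std())
        return feats
```

In a real deployment the buffer would typically live in a time-series store (InfluxDB, TimescaleDB) rather than process memory; an in-memory deque is enough to illustrate the interface.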
Physics-Informed Neural Networks (PINNs)
The key idea: a neural network that knows physics. Instead of merely fitting data, it enforces the governing physical laws as constraints in its loss function.
```python
from typing import List

import torch
import torch.nn as nn
from torch.autograd import grad

class PhysicsInformedNN(nn.Module):
    """Physics-Informed Neural Network for modeling physical processes."""

    def __init__(
        self,
        input_dim: int,
        hidden_dims: List[int],
        output_dim: int,
        activation: nn.Module = nn.Tanh()
    ):
        super().__init__()
        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                activation
            ])
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, output_dim))
        self.network = nn.Sequential(*layers)

    def forward(self, *inputs):
        x = torch.cat(inputs, dim=1)
        return self.network(x)

class HeatTransferPINN(PhysicsInformedNN):
    """PINN for 2D heat transfer."""

    def __init__(self, thermal_diffusivity: float = 1.0):
        super().__init__(
            input_dim=3,   # x, y, t
            hidden_dims=[64, 64, 64, 64],
            output_dim=1   # temperature
        )
        self.k = thermal_diffusivity

    def forward(self, x, y, t):
        """Forward pass: temperature prediction."""
        return super().forward(x, y, t)

    def compute_derivatives(self, x, y, t):
        """Compute the derivatives needed by the physical law."""
        # Enable gradient computation
        x.requires_grad_(True)
        y.requires_grad_(True)
        t.requires_grad_(True)
        u = self.forward(x, y, t)
        # First derivatives
        u_x = grad(u.sum(), x, create_graph=True)[0]
        u_y = grad(u.sum(), y, create_graph=True)[0]
        u_t = grad(u.sum(), t, create_graph=True)[0]
        # Second derivatives
        u_xx = grad(u_x.sum(), x, create_graph=True)[0]
        u_yy = grad(u_y.sum(), y, create_graph=True)[0]
        return u, u_t, u_xx, u_yy

    def physics_loss(self, x, y, t):
        """PDE residual loss for the heat equation du/dt = k(d²u/dx² + d²u/dy²)."""
        u, u_t, u_xx, u_yy = self.compute_derivatives(x, y, t)
        pde_residual = u_t - self.k * (u_xx + u_yy)
        return torch.mean(pde_residual ** 2)

    def boundary_loss(self, x_boundary, y_boundary, t_boundary, u_boundary_true):
        """Boundary conditions loss."""
        u_pred = self.forward(x_boundary, y_boundary, t_boundary)
        return torch.mean((u_pred - u_boundary_true) ** 2)

    def initial_loss(self, x_init, y_init, t_init, u_init_true):
        """Initial conditions loss."""
        u_pred = self.forward(x_init, y_init, t_init)
        return torch.mean((u_pred - u_init_true) ** 2)

    def total_loss(
        self,
        x_colloc, y_colloc, t_colloc,        # Collocation points
        x_bound, y_bound, t_bound, u_bound,  # Boundary conditions
        x_init, y_init, t_init, u_init,      # Initial conditions
        x_data, y_data, t_data, u_data,      # Sensor data
        lambda_physics: float = 1.0,
        lambda_boundary: float = 10.0,
        lambda_initial: float = 10.0,
        lambda_data: float = 1.0
    ):
        """Full loss function with all components."""
        L_physics = self.physics_loss(x_colloc, y_colloc, t_colloc)
        L_boundary = self.boundary_loss(x_bound, y_bound, t_bound, u_bound)
        L_initial = self.initial_loss(x_init, y_init, t_init, u_init)
        # Data loss (from sensors)
        u_pred = self.forward(x_data, y_data, t_data)
        L_data = torch.mean((u_pred - u_data) ** 2)
        return (
            lambda_physics * L_physics +
            lambda_boundary * L_boundary +
            lambda_initial * L_initial +
            lambda_data * L_data
        )

class NavierStokesPINN(PhysicsInformedNN):
    """PINN for the Navier-Stokes equations (fluid dynamics)."""

    def __init__(self, reynolds_number: float = 100.0):
        super().__init__(
            input_dim=3,   # x, y, t
            hidden_dims=[128, 128, 128, 128, 128],
            output_dim=3   # u, v, p (velocity components + pressure)
        )
        self.Re = reynolds_number

    def forward(self, x, y, t):
        output = super().forward(x, y, t)
        u = output[:, 0:1]  # x-velocity
        v = output[:, 1:2]  # y-velocity
        p = output[:, 2:3]  # pressure
        return u, v, p

    def physics_loss(self, x, y, t):
        """Navier-Stokes PDE residuals."""
        x.requires_grad_(True)
        y.requires_grad_(True)
        t.requires_grad_(True)
        u, v, p = self.forward(x, y, t)
        # Compute all necessary derivatives
        u_t = grad(u.sum(), t, create_graph=True)[0]
        u_x = grad(u.sum(), x, create_graph=True)[0]
        u_y = grad(u.sum(), y, create_graph=True)[0]
        u_xx = grad(u_x.sum(), x, create_graph=True)[0]
        u_yy = grad(u_y.sum(), y, create_graph=True)[0]
        v_t = grad(v.sum(), t, create_graph=True)[0]
        v_x = grad(v.sum(), x, create_graph=True)[0]
        v_y = grad(v.sum(), y, create_graph=True)[0]
        v_xx = grad(v_x.sum(), x, create_graph=True)[0]
        v_yy = grad(v_y.sum(), y, create_graph=True)[0]
        p_x = grad(p.sum(), x, create_graph=True)[0]
        p_y = grad(p.sum(), y, create_graph=True)[0]
        # Navier-Stokes residuals
        # Momentum equations
        f_u = u_t + u * u_x + v * u_y + p_x - (1 / self.Re) * (u_xx + u_yy)
        f_v = v_t + u * v_x + v * v_y + p_y - (1 / self.Re) * (v_xx + v_yy)
        # Continuity equation (incompressibility)
        f_cont = u_x + v_y
        return torch.mean(f_u**2) + torch.mean(f_v**2) + torch.mean(f_cont**2)
```
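A minimal, self-contained training loop for a heat-equation PINN like the one above might look like this (the small network, the sampling ranges, and the sine initial condition are illustrative assumptions):

```python
import torch
import torch.nn as nn
from torch.autograd import grad

torch.manual_seed(0)

# Tiny stand-in for HeatTransferPINN: u(x, y, t) -> temperature
net = nn.Sequential(
    nn.Linear(3, 32), nn.Tanh(),
    nn.Linear(32, 32), nn.Tanh(),
    nn.Linear(32, 1),
)
k = 1.0  # thermal diffusivity

def physics_residual(x, y, t):
    """Residual of u_t - k * (u_xx + u_yy) at collocation points."""
    u = net(torch.cat([x, y, t], dim=1))
    u_x = grad(u.sum(), x, create_graph=True)[0]
    u_y = grad(u.sum(), y, create_graph=True)[0]
    u_t = grad(u.sum(), t, create_graph=True)[0]
    u_xx = grad(u_x.sum(), x, create_graph=True)[0]
    u_yy = grad(u_y.sum(), y, create_graph=True)[0]
    return u_t - k * (u_xx + u_yy)

optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)

# Initial condition u(x, y, 0) = sin(pi x) sin(pi y) as the data term
x0 = torch.rand(256, 1); y0 = torch.rand(256, 1); t0 = torch.zeros(256, 1)
u0 = torch.sin(torch.pi * x0) * torch.sin(torch.pi * y0)

losses = []
for step in range(200):
    optimizer.zero_grad()
    # Fresh collocation points in the space-time domain [0,1]^2 x [0,1]
    xc = torch.rand(256, 1, requires_grad=True)
    yc = torch.rand(256, 1, requires_grad=True)
    tc = torch.rand(256, 1, requires_grad=True)
    loss_pde = physics_residual(xc, yc, tc).pow(2).mean()
    loss_ic = (net(torch.cat([x0, y0, t0], dim=1)) - u0).pow(2).mean()
    loss = loss_pde + 10.0 * loss_ic
    loss.backward()
    optimizer.step()
    losses.append(loss.item())
```

In practice one would also add boundary-condition and sensor-data terms, exactly as `total_loss` above combines them.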
Surrogate Models: A Fast Replacement for Expensive Simulations
The problem: a CFD simulation of a turbine takes 24 hours, while a digital twin needs an answer in seconds.
The solution: a neural-network surrogate that approximates the simulation with a roughly 1000x speedup.
```python
from typing import Dict, List, Tuple

import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import StandardScaler

class SurrogateModel(nn.Module):
    """Neural network surrogate for expensive simulations."""

    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        hidden_dims: List[int] = [256, 256, 256]
    ):
        super().__init__()
        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.LayerNorm(hidden_dim),
                nn.GELU(),
                nn.Dropout(0.1)
            ])
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, output_dim))
        self.network = nn.Sequential(*layers)
        # Scalers for normalization
        self.input_scaler = StandardScaler()
        self.output_scaler = StandardScaler()

    def fit_scalers(self, X: np.ndarray, y: np.ndarray):
        """Fit input/output scalers."""
        self.input_scaler.fit(X)
        self.output_scaler.fit(y)

    def forward(self, x):
        return self.network(x)

    def predict(self, x: np.ndarray) -> np.ndarray:
        """Predict with inverse scaling."""
        x_scaled = self.input_scaler.transform(x)
        x_tensor = torch.FloatTensor(x_scaled)
        with torch.no_grad():
            y_scaled = self.forward(x_tensor).numpy()
        return self.output_scaler.inverse_transform(y_scaled)

class CFDSurrogate:
    """Surrogate for Computational Fluid Dynamics simulations."""

    def __init__(self, geometry_encoder: nn.Module, flow_predictor: nn.Module):
        self.geometry_encoder = geometry_encoder
        self.flow_predictor = flow_predictor

    def encode_geometry(self, mesh_vertices: np.ndarray) -> torch.Tensor:
        """Encode 3D geometry into a latent representation."""
        vertices_tensor = torch.FloatTensor(mesh_vertices)
        return self.geometry_encoder(vertices_tensor)

    def predict_flow_field(
        self,
        geometry_latent: torch.Tensor,
        inlet_velocity: float,
        reynolds_number: float
    ) -> Dict[str, np.ndarray]:
        """Predict velocity and pressure fields (a fixed 1000-point grid is assumed)."""
        conditions = torch.tensor([[inlet_velocity, reynolds_number]])
        input_features = torch.cat([geometry_latent, conditions], dim=1)
        output = self.flow_predictor(input_features)
        return {
            'velocity_x': output[:, :1000].numpy(),
            'velocity_y': output[:, 1000:2000].numpy(),
            'pressure': output[:, 2000:].numpy()
        }

class StructuralMechanicsSurrogate(nn.Module):
    """Surrogate for FEM (Finite Element Method) structural analysis."""

    def __init__(
        self,
        num_nodes: int = 1000,
        force_dim: int = 3,
        displacement_dim: int = 3
    ):
        super().__init__()
        self.num_nodes = num_nodes
        # Encoder for geometry and loading
        self.encoder = nn.Sequential(
            nn.Linear(num_nodes * 3 + force_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128)
        )
        # Decoder for the deformation field
        self.decoder = nn.Sequential(
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, num_nodes * displacement_dim)
        )
        # Stress prediction head
        self.stress_head = nn.Sequential(
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, num_nodes)  # von Mises stress per node
        )

    def forward(
        self,
        node_positions: torch.Tensor,
        applied_force: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Predict displacements and stresses.

        Args:
            node_positions: [batch, num_nodes * 3] mesh nodes
            applied_force: [batch, 3] force vector
        Returns:
            displacements: [batch, num_nodes * 3]
            stresses: [batch, num_nodes]
        """
        x = torch.cat([node_positions, applied_force], dim=1)
        latent = self.encoder(x)
        displacements = self.decoder(latent)
        stresses = self.stress_head(latent)
        return displacements, stresses

    def predict_failure_probability(
        self,
        stresses: torch.Tensor,
        yield_strength: float
    ) -> torch.Tensor:
        """Estimate failure probability from the stress analysis."""
        # Simple heuristic: compare max stress to yield strength
        max_stress = stresses.max(dim=1)[0]
        safety_factor = yield_strength / max_stress
        failure_prob = torch.sigmoid(1.0 - safety_factor)
        return failure_prob
```
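To make the surrogate workflow concrete, here is a hedged end-to-end sketch: a synthetic stand-in for the expensive simulation generates training pairs, a small MLP learns the mapping, and inference then takes milliseconds. The toy target function and all dimensions are assumptions for illustration only:

```python
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
torch.manual_seed(0)

# Synthetic "expensive simulation": y = f(x) over a 4-D design space
X = rng.uniform(-1, 1, size=(2000, 4)).astype(np.float32)
y = (np.sin(X[:, :1] * 3) + X[:, 1:2] ** 2 - X[:, 2:3] * X[:, 3:4]).astype(np.float32)

# Normalize inputs and outputs, as SurrogateModel does with its scalers
in_scaler, out_scaler = StandardScaler().fit(X), StandardScaler().fit(y)
Xs = torch.from_numpy(in_scaler.transform(X).astype(np.float32))
ys = torch.from_numpy(out_scaler.transform(y).astype(np.float32))

model = nn.Sequential(
    nn.Linear(4, 64), nn.GELU(),
    nn.Linear(64, 64), nn.GELU(),
    nn.Linear(64, 1),
)
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

losses = []
for _ in range(500):
    opt.zero_grad()
    loss = nn.functional.mse_loss(model(Xs), ys)
    loss.backward()
    opt.step()
    losses.append(loss.item())

# Inference: milliseconds instead of hours of simulation
x_new = rng.uniform(-1, 1, size=(5, 4)).astype(np.float32)
with torch.no_grad():
    y_scaled = model(torch.from_numpy(in_scaler.transform(x_new).astype(np.float32))).numpy()
y_pred = out_scaler.inverse_transform(y_scaled)
```

The same loop applies unchanged when `X`/`y` come from real CFD or FEM runs; the only cost is generating enough simulation samples up front.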
Anomaly Detection for Digital Twins
```python
from typing import Dict, Tuple

import numpy as np
import torch
import torch.nn as nn

class TemporalAutoencoder(nn.Module):
    """Autoencoder for anomaly detection in time series."""

    def __init__(self, input_dim: int, sequence_length: int, latent_dim: int = 32):
        super().__init__()
        self.input_dim = input_dim
        self.sequence_length = sequence_length
        self.latent_dim = latent_dim
        # LSTM encoder
        self.encoder_lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=64,
            num_layers=2,
            batch_first=True,
            bidirectional=True
        )
        self.encoder_fc = nn.Linear(128, latent_dim)
        # LSTM decoder
        self.decoder_fc = nn.Linear(latent_dim, 128)
        self.decoder_lstm = nn.LSTM(
            input_size=128,
            hidden_size=64,
            num_layers=2,
            batch_first=True
        )
        self.output_layer = nn.Linear(64, input_dim)

    def encode(self, x: torch.Tensor) -> torch.Tensor:
        """Encode a sequence to a latent representation."""
        lstm_out, (h_n, c_n) = self.encoder_lstm(x)
        # Use the last hidden state from both directions
        hidden = torch.cat([h_n[-2], h_n[-1]], dim=1)
        return self.encoder_fc(hidden)

    def decode(self, z: torch.Tensor) -> torch.Tensor:
        """Decode a latent vector into a reconstructed sequence."""
        # Expand the latent vector over the sequence length
        hidden = self.decoder_fc(z)
        hidden = hidden.unsqueeze(1).repeat(1, self.sequence_length, 1)
        lstm_out, _ = self.decoder_lstm(hidden)
        return self.output_layer(lstm_out)

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        z = self.encode(x)
        x_recon = self.decode(z)
        return x_recon, z

    def reconstruction_error(self, x: torch.Tensor) -> torch.Tensor:
        """Compute reconstruction error for anomaly scoring."""
        x_recon, _ = self.forward(x)
        return torch.mean((x - x_recon) ** 2, dim=(1, 2))

class MultiModalAnomalyDetector:
    """Anomaly detector for multi-modal sensor data.

    The _train_autoencoder and _compute_errors helpers are assumed to be
    implemented elsewhere (standard reconstruction-loss training).
    """

    def __init__(
        self,
        modalities: Dict[str, int],  # {modality_name: feature_dim}
        sequence_length: int = 100
    ):
        self.modalities = modalities
        self.sequence_length = sequence_length
        # One autoencoder per modality
        self.encoders = nn.ModuleDict({
            name: TemporalAutoencoder(dim, sequence_length)
            for name, dim in modalities.items()
        })
        # Cross-modal attention
        total_latent = len(modalities) * 32
        self.cross_attention = nn.MultiheadAttention(embed_dim=32, num_heads=4)
        # Anomaly classifier
        self.classifier = nn.Sequential(
            nn.Linear(total_latent, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )
        # Per-modality thresholds (set during fitting)
        self.thresholds = {}

    def fit(
        self,
        normal_data: Dict[str, np.ndarray],
        epochs: int = 100,
        percentile: float = 99.0
    ):
        """Train on normal data to establish thresholds."""
        # Train each autoencoder
        for name, data in normal_data.items():
            self._train_autoencoder(name, data, epochs)
        # Compute reconstruction-error thresholds
        for name, data in normal_data.items():
            errors = self._compute_errors(name, data)
            self.thresholds[name] = np.percentile(errors, percentile)

    def detect(self, data: Dict[str, np.ndarray]) -> Dict[str, any]:
        """Detect anomalies in new data."""
        anomaly_scores = {}
        is_anomaly = {}
        for name, values in data.items():
            errors = self._compute_errors(name, values)
            anomaly_scores[name] = errors
            is_anomaly[name] = errors > self.thresholds[name]
        # Combined score via cross-modal attention
        latents = []
        for name, values in data.items():
            tensor = torch.FloatTensor(values).unsqueeze(0)
            z = self.encoders[name].encode(tensor)
            latents.append(z)
        latent_stack = torch.stack(latents, dim=0)  # [num_modalities, 1, 32]
        attended, _ = self.cross_attention(latent_stack, latent_stack, latent_stack)
        # Flatten all modality latents into one feature vector for the classifier
        combined_latent = attended.permute(1, 0, 2).reshape(1, -1)
        combined_score = self.classifier(combined_latent).item()
        return {
            'per_modality_scores': anomaly_scores,
            'per_modality_anomaly': is_anomaly,
            'combined_score': combined_score,
            'is_system_anomaly': combined_score > 0.5
        }
```
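The standard workflow for an autoencoder-based detector: train on normal sequences only, set the threshold at a high percentile of reconstruction error on normal data, and flag sequences that exceed it. A compact, self-contained sketch (the tiny `SeqAE` and the injected spike are purely illustrative):

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

class SeqAE(nn.Module):
    """Compact sequence autoencoder (stand-in for TemporalAutoencoder)."""
    def __init__(self, dim=1, hidden=16, seq_len=50):
        super().__init__()
        self.seq_len = seq_len
        self.enc = nn.LSTM(dim, hidden, batch_first=True)
        self.dec = nn.LSTM(hidden, hidden, batch_first=True)
        self.out = nn.Linear(hidden, dim)

    def forward(self, x):
        _, (h, _) = self.enc(x)
        z = h[-1].unsqueeze(1).repeat(1, self.seq_len, 1)
        dec_out, _ = self.dec(z)
        return self.out(dec_out)

def make_batch(n, seq_len=50, anomaly=False):
    """Noisy sine waves; anomalous ones get an injected spike."""
    t = torch.linspace(0, 4 * torch.pi, seq_len)
    x = torch.sin(t).repeat(n, 1) + 0.05 * torch.randn(n, seq_len)
    if anomaly:
        x[:, 20:30] += 2.0
    return x.unsqueeze(-1)

ae = SeqAE()
opt = torch.optim.Adam(ae.parameters(), lr=1e-2)
for _ in range(150):  # train on normal data only
    batch = make_batch(32)
    opt.zero_grad()
    loss = nn.functional.mse_loss(ae(batch), batch)
    loss.backward()
    opt.step()

with torch.no_grad():
    normal = make_batch(200)
    normal_err = ((ae(normal) - normal) ** 2).mean(dim=(1, 2))
    # Threshold = 99th percentile of error on normal data
    threshold = normal_err.quantile(0.99)
    bad = make_batch(10, anomaly=True)
    bad_err = ((ae(bad) - bad) ** 2).mean(dim=(1, 2))
```

Sequences whose error exceeds `threshold` are flagged; this is the same logic `MultiModalAnomalyDetector.fit`/`detect` apply per modality.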
```python
from typing import Dict, List

import torch
import torch.nn as nn

class PredictiveMaintenanceModel(nn.Module):
    """Predictive maintenance model for a Digital Twin."""

    def __init__(self, sensor_dim: int, failure_modes: List[str]):
        super().__init__()
        self.sensor_dim = sensor_dim
        self.failure_modes = failure_modes
        # RUL (Remaining Useful Life) predictor
        # (LSTM kept outside nn.Sequential: its tuple output breaks Sequential)
        self.rul_lstm = nn.LSTM(sensor_dim, 128, batch_first=True)
        self.rul_head = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )
        # Failure mode classifier
        self.failure_lstm = nn.LSTM(sensor_dim, 128, batch_first=True)
        self.failure_head = nn.Sequential(
            nn.Linear(128, len(failure_modes)),
            nn.Softmax(dim=1)
        )

    def predict_rul(self, sensor_history: torch.Tensor) -> float:
        """Predict Remaining Useful Life in hours (expects batch size 1)."""
        _, (h_n, _) = self.rul_lstm(sensor_history)
        rul = self.rul_head(h_n[-1])
        return rul.item()

    def predict_failure_mode(self, sensor_history: torch.Tensor) -> Dict[str, float]:
        """Predict probabilities of the different failure modes."""
        _, (h_n, _) = self.failure_lstm(sensor_history)
        probs = self.failure_head(h_n[-1])
        return {
            mode: prob.item()
            for mode, prob in zip(self.failure_modes, probs[0])
        }

    def generate_maintenance_schedule(
        self,
        rul: float,
        failure_probs: Dict[str, float],
        maintenance_costs: Dict[str, float],
        downtime_cost_per_hour: float
    ) -> Dict:
        """Generate an optimal maintenance schedule."""
        recommendations = []
        for mode, prob in failure_probs.items():
            if prob > 0.3:  # High-risk threshold
                expected_cost = (
                    prob * (maintenance_costs[mode] + downtime_cost_per_hour * 24) +
                    (1 - prob) * maintenance_costs[mode] * 0.1  # Preventive maintenance
                )
                recommendations.append({
                    'failure_mode': mode,
                    'probability': prob,
                    'recommended_action': 'immediate' if prob > 0.7 else 'scheduled',
                    'expected_cost': expected_cost
                })
        return {
            'remaining_useful_life_hours': rul,
            'recommendations': sorted(recommendations, key=lambda x: -x['probability']),
            'next_maintenance_window': max(0, rul - 24)  # 24-hour safety margin
        }
```
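A worked example of the expected-cost heuristic used in `generate_maintenance_schedule`, with illustrative numbers:

```python
# Illustrative numbers, not from any real asset
prob = 0.5                       # probability of this failure mode
maintenance_cost = 10_000.0      # cost of the repair itself
downtime_cost_per_hour = 500.0   # lost production per hour of downtime

expected_cost = (
    prob * (maintenance_cost + downtime_cost_per_hour * 24)  # failure branch (24h outage)
    + (1 - prob) * maintenance_cost * 0.1                    # preventive branch
)
# 0.5 * (10000 + 12000) + 0.5 * 1000 = 11500.0
```

Comparing this figure across failure modes is what lets the scheduler rank recommendations by economic impact, not just by probability.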
Reinforcement Learning for Autonomous Control
```python
from typing import Tuple

import numpy as np
import torch
import torch.nn as nn

class DigitalTwinEnvironment:
    """RL environment built on top of a Digital Twin.

    Assumes the twin exposes reset() and apply_action() methods.
    """

    def __init__(self, digital_twin: DigitalTwinCore):
        self.twin = digital_twin
        self.state_dim = len(digital_twin.state)
        self.action_dim = len(digital_twin.actuators)

    def reset(self) -> np.ndarray:
        """Reset the environment to its initial state."""
        self.twin.reset()
        return self._get_state()

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, dict]:
        """Execute an action and return (next_state, reward, done, info)."""
        # Apply the action to the digital twin
        self.twin.apply_action(action)
        next_state = self._get_state()
        reward = self._compute_reward(next_state, action)
        done = self._check_done(next_state)
        return next_state, reward, done, {}

    def _get_state(self) -> np.ndarray:
        return np.array(list(self.twin.state.values()))

    def _compute_reward(self, state: np.ndarray, action: np.ndarray) -> float:
        # Example: maximize efficiency, minimize energy, avoid failures
        efficiency = state[0]  # Assume the first state variable is efficiency
        energy = np.sum(np.abs(action))
        safety_margin = np.min(state) - 0.1  # Safety threshold
        return efficiency - 0.1 * energy + 10 * max(0, safety_margin)

    def _check_done(self, state: np.ndarray) -> bool:
        # Done if any safety limit is exceeded
        return bool(np.any(state < 0) or np.any(state > 1))

class SACAgent(nn.Module):
    """Soft Actor-Critic agent for Digital Twin optimization."""

    def __init__(self, state_dim: int, action_dim: int, hidden_dim: int = 256):
        super().__init__()
        self.action_dim = action_dim
        # Actor network (policy)
        self.actor = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        self.mean_head = nn.Linear(hidden_dim, action_dim)
        self.log_std_head = nn.Linear(hidden_dim, action_dim)
        # Twin Q-networks (critics)
        self.q1 = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        self.q2 = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        # Temperature parameter
        self.log_alpha = nn.Parameter(torch.tensor(0.0))
        self.target_entropy = -action_dim

    def get_action(
        self,
        state: torch.Tensor,
        deterministic: bool = False
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Sample an action from the policy."""
        features = self.actor(state)
        mean = self.mean_head(features)
        log_std = torch.clamp(self.log_std_head(features), -20, 2)
        std = log_std.exp()
        if deterministic:
            action = torch.tanh(mean)
            log_prob = None
        else:
            dist = torch.distributions.Normal(mean, std)
            x = dist.rsample()
            action = torch.tanh(x)
            # Log probability with the tanh change-of-variables correction
            log_prob = dist.log_prob(x) - torch.log(1 - action.pow(2) + 1e-6)
            log_prob = log_prob.sum(dim=-1, keepdim=True)
        return action, log_prob

    def get_q_values(
        self,
        state: torch.Tensor,
        action: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Get Q-values from both critics."""
        sa = torch.cat([state, action], dim=-1)
        return self.q1(sa), self.q2(sa)

class DigitalTwinController:
    """Controller for autonomous Digital Twin operation."""

    def __init__(
        self,
        env: DigitalTwinEnvironment,
        agent: SACAgent,
        safety_checker: nn.Module
    ):
        self.env = env
        self.agent = agent
        self.safety_checker = safety_checker

    def get_safe_action(self, state: np.ndarray) -> np.ndarray:
        """Get an action with safety filtering."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        # Get the policy action
        action, _ = self.agent.get_action(state_tensor, deterministic=True)
        action = action.squeeze(0).numpy()
        # Safety check
        is_safe = self.safety_checker(state_tensor, torch.FloatTensor(action))
        if not is_safe:
            action = self._get_safe_fallback_action(state)
        return action

    def _get_safe_fallback_action(self, state: np.ndarray) -> np.ndarray:
        """Conservative fallback: drive all actuators to neutral."""
        return np.zeros(self.env.action_dim)

    def run_control_loop(self, duration_steps: int = 1000, log_callback=None):
        """Run the autonomous control loop."""
        state = self.env.reset()
        total_reward = 0
        for step in range(duration_steps):
            action = self.get_safe_action(state)
            next_state, reward, done, info = self.env.step(action)
            total_reward += reward
            if log_callback:
                log_callback({
                    'step': step,
                    'state': state,
                    'action': action,
                    'reward': reward,
                    'total_reward': total_reward
                })
            if done:
                break
            state = next_state
        return total_reward
```
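The core of the SAC actor above is the tanh-squashed Gaussian policy with a change-of-variables correction to the log-probability. Isolated as a runnable sketch (dimensions are arbitrary):

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
state_dim, action_dim = 8, 2

# Minimal tanh-squashed Gaussian policy head, as used in the SAC agent
trunk = nn.Sequential(nn.Linear(state_dim, 64), nn.ReLU())
mean_head = nn.Linear(64, action_dim)
log_std_head = nn.Linear(64, action_dim)

state = torch.randn(4, state_dim)
feat = trunk(state)
mean = mean_head(feat)
log_std = torch.clamp(log_std_head(feat), -20, 2)

dist = torch.distributions.Normal(mean, log_std.exp())
x = dist.rsample()        # reparameterized sample (keeps gradients)
action = torch.tanh(x)    # squash into (-1, 1) for bounded actuators
# Change-of-variables correction for the tanh squashing
log_prob = (dist.log_prob(x) - torch.log(1 - action.pow(2) + 1e-6)).sum(-1)
```

The `1e-6` term guards against `log(0)` when the squashed action saturates near the bounds.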
NVIDIA Omniverse Integration
```python
from typing import Dict

import numpy as np
import torch.nn as nn

from omni.isaac.core import World
from omni.isaac.core.prims import RigidPrimView
from omni.isaac.core.utils.nucleus import get_assets_root_path

class OmniverseDigitalTwin:
    """Digital Twin built on NVIDIA Omniverse.

    RealSensorInterface and ActuatorInterface are assumed to be
    project-specific adapters to the physical asset.
    """

    def __init__(self, usd_path: str, physics_dt: float = 1 / 60):
        self.world = World(physics_dt=physics_dt)
        self.usd_path = usd_path
        # Load scene
        self.world.scene.add_default_ground_plane()
        self._load_asset()
        # Real-time data interfaces
        self.sensor_interface = None
        self.actuator_interface = None

    def _load_asset(self):
        """Load the USD asset into Omniverse."""
        from omni.isaac.core.utils.stage import add_reference_to_stage
        add_reference_to_stage(
            usd_path=self.usd_path,
            prim_path="/World/Asset"
        )
        self.asset_prim = self.world.scene.get_object("/World/Asset")

    def connect_sensors(self, sensor_config: Dict):
        """Connect to real sensor data sources."""
        self.sensor_interface = RealSensorInterface(sensor_config)

    def connect_actuators(self, actuator_config: Dict):
        """Connect to real actuators."""
        self.actuator_interface = ActuatorInterface(actuator_config)

    def sync_loop(self, ai_models: Dict[str, nn.Module]):
        """Main synchronization loop."""
        while True:
            # 1. Get sensor data from the physical twin
            sensor_data = self.sensor_interface.read()
            # 2. Update the digital twin state
            self._update_state(sensor_data)
            # 3. Step the physics simulation
            self.world.step(render=True)
            # 4. Run AI analysis
            predictions = self._run_ai_analysis(sensor_data, ai_models)
            # 5. Check for anomalies
            if predictions.get('anomaly_detected'):
                self._handle_anomaly(predictions)
            # 6. Optimize and send commands (autonomous mode only)
            if predictions.get('optimal_action') is not None:
                self.actuator_interface.write(predictions['optimal_action'])

    def _update_state(self, sensor_data: Dict):
        """Update digital twin geometry and physics from sensor data."""
        if 'joint_positions' in sensor_data:
            self.asset_prim.set_joint_positions(sensor_data['joint_positions'])
        if 'temperature' in sensor_data:
            self._update_thermal_state(sensor_data['temperature'])

    def _run_ai_analysis(
        self,
        sensor_data: Dict,
        ai_models: Dict[str, nn.Module]
    ) -> Dict:
        """Run all AI models for analysis."""
        results = {}
        # Anomaly detection
        if 'anomaly_detector' in ai_models:
            results['anomaly_detected'] = ai_models['anomaly_detector'](sensor_data)
        # Predictive maintenance
        if 'maintenance_predictor' in ai_models:
            results['rul'] = ai_models['maintenance_predictor'](sensor_data)
        # Optimization
        if 'optimizer' in ai_models:
            results['optimal_action'] = ai_models['optimizer'](sensor_data)
        return results
```
Production-Ready Digital Twin Architecture
┌─────────────────────────────────────────────────────────────────┐
│ PHYSICAL LAYER │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Sensors → Edge Gateway → Protocol Translation → Secure TX │ │
│ │ (OPC-UA, MQTT, Modbus) │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ DATA LAYER │
│ ┌────────────┐ ┌─────────────┐ ┌────────────┐ ┌─────────────┐ │
│ │ Time-series│ │ Feature │ │ Event │ │ Data Lake │ │
│ │ InfluxDB │ │ Store │ │ Kafka │ │ S3/HDFS │ │
│ └────────────┘ └─────────────┘ └────────────┘ └─────────────┘ │
└─────────────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ AI/ML LAYER │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │
│ │ Predictive │ │ Anomaly │ │ Optimization │ │
│ │ Models │ │ Detection │ │ RL Agents │ │
│ │ (PINN, RNN) │ │ (Autoencoder)│ │ (SAC, PPO) │ │
│ └──────────────┘ └──────────────┘ └──────────────────────────┘ │
└─────────────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ SIMULATION LAYER │
│ ┌────────────────┐ ┌────────────────┐ ┌──────────────────────┐ │
│ │ Physics Engine │ │ FEM/CFD │ │ PINN Surrogates │ │
│ │ (Omniverse) │ │ (ANSYS proxy) │ │ (1000x faster) │ │
│ └────────────────┘ └────────────────┘ └──────────────────────┘ │
└─────────────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ VISUALIZATION & CONTROL │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌──────────────┐ │
│ │ 3D Web │ │ AR/VR │ │ Dashboards │ │ Control │ │
│ │ Viewer │ │ Headsets │ │ Grafana │ │ Interface │ │
│ └────────────┘ └────────────┘ └────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
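At the physical layer, an edge gateway typically validates and buffers incoming sensor messages before they reach the data layer. A minimal sketch of such an ingestion step (the topic layout and payload fields are assumptions, not a standard):

```python
import json
import time
from collections import deque

# Bounded in-memory buffer standing in for the time-series store
buffer = deque(maxlen=10_000)

def ingest(topic: str, payload: bytes) -> dict:
    """Parse, validate, and buffer one sensor message.

    Assumed topic layout: <site>/<asset_id>/telemetry
    Assumed payload: {"ts": <unix time>, "values": {<sensor>: <number>, ...}}
    """
    msg = json.loads(payload)
    record = {
        "asset_id": topic.split("/")[1],
        "ts": msg.get("ts", time.time()),
        # Coerce to float so downstream feature extraction is type-safe
        "values": {k: float(v) for k, v in msg["values"].items()},
    }
    buffer.append(record)
    return record

rec = ingest(
    "plant/turbine-07/telemetry",
    b'{"ts": 1700000000.0, "values": {"temp_c": 84.2, "vibration_mm_s": 3.1}}',
)
```

In production the same logic would sit behind an MQTT or OPC-UA subscriber and write to Kafka/InfluxDB instead of a local deque; the parse-validate-buffer shape stays the same.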
Practical Use Cases
1. Manufacturing (Siemens, Bosch)
- Factory floor digital twin
- Predictive maintenance: 40% reduction in downtime
- Process optimization: 15% efficiency improvement
- Quality prediction before production
2. Energy (GE, Vestas)
- Wind turbine twins: 20% more energy capture
- Grid optimization: real-time load balancing
- Failure prediction: 2 weeks advance warning
- Asset lifecycle management
3. Healthcare (Dassault, Philips)
- Patient-specific organ models
- Surgical simulation before operation
- Drug dosage optimization
- ICU monitoring twins
4. Smart Cities (Singapore, Dubai)
- City-scale traffic optimization
- Energy grid management
- Air quality prediction
- Urban planning scenarios
Ideas for Research Projects
For a bachelor's thesis:
- A simple IoT twin on a Raspberry Pi with visualization
- A predictive model for a single sensor (temperature, vibration)
- Comparing the accuracy of a PINN vs. a pure ML model
For a master's thesis:
- A physics-informed neural network for a specific domain
- Multi-sensor fusion with anomaly detection
- A surrogate model for CFD/FEM simulations
For PhD research:
- Scalable twin architectures for fleet management
- Uncertainty quantification in PINNs
- Self-calibrating digital twins
- Transfer learning between twins
Conclusion
Digital twins are not just an Industry 4.0 buzzword. They transform how physical systems are designed, operated, and optimized.
Traditional: Design → Build → Operate → Fail → Fix
Digital Twin: Design → Simulate → Build → Monitor → Predict → Optimize
The difference is a reactive versus a proactive approach: waiting for a breakdown versus predicting it weeks in advance.
McKinsey estimates that digital twins can cut maintenance costs by 10-40%. For a large plant, that is millions of dollars every year. And that is maintenance alone: optimization, quality prediction, and design acceleration add millions more.
If you are planning research on digital twins, PINN models, or industrial AI, the SKP-Degree team is ready to help with problem formulation, prototype implementation, and academic write-up. Visit skp-degree.com.ua or message us on Telegram: @kursovi_diplomy, from concept to a successful defense.
Keywords: digital twin, PINN, physics-informed neural networks, surrogate model, anomaly detection, predictive maintenance, NVIDIA Omniverse, Industry 4.0, IoT, reinforcement learning, FEM, CFD, thesis project, master's thesis, AI research.