Digital Twins + AI: симуляція реального світу нейромережами
Як створити віртуальну копію заводу, міста чи людського тіла та оптимізувати її за допомогою ML
У 2024 році Siemens зекономив $1.7 мільярда, оптимізуючи виробництво через Digital Twins. Tesla симулює кожен свій автомобіль у віртуальному середовищі. NASA тестує марсоходи на цифрових двійниках Марсу.
Digital Twin — це віртуальна копія фізичного об'єкта, процесу або системи, яка оновлюється в реальному часі на основі даних сенсорів. Але справжня магія починається, коли ви додаєте AI.
Чому AI + Digital Twins?
- Предиктивне обслуговування: ML передбачає поломки до їх виникнення
- Оптимізація: RL знаходить оптимальні параметри без ризику для реального обладнання
- Anomaly Detection: Нейромережі виявляють відхилення від норми
- Physics-Informed ML: Поєднання фізичних рівнянь та даних
Архітектура AI-Driven Digital Twin
Physics-Informed Neural Networks (PINN)
Традиційні нейромережі навчаються лише на даних. Але фізичні системи підкоряються законам фізики! PINN інтегрують фізичні рівняння безпосередньо у функцію втрат.
∂u/∂t = α · ∂²u/∂x²
Loss функція PINN:
L = Ldata + λ · Lphysics
де Lphysics = ||∂u/∂t - α · ∂²u/∂x²||²
Реалізація PINN для теплопровідності
import torch
import torch.nn as nn
import numpy as np
class PINN(nn.Module):
"""Physics-Informed Neural Network для рівняння теплопровідності."""
def __init__(self, layers: list):
super().__init__()
self.layers = nn.ModuleList()
for i in range(len(layers) - 1):
self.layers.append(nn.Linear(layers[i], layers[i+1]))
self.activation = nn.Tanh()
# Коефіцієнт теплопровідності (learnable)
self.alpha = nn.Parameter(torch.tensor([0.01]))
def forward(self, x, t):
"""Forward pass: (x, t) -> u(x, t)"""
inputs = torch.cat([x, t], dim=1)
for layer in self.layers[:-1]:
inputs = self.activation(layer(inputs))
return self.layers[-1](inputs)
def physics_loss(self, x, t):
"""
Обчислення фізичної втрати: порушення рівняння теплопровідності.
∂u/∂t = α · ∂²u/∂x²
"""
x.requires_grad_(True)
t.requires_grad_(True)
u = self.forward(x, t)
# Обчислення похідних через autograd
u_t = torch.autograd.grad(
u, t,
grad_outputs=torch.ones_like(u),
create_graph=True
)[0]
u_x = torch.autograd.grad(
u, x,
grad_outputs=torch.ones_like(u),
create_graph=True
)[0]
u_xx = torch.autograd.grad(
u_x, x,
grad_outputs=torch.ones_like(u_x),
create_graph=True
)[0]
# Residual: ∂u/∂t - α·∂²u/∂x² = 0
residual = u_t - self.alpha * u_xx
return torch.mean(residual ** 2)
def boundary_loss(self, x_bc, t_bc, u_bc):
"""Втрата на граничних умовах."""
u_pred = self.forward(x_bc, t_bc)
return torch.mean((u_pred - u_bc) ** 2)
def initial_loss(self, x_ic, u_ic):
"""Втрата на початкових умовах (t=0)."""
t_ic = torch.zeros_like(x_ic)
u_pred = self.forward(x_ic, t_ic)
return torch.mean((u_pred - u_ic) ** 2)
def train_pinn(
model: PINN,
x_data: torch.Tensor,
t_data: torch.Tensor,
u_data: torch.Tensor,
x_collocation: torch.Tensor,
t_collocation: torch.Tensor,
epochs: int = 10000,
lambda_physics: float = 1.0
):
"""Тренування PINN."""
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=2000, gamma=0.5)
for epoch in range(epochs):
optimizer.zero_grad()
# Data loss (відповідність вимірам)
u_pred = model.forward(x_data, t_data)
loss_data = torch.mean((u_pred - u_data) ** 2)
# Physics loss (виконання рівняння)
loss_physics = model.physics_loss(x_collocation, t_collocation)
# Загальна втрата
loss = loss_data + lambda_physics * loss_physics
loss.backward()
optimizer.step()
scheduler.step()
if epoch % 1000 == 0:
print(f"Epoch {epoch}: Loss={loss.item():.6f}, "
f"Data={loss_data.item():.6f}, "
f"Physics={loss_physics.item():.6f}, "
f"α={model.alpha.item():.4f}")
return model
Сурогатні моделі для швидкої симуляції
Фізичні симуляції (CFD, FEM) можуть тривати години. Surrogate Models — нейромережі, навчені апроксимувати результати симуляцій за мілісекунди.
- CFD симуляція: 2-24 години
- FEM аналіз: 30 хв - 4 години
- Точно, але повільно
- Неможливо для real-time
- Inference: 10-100 мс
- Прискорення: 10,000-100,000×
- Real-time оптимізація
- Інтерактивні what-if сценарії
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
class SurrogateModel(nn.Module):
"""
Сурогатна модель для заміни CFD симуляції.
Input: параметри системи (геометрія, boundary conditions, etc.)
Output: результат симуляції (поле температур, тиску, швидкостей)
"""
def __init__(
self,
input_dim: int,
output_dim: int,
hidden_dims: list = [256, 512, 512, 256]
):
super().__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.extend([
nn.Linear(prev_dim, hidden_dim),
nn.LayerNorm(hidden_dim),
nn.GELU(),
nn.Dropout(0.1)
])
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, output_dim))
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
class ConvSurrogate(nn.Module):
"""
Сурогатна модель на основі U-Net для просторових даних.
Вхід: карта параметрів (геометрія, граничні умови)
Вихід: поле рішення (температура, тиск, etc.)
"""
def __init__(self, in_channels: int = 3, out_channels: int = 1):
super().__init__()
# Encoder
self.enc1 = self._conv_block(in_channels, 64)
self.enc2 = self._conv_block(64, 128)
self.enc3 = self._conv_block(128, 256)
self.enc4 = self._conv_block(256, 512)
self.pool = nn.MaxPool2d(2)
# Bottleneck
self.bottleneck = self._conv_block(512, 1024)
# Decoder
self.up4 = nn.ConvTranspose2d(1024, 512, 2, stride=2)
self.dec4 = self._conv_block(1024, 512)
self.up3 = nn.ConvTranspose2d(512, 256, 2, stride=2)
self.dec3 = self._conv_block(512, 256)
self.up2 = nn.ConvTranspose2d(256, 128, 2, stride=2)
self.dec2 = self._conv_block(256, 128)
self.up1 = nn.ConvTranspose2d(128, 64, 2, stride=2)
self.dec1 = self._conv_block(128, 64)
self.final = nn.Conv2d(64, out_channels, 1)
def _conv_block(self, in_ch, out_ch):
return nn.Sequential(
nn.Conv2d(in_ch, out_ch, 3, padding=1),
nn.BatchNorm2d(out_ch),
nn.ReLU(inplace=True),
nn.Conv2d(out_ch, out_ch, 3, padding=1),
nn.BatchNorm2d(out_ch),
nn.ReLU(inplace=True)
)
def forward(self, x):
# Encoder path
e1 = self.enc1(x)
e2 = self.enc2(self.pool(e1))
e3 = self.enc3(self.pool(e2))
e4 = self.enc4(self.pool(e3))
# Bottleneck
b = self.bottleneck(self.pool(e4))
# Decoder path з skip connections
d4 = self.dec4(torch.cat([self.up4(b), e4], dim=1))
d3 = self.dec3(torch.cat([self.up3(d4), e3], dim=1))
d2 = self.dec2(torch.cat([self.up2(d3), e2], dim=1))
d1 = self.dec1(torch.cat([self.up1(d2), e1], dim=1))
return self.final(d1)
Reinforcement Learning для оптимізації
Digital Twin — ідеальне середовище для RL. Агент може експериментувати з мільйонами конфігурацій без ризику пошкодження реального обладнання.
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from stable_baselines3 import PPO, SAC
class DigitalTwinEnv(gym.Env):
"""
Gymnasium environment для оптимізації промислового процесу
через Digital Twin.
Задача: оптимізувати параметри виробництва для мінімізації
енергоспоживання при збереженні якості продукції.
"""
def __init__(self, surrogate_model, target_quality: float = 0.95):
super().__init__()
self.surrogate = surrogate_model
self.target_quality = target_quality
# Простір дій: 4 параметри процесу
self.action_space = spaces.Box(
low=np.array([0.0, 0.0, 0.0, 0.0]),
high=np.array([1.0, 1.0, 1.0, 1.0]),
dtype=np.float32
)
# Простір спостережень: стан системи
self.observation_space = spaces.Box(
low=-np.inf,
high=np.inf,
shape=(10,),
dtype=np.float32
)
self.state = None
self.steps = 0
self.max_steps = 100
def reset(self, seed=None):
super().reset(seed=seed)
# Ініціалізація стану
self.state = np.random.uniform(-0.5, 0.5, size=(10,)).astype(np.float32)
self.steps = 0
return self.state, {}
def step(self, action):
"""Виконання дії та отримання результату від Digital Twin."""
# Симуляція через surrogate model
sim_input = np.concatenate([self.state, action])
sim_input_tensor = torch.tensor(sim_input, dtype=torch.float32).unsqueeze(0)
with torch.no_grad():
sim_output = self.surrogate(sim_input_tensor).numpy()[0]
# Розпакування результатів симуляції
quality = sim_output[0] # Якість продукції
energy = sim_output[1] # Енергоспоживання
throughput = sim_output[2] # Продуктивність
# Оновлення стану
self.state = sim_output[3:13].astype(np.float32)
self.steps += 1
# Reward function: мінімізація енергії при збереженні якості
if quality >= self.target_quality:
reward = -energy + 0.1 * throughput # Бонус за продуктивність
else:
reward = -10.0 * (self.target_quality - quality) # Штраф за якість
terminated = self.steps >= self.max_steps
truncated = False
info = {
"quality": quality,
"energy": energy,
"throughput": throughput
}
return self.state, reward, terminated, truncated, info
# Тренування RL агента
def train_optimization_agent(surrogate_model, total_timesteps: int = 100000):
"""Тренування агента для оптимізації процесу."""
env = DigitalTwinEnv(surrogate_model)
# SAC — хороший вибір для continuous control
model = SAC(
"MlpPolicy",
env,
learning_rate=3e-4,
buffer_size=100000,
batch_size=256,
tau=0.005,
gamma=0.99,
verbose=1,
tensorboard_log="./dt_optimization_logs/"
)
model.learn(total_timesteps=total_timesteps)
model.save("digital_twin_optimizer")
return model
Anomaly Detection в Digital Twins
Виявлення аномалій — критична функція для Predictive Maintenance. Модель вчиться на нормальній поведінці та виявляє відхилення.
import torch
import torch.nn as nn
from sklearn.preprocessing import StandardScaler
class LSTMAutoencoder(nn.Module):
"""
LSTM Autoencoder для виявлення аномалій у часових рядах.
Ідея: модель навчається реконструювати нормальну поведінку.
Високий reconstruction error = аномалія.
"""
def __init__(
self,
input_dim: int,
hidden_dim: int = 64,
latent_dim: int = 32,
num_layers: int = 2
):
super().__init__()
# Encoder
self.encoder = nn.LSTM(
input_size=input_dim,
hidden_size=hidden_dim,
num_layers=num_layers,
batch_first=True,
dropout=0.2
)
self.encoder_fc = nn.Linear(hidden_dim, latent_dim)
# Decoder
self.decoder_fc = nn.Linear(latent_dim, hidden_dim)
self.decoder = nn.LSTM(
input_size=hidden_dim,
hidden_size=hidden_dim,
num_layers=num_layers,
batch_first=True,
dropout=0.2
)
self.output_fc = nn.Linear(hidden_dim, input_dim)
def encode(self, x):
"""Кодування послідовності в латентний простір."""
_, (h, _) = self.encoder(x)
latent = self.encoder_fc(h[-1]) # Беремо останній hidden state
return latent
def decode(self, latent, seq_len):
"""Декодування з латентного простору."""
hidden = self.decoder_fc(latent)
hidden = hidden.unsqueeze(1).repeat(1, seq_len, 1)
decoded, _ = self.decoder(hidden)
output = self.output_fc(decoded)
return output
def forward(self, x):
latent = self.encode(x)
reconstructed = self.decode(latent, x.size(1))
return reconstructed
class AnomalyDetector:
"""Детектор аномалій на основі LSTM Autoencoder."""
def __init__(self, model: LSTMAutoencoder, threshold: float = None):
self.model = model
self.threshold = threshold
self.scaler = StandardScaler()
def fit_threshold(self, normal_data: torch.Tensor, percentile: float = 99):
"""Визначення порогу на основі нормальних даних."""
self.model.eval()
with torch.no_grad():
reconstructed = self.model(normal_data)
errors = torch.mean((normal_data - reconstructed) ** 2, dim=(1, 2))
self.threshold = np.percentile(errors.numpy(), percentile)
print(f"Threshold set at {percentile}th percentile: {self.threshold:.4f}")
def detect(self, data: torch.Tensor) -> dict:
"""
Виявлення аномалій.
Returns:
dict з ключами:
- is_anomaly: bool array
- reconstruction_error: float array
- anomaly_score: нормалізований скор (0-1)
"""
self.model.eval()
with torch.no_grad():
reconstructed = self.model(data)
errors = torch.mean((data - reconstructed) ** 2, dim=(1, 2))
errors_np = errors.numpy()
return {
"is_anomaly": errors_np > self.threshold,
"reconstruction_error": errors_np,
"anomaly_score": np.clip(errors_np / (self.threshold * 2), 0, 1)
}
# Приклад використання
def monitor_equipment(detector: AnomalyDetector, sensor_data: torch.Tensor):
"""Моніторинг обладнання в реальному часі."""
results = detector.detect(sensor_data)
if np.any(results["is_anomaly"]):
anomaly_indices = np.where(results["is_anomaly"])[0]
max_score = np.max(results["anomaly_score"][anomaly_indices])
if max_score > 0.8:
return {"status": "CRITICAL", "action": "IMMEDIATE_SHUTDOWN"}
elif max_score > 0.5:
return {"status": "WARNING", "action": "SCHEDULE_MAINTENANCE"}
else:
return {"status": "MINOR", "action": "LOG_FOR_REVIEW"}
return {"status": "NORMAL", "action": None}
NVIDIA Omniverse: промисловий стандарт
NVIDIA Omniverse — платформа для створення фотореалістичних, фізично точних Digital Twins. Використовується BMW, Siemens, Amazon Robotics.
- Real-time ray tracing (RTX)
- PhysX симуляція фізики
- USD формат для 3D
- Isaac Sim для робототехніки
- Audio2Face для аватарів
- Інтеграція з AI/ML
- BMW: симуляція заводів
- Siemens: енергетичні системи
- Amazon: оптимізація складів
- Ericsson: 5G мережі
- Lockheed: космічні апарати
# Приклад роботи з Omniverse Isaac Sim
from omni.isaac.core import World
from omni.isaac.core.robots import Robot
from omni.isaac.sensor import Camera, ContactSensor
import numpy as np
class DigitalTwinFactory:
"""Digital Twin заводу в NVIDIA Omniverse."""
def __init__(self, usd_path: str):
# Ініціалізація світу
self.world = World(stage_units_in_meters=1.0)
# Завантаження 3D моделі заводу
self.world.scene.add_default_ground_plane()
self._load_factory_model(usd_path)
# Додавання сенсорів
self.cameras = []
self.contact_sensors = []
def _load_factory_model(self, usd_path: str):
"""Завантаження USD моделі заводу."""
from pxr import Usd
stage = Usd.Stage.Open(usd_path)
# Імпорт геометрії та матеріалів
...
def add_robot(
self,
robot_type: str,
position: tuple,
name: str
) -> Robot:
"""Додавання робота до Digital Twin."""
robot = self.world.scene.add(
Robot(
prim_path=f"/World/Robots/{name}",
name=name,
position=np.array(position)
)
)
return robot
def add_camera(self, position: tuple, orientation: tuple) -> Camera:
"""Додавання камери для візуального контролю."""
camera = Camera(
prim_path=f"/World/Cameras/cam_{len(self.cameras)}",
position=np.array(position),
orientation=np.array(orientation),
resolution=(1920, 1080)
)
self.cameras.append(camera)
return camera
def run_simulation(self, steps: int = 1000):
"""Запуск симуляції."""
self.world.reset()
for i in range(steps):
self.world.step(render=True)
# Збір даних з сенсорів
sensor_data = self._collect_sensor_data()
# Виявлення аномалій
anomalies = self._detect_anomalies(sensor_data)
if anomalies:
self._handle_anomalies(anomalies)
def _collect_sensor_data(self) -> dict:
"""Збір даних з усіх сенсорів."""
data = {}
for i, camera in enumerate(self.cameras):
data[f"camera_{i}"] = camera.get_rgba()
for i, sensor in enumerate(self.contact_sensors):
data[f"contact_{i}"] = sensor.get_current_frame()
return data
Потрібна допомога з проектом?
Digital Twins з AI -- інноваційна тема на стику фізичного моделювання та нейронних мереж. Ми допоможемо реалізувати проект з PINN або сурогатними моделями.
Замовити ML проектІдеї для курсової роботи
Зміст: Реалізація Physics-Informed Neural Network для рівнянь конвективного теплообміну, порівняння з FEM.
Технології: Python, PyTorch, DeepXDE
Складність: Середня
Зміст: Система виявлення аномалій для промислового обладнання на основі даних сенсорів.
Технології: Python, PyTorch, NASA Bearing Dataset
Складність: Помірна
Зміст: Digital Twin будівлі + RL агент для оптимізації HVAC системи.
Технології: Python, Stable-Baselines3, EnergyPlus
Складність: Висока
Потрібна допомога з курсовою?
Digital Twins + AI — ідеальна тема для сучасної курсової. Ми допоможемо з реалізацією PINN, surrogate models, RL оптимізацією та оформленням.
Замовити курсову з Digital Twins