Технології AI Написано практикуючими розробниками

AI-Driven Digital Twins: коли фізичний світ має віртуального близнюка

Оновлено: 17 хв читання 10 переглядів

У NASA є digital twin кожного космічного апарату. Коли Apollo 13 мав проблему — не на кораблі шукали рішення. На його цифровому близнюкові. Симулювали варіанти. Знайшли вихід. Врятували астронавтів.


У NASA є digital twin кожного космічного апарату. Коли Apollo 13 мав проблему — не на кораблі шукали рішення. На його цифровому близнюкові. Симулювали варіанти. Знайшли вихід. Врятували астронавтів.

1970-й рік. Примітивні комп'ютери. Але ідея працювала.

Сьогодні AI робить digital twins живими. Вони не просто копіюють — вони передбачають, оптимізують, вчаться. Від турбін літаків до цілих міст. За даними McKinsey, до 2025 року ринок digital twins досягне $48 мільярдів. Siemens, GE, Rolls-Royce вже економлять мільярди на predictive maintenance. І це лише початок.


Що таке Digital Twin: формальне визначення

Digital Twin — це віртуальна репліка фізичного об'єкта, процесу або системи, яка оновлюється в реальному часі на основі даних сенсорів і забезпечує bidirectional зв'язок між фізичним і віртуальним світами.

Physical Asset              Digital Twin
    │                           │
    │  sensors (real-time)      │
    │ ─────────────────────────►│
    │                           │
    │  actuation commands       │
    │ ◄─────────────────────────│
    │                           │
    ▼                           ▼
Real World               Simulation + AI

Ключові компоненти архітектури:

  1. Physical Entity — те, що моделюємо (машина, турбіна, місто)
  2. Sensors & IoT — джерела даних про стан
  3. Data Pipeline — real-time transmission та processing
  4. Virtual Model — фізична симуляція + ML моделі
  5. Analytics Engine — AI для insights, predictions, optimization
  6. Actuation Layer — зворотний вплив на фізичний світ

Еволюція Digital Twins: від CAD до Autonomous AI

Level 1: Static Model (Descriptive)

  • CAD/BIM модель
  • Немає real-time даних
  • Тільки документація та візуалізація
  • Приклад: 3D модель будівлі

Level 2: Connected Twin (Informative)

  • Real-time sensor data integration
  • Моніторинг поточного стану
  • Dashboards, alerts, notifications
  • Приклад: Tesla vehicle monitoring

Level 3: Predictive Twin (Analytical)

  • Machine Learning для прогнозування
  • Predictive maintenance
  • What-if scenario analysis
  • Приклад: GE wind turbine twins

Level 4: Prescriptive Twin (Intelligent)

  • AI recommends optimal actions
  • Automated decision support
  • Self-optimization suggestions
  • Приклад: Siemens factory optimization

Level 5: Autonomous Twin (Cognitive)

  • AI приймає рішення автономно
  • Closed-loop control
  • Self-healing, self-optimizing
  • Приклад: Autonomous grid management
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
import numpy as np
import torch
import torch.nn as nn

class TwinMaturityLevel(Enum):
    STATIC = 1          # CAD model only
    CONNECTED = 2       # Real-time monitoring
    PREDICTIVE = 3      # ML predictions
    PRESCRIPTIVE = 4    # AI recommendations
    AUTONOMOUS = 5      # Closed-loop control

@dataclass
class DigitalTwinConfig:
    """Конфігурація Digital Twin системи."""
    name: str
    physical_asset_id: str
    maturity_level: TwinMaturityLevel
    sensor_config: Dict[str, dict]
    update_frequency_hz: float = 10.0
    prediction_horizon_seconds: float = 3600.0
    enable_actuation: bool = False
    physics_model: Optional[str] = None
    ml_models: List[str] = field(default_factory=list)

class DigitalTwinCore:
    """Ядро системи Digital Twin."""

    def __init__(self, config: DigitalTwinConfig):
        self.config = config
        self.state = {}
        self.history = []
        self.models = {}
        self.actuators = {}

        # Initialize components based on maturity level
        self._init_data_pipeline()

        if config.maturity_level.value >= TwinMaturityLevel.CONNECTED.value:
            self._init_real_time_sync()

        if config.maturity_level.value >= TwinMaturityLevel.PREDICTIVE.value:
            self._init_ml_models()

        if config.maturity_level.value >= TwinMaturityLevel.AUTONOMOUS.value:
            self._init_control_loop()

    def _init_data_pipeline(self):
        """Ініціалізація data ingestion pipeline."""
        self.data_buffer = RingBuffer(capacity=10000)
        self.feature_extractor = FeatureExtractor(self.config.sensor_config)

    def _init_real_time_sync(self):
        """Ініціалізація real-time синхронізації."""
        self.sync_manager = RealTimeSyncManager(
            update_rate=self.config.update_frequency_hz
        )

    def _init_ml_models(self):
        """Завантаження ML моделей."""
        for model_name in self.config.ml_models:
            self.models[model_name] = self._load_model(model_name)

    def update(self, sensor_data: Dict[str, float]) -> Dict:
        """Оновлення стану Digital Twin."""
        # Update current state
        self.state.update(sensor_data)
        self.history.append({
            'timestamp': time.time(),
            'state': sensor_data.copy()
        })

        # Run predictions if available
        predictions = {}
        if self.config.maturity_level.value >= TwinMaturityLevel.PREDICTIVE.value:
            predictions = self._run_predictions(sensor_data)

        # Generate recommendations
        recommendations = {}
        if self.config.maturity_level.value >= TwinMaturityLevel.PRESCRIPTIVE.value:
            recommendations = self._generate_recommendations(
                sensor_data, predictions
            )

        # Execute autonomous actions
        actions = {}
        if self.config.maturity_level.value >= TwinMaturityLevel.AUTONOMOUS.value:
            actions = self._execute_control_loop(
                sensor_data, predictions, recommendations
            )

        return {
            'state': self.state,
            'predictions': predictions,
            'recommendations': recommendations,
            'actions': actions
        }

Physics-Informed Neural Networks (PINNs)

Ключова ідея: Нейронна мережа, яка знає фізику. Не просто апроксимує дані, а враховує фізичні закони як constraints.

import torch
import torch.nn as nn
from torch.autograd import grad

class PhysicsInformedNN(nn.Module):
    """Physics-Informed Neural Network для моделювання фізичних процесів."""

    def __init__(
        self,
        input_dim: int,
        hidden_dims: List[int],
        output_dim: int,
        activation: nn.Module = nn.Tanh()
    ):
        super().__init__()

        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                activation
            ])
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, output_dim))

        self.network = nn.Sequential(*layers)

    def forward(self, *inputs):
        x = torch.cat(inputs, dim=1)
        return self.network(x)


class HeatTransferPINN(PhysicsInformedNN):
    """PINN для 2D теплопередачі."""

    def __init__(self, thermal_diffusivity: float = 1.0):
        super().__init__(
            input_dim=3,        # x, y, t
            hidden_dims=[64, 64, 64, 64],
            output_dim=1        # temperature
        )
        self.k = thermal_diffusivity

    def forward(self, x, y, t):
        """Forward pass — temperature prediction."""
        return super().forward(x, y, t)

    def compute_derivatives(self, x, y, t):
        """Обчислення похідних для фізичного закону."""
        # Enable gradient computation
        x.requires_grad_(True)
        y.requires_grad_(True)
        t.requires_grad_(True)

        u = self.forward(x, y, t)

        # First derivatives
        u_x = grad(u.sum(), x, create_graph=True)[0]
        u_y = grad(u.sum(), y, create_graph=True)[0]
        u_t = grad(u.sum(), t, create_graph=True)[0]

        # Second derivatives
        u_xx = grad(u_x.sum(), x, create_graph=True)[0]
        u_yy = grad(u_y.sum(), y, create_graph=True)[0]

        return u, u_t, u_xx, u_yy

    def physics_loss(self, x, y, t):
        """PDE residual loss: Heat equation du/dt = k(d²u/dx² + d²u/dy²)."""
        u, u_t, u_xx, u_yy = self.compute_derivatives(x, y, t)

        # Heat equation residual
        pde_residual = u_t - self.k * (u_xx + u_yy)

        return torch.mean(pde_residual ** 2)

    def boundary_loss(
        self,
        x_boundary, y_boundary, t_boundary,
        u_boundary_true
    ):
        """Boundary conditions loss."""
        u_pred = self.forward(x_boundary, y_boundary, t_boundary)
        return torch.mean((u_pred - u_boundary_true) ** 2)

    def initial_loss(self, x_init, y_init, t_init, u_init_true):
        """Initial conditions loss."""
        u_pred = self.forward(x_init, y_init, t_init)
        return torch.mean((u_pred - u_init_true) ** 2)

    def total_loss(
        self,
        x_colloc, y_colloc, t_colloc,           # Collocation points
        x_bound, y_bound, t_bound, u_bound,     # Boundary conditions
        x_init, y_init, t_init, u_init,         # Initial conditions
        x_data, y_data, t_data, u_data,         # Sensor data
        lambda_physics: float = 1.0,
        lambda_boundary: float = 10.0,
        lambda_initial: float = 10.0,
        lambda_data: float = 1.0
    ):
        """Повна loss function з усіма компонентами."""
        # Physics loss
        L_physics = self.physics_loss(x_colloc, y_colloc, t_colloc)

        # Boundary loss
        L_boundary = self.boundary_loss(x_bound, y_bound, t_bound, u_bound)

        # Initial conditions loss
        L_initial = self.initial_loss(x_init, y_init, t_init, u_init)

        # Data loss (from sensors)
        u_pred = self.forward(x_data, y_data, t_data)
        L_data = torch.mean((u_pred - u_data) ** 2)

        return (
            lambda_physics * L_physics +
            lambda_boundary * L_boundary +
            lambda_initial * L_initial +
            lambda_data * L_data
        )


class NavierStokesPINN(PhysicsInformedNN):
    """PINN для Navier-Stokes рівнянь (fluid dynamics)."""

    def __init__(self, reynolds_number: float = 100.0):
        super().__init__(
            input_dim=3,        # x, y, t
            hidden_dims=[128, 128, 128, 128, 128],
            output_dim=3        # u, v, p (velocity components + pressure)
        )
        self.Re = reynolds_number

    def forward(self, x, y, t):
        output = super().forward(x, y, t)
        u = output[:, 0:1]  # x-velocity
        v = output[:, 1:2]  # y-velocity
        p = output[:, 2:3]  # pressure
        return u, v, p

    def physics_loss(self, x, y, t):
        """Navier-Stokes PDE residuals."""
        x.requires_grad_(True)
        y.requires_grad_(True)
        t.requires_grad_(True)

        u, v, p = self.forward(x, y, t)

        # Compute all necessary derivatives
        u_t = grad(u.sum(), t, create_graph=True)[0]
        u_x = grad(u.sum(), x, create_graph=True)[0]
        u_y = grad(u.sum(), y, create_graph=True)[0]
        u_xx = grad(u_x.sum(), x, create_graph=True)[0]
        u_yy = grad(u_y.sum(), y, create_graph=True)[0]

        v_t = grad(v.sum(), t, create_graph=True)[0]
        v_x = grad(v.sum(), x, create_graph=True)[0]
        v_y = grad(v.sum(), y, create_graph=True)[0]
        v_xx = grad(v_x.sum(), x, create_graph=True)[0]
        v_yy = grad(v_y.sum(), y, create_graph=True)[0]

        p_x = grad(p.sum(), x, create_graph=True)[0]
        p_y = grad(p.sum(), y, create_graph=True)[0]

        # Navier-Stokes residuals
        # Momentum equations
        f_u = u_t + u * u_x + v * u_y + p_x - (1/self.Re) * (u_xx + u_yy)
        f_v = v_t + u * v_x + v * v_y + p_y - (1/self.Re) * (v_xx + v_yy)

        # Continuity equation (incompressibility)
        f_cont = u_x + v_y

        return torch.mean(f_u**2) + torch.mean(f_v**2) + torch.mean(f_cont**2)

Surrogate Models: швидка заміна дорогих симуляцій

Проблема: CFD симуляція турбіни займає 24 години. Digital twin потребує результат за секунди.

Рішення: Neural network surrogate, який апроксимує симуляцію з 1000x прискоренням.

import torch
import torch.nn as nn
from sklearn.preprocessing import StandardScaler
import numpy as np

class SurrogateModel(nn.Module):
    """Neural network surrogate для дорогих симуляцій."""

    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        hidden_dims: List[int] = [256, 256, 256]
    ):
        super().__init__()

        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.LayerNorm(hidden_dim),
                nn.GELU(),
                nn.Dropout(0.1)
            ])
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, output_dim))

        self.network = nn.Sequential(*layers)

        # Scalers for normalization
        self.input_scaler = StandardScaler()
        self.output_scaler = StandardScaler()

    def fit_scalers(self, X: np.ndarray, y: np.ndarray):
        """Fit input/output scalers."""
        self.input_scaler.fit(X)
        self.output_scaler.fit(y)

    def forward(self, x):
        return self.network(x)

    def predict(self, x: np.ndarray) -> np.ndarray:
        """Predict with inverse scaling."""
        x_scaled = self.input_scaler.transform(x)
        x_tensor = torch.FloatTensor(x_scaled)

        with torch.no_grad():
            y_scaled = self.forward(x_tensor).numpy()

        return self.output_scaler.inverse_transform(y_scaled)


class CFDSurrogate:
    """Surrogate для Computational Fluid Dynamics симуляцій."""

    def __init__(
        self,
        geometry_encoder: nn.Module,
        flow_predictor: nn.Module
    ):
        self.geometry_encoder = geometry_encoder
        self.flow_predictor = flow_predictor

    def encode_geometry(self, mesh_vertices: np.ndarray) -> torch.Tensor:
        """Encode 3D geometry to latent representation."""
        vertices_tensor = torch.FloatTensor(mesh_vertices)
        return self.geometry_encoder(vertices_tensor)

    def predict_flow_field(
        self,
        geometry_latent: torch.Tensor,
        inlet_velocity: float,
        reynolds_number: float
    ) -> Dict[str, np.ndarray]:
        """Predict velocity and pressure fields."""
        conditions = torch.tensor([[inlet_velocity, reynolds_number]])
        input_features = torch.cat([geometry_latent, conditions], dim=1)

        output = self.flow_predictor(input_features)

        return {
            'velocity_x': output[:, :1000].numpy(),
            'velocity_y': output[:, 1000:2000].numpy(),
            'pressure': output[:, 2000:].numpy()
        }


class StructuralMechanicsSurrogate(nn.Module):
    """Surrogate для FEM (Finite Element Method) структурного аналізу."""

    def __init__(
        self,
        num_nodes: int = 1000,
        force_dim: int = 3,
        displacement_dim: int = 3
    ):
        super().__init__()
        self.num_nodes = num_nodes

        # Encoder для геометрії та навантаження
        self.encoder = nn.Sequential(
            nn.Linear(num_nodes * 3 + force_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128)
        )

        # Decoder для deformation field
        self.decoder = nn.Sequential(
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, num_nodes * displacement_dim)
        )

        # Stress prediction head
        self.stress_head = nn.Sequential(
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, num_nodes)  # von Mises stress per node
        )

    def forward(
        self,
        node_positions: torch.Tensor,
        applied_force: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Predict displacements and stresses.

        Args:
            node_positions: [batch, num_nodes * 3] - mesh nodes
            applied_force: [batch, 3] - force vector

        Returns:
            displacements: [batch, num_nodes * 3]
            stresses: [batch, num_nodes]
        """
        x = torch.cat([node_positions, applied_force], dim=1)
        latent = self.encoder(x)

        displacements = self.decoder(latent)
        stresses = self.stress_head(latent)

        return displacements, stresses

    def predict_failure_probability(
        self,
        stresses: torch.Tensor,
        yield_strength: float
    ) -> torch.Tensor:
        """Predict probability of failure based on stress analysis."""
        # Simple approach: max stress / yield strength
        max_stress = stresses.max(dim=1)[0]
        safety_factor = yield_strength / max_stress
        failure_prob = torch.sigmoid(1.0 - safety_factor)
        return failure_prob

Anomaly Detection для Digital Twins

import torch
import torch.nn as nn
import numpy as np
from typing import Tuple

class TemporalAutoencoder(nn.Module):
    """Autoencoder для виявлення аномалій у часових рядах."""

    def __init__(
        self,
        input_dim: int,
        sequence_length: int,
        latent_dim: int = 32
    ):
        super().__init__()
        self.input_dim = input_dim
        self.sequence_length = sequence_length
        self.latent_dim = latent_dim

        # LSTM Encoder
        self.encoder_lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=64,
            num_layers=2,
            batch_first=True,
            bidirectional=True
        )
        self.encoder_fc = nn.Linear(128, latent_dim)

        # LSTM Decoder
        self.decoder_fc = nn.Linear(latent_dim, 128)
        self.decoder_lstm = nn.LSTM(
            input_size=128,
            hidden_size=64,
            num_layers=2,
            batch_first=True
        )
        self.output_layer = nn.Linear(64, input_dim)

    def encode(self, x: torch.Tensor) -> torch.Tensor:
        """Encode sequence to latent representation."""
        lstm_out, (h_n, c_n) = self.encoder_lstm(x)
        # Use last hidden state from both directions
        hidden = torch.cat([h_n[-2], h_n[-1]], dim=1)
        latent = self.encoder_fc(hidden)
        return latent

    def decode(self, z: torch.Tensor) -> torch.Tensor:
        """Decode latent to reconstructed sequence."""
        # Expand latent to sequence
        hidden = self.decoder_fc(z)
        hidden = hidden.unsqueeze(1).repeat(1, self.sequence_length, 1)

        lstm_out, _ = self.decoder_lstm(hidden)
        output = self.output_layer(lstm_out)
        return output

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        z = self.encode(x)
        x_recon = self.decode(z)
        return x_recon, z

    def reconstruction_error(self, x: torch.Tensor) -> torch.Tensor:
        """Compute reconstruction error for anomaly scoring."""
        x_recon, _ = self.forward(x)
        error = torch.mean((x - x_recon) ** 2, dim=(1, 2))
        return error


class MultiModalAnomalyDetector:
    """Детектор аномалій для мультимодальних сенсорних даних."""

    def __init__(
        self,
        modalities: Dict[str, int],  # {modality_name: feature_dim}
        sequence_length: int = 100
    ):
        self.modalities = modalities
        self.sequence_length = sequence_length

        # Autoencoder per modality
        self.encoders = nn.ModuleDict({
            name: TemporalAutoencoder(dim, sequence_length)
            for name, dim in modalities.items()
        })

        # Cross-modal attention
        total_latent = len(modalities) * 32
        self.cross_attention = nn.MultiheadAttention(
            embed_dim=32,
            num_heads=4
        )

        # Anomaly classifier
        self.classifier = nn.Sequential(
            nn.Linear(total_latent, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )

        # Thresholds per modality (learned during training)
        self.thresholds = {}

    def fit(
        self,
        normal_data: Dict[str, np.ndarray],
        epochs: int = 100,
        percentile: float = 99.0
    ):
        """Train on normal data to establish thresholds."""
        # Train each autoencoder
        for name, data in normal_data.items():
            self._train_autoencoder(name, data, epochs)

        # Compute reconstruction error thresholds
        for name, data in normal_data.items():
            errors = self._compute_errors(name, data)
            self.thresholds[name] = np.percentile(errors, percentile)

    def detect(
        self,
        data: Dict[str, np.ndarray]
    ) -> Dict[str, any]:
        """Detect anomalies in new data."""
        anomaly_scores = {}
        is_anomaly = {}

        for name, values in data.items():
            errors = self._compute_errors(name, values)
            anomaly_scores[name] = errors
            is_anomaly[name] = errors > self.thresholds[name]

        # Combined score with cross-modal attention
        latents = []
        for name, values in data.items():
            tensor = torch.FloatTensor(values).unsqueeze(0)
            z = self.encoders[name].encode(tensor)
            latents.append(z)

        latent_stack = torch.stack(latents, dim=0)
        attended, _ = self.cross_attention(latent_stack, latent_stack, latent_stack)
        combined_latent = attended.flatten(start_dim=1)

        combined_score = self.classifier(combined_latent).item()

        return {
            'per_modality_scores': anomaly_scores,
            'per_modality_anomaly': is_anomaly,
            'combined_score': combined_score,
            'is_system_anomaly': combined_score > 0.5
        }


class PredictiveMaintenanceModel:
    """Модель predictive maintenance для Digital Twin."""

    def __init__(
        self,
        sensor_dim: int,
        failure_modes: List[str]
    ):
        self.sensor_dim = sensor_dim
        self.failure_modes = failure_modes

        # RUL (Remaining Useful Life) predictor
        self.rul_model = nn.Sequential(
            nn.LSTM(sensor_dim, 128, batch_first=True),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

        # Failure mode classifier
        self.failure_classifier = nn.Sequential(
            nn.LSTM(sensor_dim, 128, batch_first=True),
            nn.Linear(128, len(failure_modes)),
            nn.Softmax(dim=1)
        )

    def predict_rul(self, sensor_history: torch.Tensor) -> float:
        """Predict Remaining Useful Life in hours."""
        lstm_out, (h_n, _) = self.rul_model[0](sensor_history)
        rul = self.rul_model[1:](h_n[-1])
        return rul.item()

    def predict_failure_mode(
        self,
        sensor_history: torch.Tensor
    ) -> Dict[str, float]:
        """Predict probabilities of different failure modes."""
        lstm_out, (h_n, _) = self.failure_classifier[0](sensor_history)
        probs = self.failure_classifier[1:](h_n[-1])

        return {
            mode: prob.item()
            for mode, prob in zip(self.failure_modes, probs[0])
        }

    def generate_maintenance_schedule(
        self,
        rul: float,
        failure_probs: Dict[str, float],
        maintenance_costs: Dict[str, float],
        downtime_cost_per_hour: float
    ) -> Dict:
        """Generate optimal maintenance schedule."""
        recommendations = []

        for mode, prob in failure_probs.items():
            if prob > 0.3:  # High risk threshold
                expected_cost = (
                    prob * (maintenance_costs[mode] + downtime_cost_per_hour * 24) +
                    (1 - prob) * maintenance_costs[mode] * 0.1  # Preventive maintenance
                )
                recommendations.append({
                    'failure_mode': mode,
                    'probability': prob,
                    'recommended_action': 'immediate' if prob > 0.7 else 'scheduled',
                    'expected_cost': expected_cost
                })

        return {
            'remaining_useful_life_hours': rul,
            'recommendations': sorted(recommendations, key=lambda x: -x['probability']),
            'next_maintenance_window': max(0, rul - 24)  # 24 hours safety margin
        }

Reinforcement Learning для автономного контролю

import torch
import torch.nn as nn
from typing import Tuple
import numpy as np

class DigitalTwinEnvironment:
    """Середовище для RL на базі Digital Twin."""

    def __init__(self, digital_twin: DigitalTwinCore):
        self.twin = digital_twin
        self.state_dim = len(digital_twin.state)
        self.action_dim = len(digital_twin.actuators)

    def reset(self) -> np.ndarray:
        """Reset environment to initial state."""
        self.twin.reset()
        return self._get_state()

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, dict]:
        """Execute action and return (next_state, reward, done, info)."""
        # Apply action to digital twin
        self.twin.apply_action(action)

        # Get new state
        next_state = self._get_state()

        # Calculate reward
        reward = self._compute_reward(next_state, action)

        # Check termination conditions
        done = self._check_done(next_state)

        return next_state, reward, done, {}

    def _get_state(self) -> np.ndarray:
        return np.array(list(self.twin.state.values()))

    def _compute_reward(self, state: np.ndarray, action: np.ndarray) -> float:
        # Example: maximize efficiency, minimize energy, avoid failures
        efficiency = state[0]  # Assume first state is efficiency
        energy = np.sum(np.abs(action))
        safety_margin = min(state) - 0.1  # Safety threshold

        reward = efficiency - 0.1 * energy + 10 * max(0, safety_margin)
        return reward

    def _check_done(self, state: np.ndarray) -> bool:
        # Done if any safety limit exceeded
        return any(state < 0) or any(state > 1)


class SACAgent(nn.Module):
    """Soft Actor-Critic agent для оптимізації Digital Twin."""

    def __init__(
        self,
        state_dim: int,
        action_dim: int,
        hidden_dim: int = 256
    ):
        super().__init__()
        self.action_dim = action_dim

        # Actor network (policy)
        self.actor = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        self.mean_head = nn.Linear(hidden_dim, action_dim)
        self.log_std_head = nn.Linear(hidden_dim, action_dim)

        # Twin Q-networks (critics)
        self.q1 = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        self.q2 = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

        # Temperature parameter
        self.log_alpha = nn.Parameter(torch.tensor(0.0))
        self.target_entropy = -action_dim

    def get_action(
        self,
        state: torch.Tensor,
        deterministic: bool = False
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Sample action from policy."""
        features = self.actor(state)
        mean = self.mean_head(features)
        log_std = torch.clamp(self.log_std_head(features), -20, 2)
        std = log_std.exp()

        if deterministic:
            action = torch.tanh(mean)
            log_prob = None
        else:
            dist = torch.distributions.Normal(mean, std)
            x = dist.rsample()
            action = torch.tanh(x)

            # Compute log probability with tanh correction
            log_prob = dist.log_prob(x) - torch.log(1 - action.pow(2) + 1e-6)
            log_prob = log_prob.sum(dim=-1, keepdim=True)

        return action, log_prob

    def get_q_values(
        self,
        state: torch.Tensor,
        action: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Get Q-values from both critics."""
        sa = torch.cat([state, action], dim=-1)
        return self.q1(sa), self.q2(sa)


class DigitalTwinController:
    """Контролер для автономного керування Digital Twin."""

    def __init__(
        self,
        env: DigitalTwinEnvironment,
        agent: SACAgent,
        safety_checker: nn.Module
    ):
        self.env = env
        self.agent = agent
        self.safety_checker = safety_checker

    def get_safe_action(self, state: np.ndarray) -> np.ndarray:
        """Get action with safety filtering."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)

        # Get action from agent
        action, _ = self.agent.get_action(state_tensor, deterministic=True)
        action = action.squeeze(0).numpy()

        # Safety check
        is_safe = self.safety_checker(state_tensor, torch.FloatTensor(action))

        if not is_safe:
            # Fall back to safe action
            action = self._get_safe_fallback_action(state)

        return action

    def _get_safe_fallback_action(self, state: np.ndarray) -> np.ndarray:
        """Conservative fallback action."""
        # Reduce all actuator values towards neutral
        return np.zeros(self.env.action_dim) * 0.5

    def run_control_loop(
        self,
        duration_steps: int = 1000,
        log_callback = None
    ):
        """Run autonomous control loop."""
        state = self.env.reset()
        total_reward = 0

        for step in range(duration_steps):
            action = self.get_safe_action(state)
            next_state, reward, done, info = self.env.step(action)

            total_reward += reward

            if log_callback:
                log_callback({
                    'step': step,
                    'state': state,
                    'action': action,
                    'reward': reward,
                    'total_reward': total_reward
                })

            if done:
                break

            state = next_state

        return total_reward

NVIDIA Omniverse Integration

from omni.isaac.core import World
from omni.isaac.core.prims import RigidPrimView
from omni.isaac.core.utils.nucleus import get_assets_root_path
import numpy as np

class OmniverseDigitalTwin:
    """Digital Twin на базі NVIDIA Omniverse."""

    def __init__(
        self,
        usd_path: str,
        physics_dt: float = 1/60
    ):
        self.world = World(physics_dt=physics_dt)
        self.usd_path = usd_path

        # Load scene
        self.world.scene.add_default_ground_plane()
        self._load_asset()

        # Real-time data interface
        self.sensor_interface = None
        self.actuator_interface = None

    def _load_asset(self):
        """Load USD asset into Omniverse."""
        from omni.isaac.core.utils.stage import add_reference_to_stage

        add_reference_to_stage(
            usd_path=self.usd_path,
            prim_path="/World/Asset"
        )

        self.asset_prim = self.world.scene.get_object("/World/Asset")

    def connect_sensors(self, sensor_config: Dict):
        """Connect to real sensor data sources."""
        self.sensor_interface = RealSensorInterface(sensor_config)

    def connect_actuators(self, actuator_config: Dict):
        """Connect to real actuators."""
        self.actuator_interface = ActuatorInterface(actuator_config)

    def sync_loop(self, ai_models: Dict[str, nn.Module]):
        """Main synchronization loop."""
        while True:
            # 1. Get sensor data from physical twin
            sensor_data = self.sensor_interface.read()

            # 2. Update digital twin state
            self._update_state(sensor_data)

            # 3. Run physics simulation
            self.world.step(render=True)

            # 4. Run AI analysis
            predictions = self._run_ai_analysis(sensor_data, ai_models)

            # 5. Check for anomalies
            if predictions.get('anomaly_detected'):
                self._handle_anomaly(predictions)

            # 6. Optimize and send commands (if autonomous mode)
            if predictions.get('optimal_action') is not None:
                self.actuator_interface.write(predictions['optimal_action'])

    def _update_state(self, sensor_data: Dict):
        """Update digital twin geometry and physics from sensor data."""
        if 'joint_positions' in sensor_data:
            self.asset_prim.set_joint_positions(sensor_data['joint_positions'])

        if 'temperature' in sensor_data:
            self._update_thermal_state(sensor_data['temperature'])

    def _run_ai_analysis(
        self,
        sensor_data: Dict,
        ai_models: Dict[str, nn.Module]
    ) -> Dict:
        """Run all AI models for analysis."""
        results = {}

        # Anomaly detection
        if 'anomaly_detector' in ai_models:
            results['anomaly_detected'] = ai_models['anomaly_detector'](sensor_data)

        # Predictive maintenance
        if 'maintenance_predictor' in ai_models:
            results['rul'] = ai_models['maintenance_predictor'](sensor_data)

        # Optimization
        if 'optimizer' in ai_models:
            results['optimal_action'] = ai_models['optimizer'](sensor_data)

        return results

Архітектура Production-Ready Digital Twin

┌─────────────────────────────────────────────────────────────────┐
│                    PHYSICAL LAYER                                │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │ Sensors → Edge Gateway → Protocol Translation → Secure TX │  │
│  │ (OPC-UA, MQTT, Modbus)                                    │  │
│  └──────────────────────────────────────────────────────────┘  │
└─────────────────────────────────┬───────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    DATA LAYER                                    │
│  ┌────────────┐ ┌─────────────┐ ┌────────────┐ ┌─────────────┐ │
│  │ Time-series│ │ Feature     │ │ Event      │ │ Data Lake   │ │
│  │ InfluxDB   │ │ Store       │ │ Kafka      │ │ S3/HDFS     │ │
│  └────────────┘ └─────────────┘ └────────────┘ └─────────────┘ │
└─────────────────────────────────┬───────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    AI/ML LAYER                                   │
│  ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │
│  │ Predictive   │ │ Anomaly      │ │ Optimization             │ │
│  │ Models       │ │ Detection    │ │ RL Agents                │ │
│  │ (PINN, RNN)  │ │ (Autoencoder)│ │ (SAC, PPO)               │ │
│  └──────────────┘ └──────────────┘ └──────────────────────────┘ │
└─────────────────────────────────┬───────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    SIMULATION LAYER                              │
│  ┌────────────────┐ ┌────────────────┐ ┌──────────────────────┐ │
│  │ Physics Engine │ │ FEM/CFD        │ │ PINN Surrogates      │ │
│  │ (Omniverse)    │ │ (ANSYS proxy)  │ │ (1000x faster)       │ │
│  └────────────────┘ └────────────────┘ └──────────────────────┘ │
└─────────────────────────────────┬───────────────────────────────┘
                                  │
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    VISUALIZATION & CONTROL                       │
│  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌──────────────┐ │
│  │ 3D Web     │ │ AR/VR      │ │ Dashboards │ │ Control      │ │
│  │ Viewer     │ │ Headsets   │ │ Grafana    │ │ Interface    │ │
│  └────────────┘ └────────────┘ └────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘

Практичні Use Cases

1. Manufacturing (Siemens, Bosch)

  • Factory floor digital twin
  • Predictive maintenance: 40% reduction in downtime
  • Process optimization: 15% efficiency improvement
  • Quality prediction before production

2. Energy (GE, Vestas)

  • Wind turbine twins: 20% more energy capture
  • Grid optimization: real-time load balancing
  • Failure prediction: 2 weeks advance warning
  • Asset lifecycle management

3. Healthcare (Dassault, Philips)

  • Patient-specific organ models
  • Surgical simulation before operation
  • Drug dosage optimization
  • ICU monitoring twins

4. Smart Cities (Singapore, Dubai)

  • City-scale traffic optimization
  • Energy grid management
  • Air quality prediction
  • Urban planning scenarios

Ідеї для наукових досліджень

Для бакалаврської роботи:

  • Простий IoT twin з Raspberry Pi + visualization
  • Predictive model для одного сенсора (temperature, vibration)
  • Порівняння точності PINN vs pure ML

Для магістерської дисертації:

  • Physics-informed neural network для конкретного домену
  • Multi-sensor fusion з anomaly detection
  • Surrogate model для CFD/FEM симуляцій

Для PhD досліджень:

  • Scalable twin architectures для fleet management
  • Uncertainty quantification в PINNs
  • Self-calibrating digital twins
  • Transfer learning між twins

Висновок

Digital Twins — це не просто buzzword Industry 4.0. Це трансформація підходу до проектування, експлуатації та оптимізації фізичних систем.

Traditional: Design → Build → Operate → Fail → Fix

Digital Twin: Design → Simulate → Build → Monitor → Predict → Optimize

Різниця — реактивний vs проактивний підхід. Чекати поломки vs передбачати її за тижні.

McKinsey оцінює: digital twins можуть знизити maintenance costs на 10-40%. Для великого заводу — мільйони доларів щороку. І це лише maintenance. Optimization, quality prediction, design acceleration — додаткові мільйони.

Якщо ви плануєте дослідження в галузі цифрових двійників, PINN-моделей чи industrial AI — команда SKP-Degree готова допомогти з формулюванням задачі, реалізацією прототипу та науковим оформленням. Звертайтесь на skp-degree.com.ua або пишіть у Telegram: @kursovi_diplomy — від концепції до успішного захисту.

Ключові слова: digital twin, цифровий двійник, PINN, physics-informed neural networks, surrogate model, anomaly detection, predictive maintenance, NVIDIA Omniverse, Industry 4.0, IoT, reinforcement learning, FEM, CFD, дипломна робота, магістерська, AI-дослідження.

Про автора

Команда SKP-Degree

Верифікований автор

Розробники та дослідники AI · Python, TensorFlow, PyTorch · Досвід у промисловій розробці

Команда SKP-Degree — професійні розробники з досвідом 7+ років у промисловій розробці. Виконали 1000+ проєктів для студентів з України, Польщі та країн Балтії.

Python Django Java ML/AI React C# / .NET JavaScript

Потрібна допомога з роботою?

Замовте курсову чи дипломну роботу з програмування. Оплата після демонстрації!

Без передоплати Відеодемонстрація Автономна робота 24/7
Написати в Telegram