
6G + Edge AI: the network that thinks at the speed of light


5G promised 1 ms latency. In theory. In practice, it is 10-30 ms to the cloud. For autonomous driving, where a decision may be needed within a few milliseconds, that is too slow.


The solution? Don't wait for 6G. Bring AI closer to the edge. Compute where the data is born.

6G edge networks are not just faster internet. They are a distributed intelligent system. A network that thinks. Base stations that run inference. Latency below human reaction time. This is the architectural shift that will define the next decade of technology.


5G vs 6G: evolution or revolution?

5G (current, 2020+):

  • Latency: 1-10 ms (radio interface) + 10-50 ms (to the cloud)
  • Peak rate: 20 Gbps in theory, 1-2 Gbps in practice
  • Edge computing: optional, separate from the network
  • AI: in the cloud, not integrated

6G (projected, 2030+):

  • Latency: < 0.1ms (air interface), < 1ms end-to-end
  • Peak rate: 1 Tbps (terabit per second)
  • AI-native architecture (AI не опція, а фундамент)
  • Integrated Sensing and Communication (ISAC)
  • Semantic communication
  • Terahertz spectrum (300 GHz - 3 THz)

The key difference: 5G is faster pipes. 6G is an intelligent fabric.

5G Architecture:
Device ─────── Base Station ─────── Core Network ─────── Cloud
              (dumb relay)          (routing)           (intelligence)

6G Architecture:
Device ──┬──── Intelligent BS ──┬── Distributed Core ──┬── Cloud
   AI ◄──┘         AI ◄─────────┘        AI ◄──────────┘    AI
         Edge Inference    Regional Intelligence    Global Models
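The "speed of light" framing in the title is not a metaphor: propagation delay alone puts a hard floor under cloud round trips. A minimal sketch (assuming signals travel through fiber at roughly 2e8 m/s; the distances are illustrative):

```python
# Back-of-the-envelope: the speed of light alone rules out distant clouds.
C_FIBER_M_PER_S = 2e8  # ~speed of light in optical fiber (refractive index ~1.5)

def round_trip_ms(distance_km: float) -> float:
    """One round trip over fiber, propagation delay only (no queuing/processing)."""
    return 2 * (distance_km * 1e3) / C_FIBER_M_PER_S * 1e3

print(f"Base station 1 km away:    {round_trip_ms(1):.3f} ms")    # 0.010 ms
print(f"Regional DC 300 km away:   {round_trip_ms(300):.1f} ms")  # 3.0 ms
print(f"Cloud region 1500 km away: {round_trip_ms(1500):.1f} ms") # 15.0 ms
```

Anything beyond roughly 100 km cannot meet a sub-millisecond budget even with zero processing time, which is exactly why 6G pushes inference into the base station.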

The problem: why the cloud is too far away

from dataclasses import dataclass
from typing import Dict
from enum import Enum

class LatencyComponent(Enum):
    RADIO_ACCESS = "Radio Access Network"
    BACKHAUL = "Backhaul Network"
    CORE_NETWORK = "Core Network"
    INTERNET = "Internet Transit"
    CLOUD_PROCESSING = "Cloud Processing"
    RETURN_PATH = "Return Path"

@dataclass
class LatencyBreakdown:
    """Latency breakdown for a given architecture."""
    components: Dict[LatencyComponent, float]  # ms

    @property
    def total(self) -> float:
        return sum(self.components.values())

    def meets_requirement(self, max_latency_ms: float) -> bool:
        return self.total <= max_latency_ms


class LatencyAnalyzer:
    """Latency analysis for different use cases."""

    # Use case latency requirements (ms)
    REQUIREMENTS = {
        'ar_vr': 20.0,           # Motion sickness threshold
        'autonomous_driving': 10.0,  # Safety critical
        'industrial_control': 1.0,    # Real-time PLC
        'gaming': 50.0,          # Playable threshold
        'video_call': 150.0,     # Acceptable quality
        'voice_call': 100.0,     # Toll quality
        'haptic_feedback': 5.0,  # Tactile perception
    }

    @staticmethod
    def cloud_centric_latency() -> LatencyBreakdown:
        """A typical cloud-centric architecture."""
        return LatencyBreakdown({
            LatencyComponent.RADIO_ACCESS: 5.0,      # 5ms RAN
            LatencyComponent.BACKHAUL: 5.0,          # 5ms backhaul
            LatencyComponent.CORE_NETWORK: 10.0,     # 10ms core
            LatencyComponent.INTERNET: 20.0,         # 20ms transit
            LatencyComponent.CLOUD_PROCESSING: 15.0, # 15ms inference
            LatencyComponent.RETURN_PATH: 40.0,      # similar return
        })  # Total: ~95ms

    @staticmethod
    def edge_optimized_latency() -> LatencyBreakdown:
        """An edge-optimized architecture."""
        return LatencyBreakdown({
            LatencyComponent.RADIO_ACCESS: 2.0,      # 2ms optimized RAN
            LatencyComponent.BACKHAUL: 0.0,          # No backhaul (local)
            LatencyComponent.CORE_NETWORK: 0.0,      # Bypassed
            LatencyComponent.INTERNET: 0.0,          # Not needed
            LatencyComponent.CLOUD_PROCESSING: 3.0,  # 3ms edge inference
            LatencyComponent.RETURN_PATH: 2.0,       # Fast return
        })  # Total: ~7ms

    @staticmethod
    def analyze_feasibility():
        """Check which use-case requirements each architecture meets."""
        cloud = LatencyAnalyzer.cloud_centric_latency()
        edge = LatencyAnalyzer.edge_optimized_latency()

        print("Latency Analysis:")
        print(f"  Cloud-centric total: {cloud.total:.1f}ms")
        print(f"  Edge-optimized total: {edge.total:.1f}ms")
        print()

        print("Use Case Feasibility:")
        for use_case, requirement in LatencyAnalyzer.REQUIREMENTS.items():
            cloud_ok = "✓" if cloud.meets_requirement(requirement) else "✗"
            edge_ok = "✓" if edge.meets_requirement(requirement) else "✗"
            print(f"  {use_case:25} ({requirement:5.1f}ms): "
                  f"Cloud {cloud_ok} | Edge {edge_ok}")

# Output (excerpt):
# autonomous_driving (10.0ms): Cloud ✗ | Edge ✓
# haptic_feedback   ( 5.0ms): Cloud ✗ | Edge ✗  (the 7 ms edge total still misses 5 ms)
# ar_vr             (20.0ms): Cloud ✗ | Edge ✓

Multi-Tier Edge Architecture

from dataclasses import dataclass
import torch
import torch.nn as nn
from typing import Dict, List, Optional, Tuple

@dataclass
class ComputeTier:
    """Characteristics of one compute tier."""
    name: str
    latency_ms: float          # Network latency to reach
    compute_capacity_tops: float  # Tera Operations Per Second
    memory_gb: float
    power_budget_watts: float
    models_supported: List[str]

class EdgeComputeHierarchy:
    """The edge computing hierarchy."""

    def __init__(self):
        self.tiers = {
            'device': ComputeTier(
                name="On-Device",
                latency_ms=0.0,
                compute_capacity_tops=5.0,      # Mobile NPU
                memory_gb=4.0,
                power_budget_watts=3.0,
                models_supported=['tiny', 'mobile']
            ),
            'base_station': ComputeTier(
                name="Base Station Edge",
                latency_ms=2.0,
                compute_capacity_tops=100.0,    # Edge GPU
                memory_gb=32.0,
                power_budget_watts=200.0,
                models_supported=['tiny', 'mobile', 'medium']
            ),
            'regional': ComputeTier(
                name="Regional Edge",
                latency_ms=10.0,
                compute_capacity_tops=500.0,    # Multiple GPUs
                memory_gb=256.0,
                power_budget_watts=2000.0,
                models_supported=['tiny', 'mobile', 'medium', 'large']
            ),
            'cloud': ComputeTier(
                name="Cloud",
                latency_ms=50.0,
                compute_capacity_tops=10000.0,  # GPU cluster
                memory_gb=1024.0,
                power_budget_watts=50000.0,
                models_supported=['tiny', 'mobile', 'medium', 'large', 'foundation']
            )
        }

    def select_optimal_tier(
        self,
        latency_requirement_ms: float,
        model_size: str,
        power_available_watts: float
    ) -> str:
        """Pick the closest tier that meets the latency, model-size and power constraints."""
        for tier_name, tier in self.tiers.items():
            if (tier.latency_ms <= latency_requirement_ms and
                model_size in tier.models_supported and
                tier.power_budget_watts <= power_available_watts * 100):  # loose proxy: non-device tiers run on grid power, not the device battery
                return tier_name
        return 'cloud'  # Fallback


class DynamicTaskOffloader:
    """Dynamic task offloading across tiers."""

    def __init__(self, hierarchy: EdgeComputeHierarchy):
        self.hierarchy = hierarchy
        self.task_queue = []
        self.metrics_history = []

    def offload_decision(
        self,
        task_complexity: float,  # 0-1, relative complexity
        deadline_ms: float,
        current_network_latency: Dict[str, float],
        current_device_load: float
    ) -> Tuple[str, float]:
        """
        Offloading decision that accounts for current network conditions.

        Returns:
            Tuple[tier_name, expected_latency_ms]
        """
        decisions = []

        for tier_name, tier in self.hierarchy.tiers.items():
            # Estimate processing time based on complexity and compute power
            processing_time = task_complexity * 100 / tier.compute_capacity_tops

            # Actual network latency (may differ from nominal)
            network_latency = current_network_latency.get(tier_name, tier.latency_ms)

            # Total expected latency
            total_latency = network_latency + processing_time

            # Can we meet deadline?
            if total_latency <= deadline_ms:
                decisions.append((tier_name, total_latency, tier.power_budget_watts))

        if not decisions:
            # No tier can meet the deadline: fall back to on-device execution
            device = self.hierarchy.tiers['device']
            return ('device', task_complexity * 100 / device.compute_capacity_tops)

        # Sort by latency, then by power efficiency
        decisions.sort(key=lambda x: (x[1], x[2]))
        return decisions[0][0], decisions[0][1]

    def split_computation(
        self,
        model: nn.Module,
        input_data: torch.Tensor,
        split_point: int
    ) -> Tuple[torch.Tensor, List[str]]:
        """
        Split model execution across tiers.

        Early layers on device, later layers on edge.
        """
        layers = list(model.children())

        # Execute early layers on device
        x = input_data
        device_layers = layers[:split_point]
        for layer in device_layers:
            x = layer(x)

        # x is now intermediate features to send to edge
        intermediate_features = x

        return intermediate_features, [l.__class__.__name__ for l in device_layers]
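The split point above is the key design knob: the cut must be lossless, i.e. running the two halves in sequence must reproduce the full model's output. A standalone sketch with a toy `nn.Sequential` (the layer sizes are arbitrary):

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.Sequential(nn.Linear(8, 16), nn.ReLU(), nn.Linear(16, 4))
x = torch.randn(2, 8)

split_point = 2                      # first two layers on-device, the rest at the edge
device_part = model[:split_point]
edge_part = model[split_point:]

intermediate = device_part(x)        # computed locally, then sent over the air
edge_out = edge_part(intermediate)   # finished at the base station

# The split is lossless: same result as running the whole model in one place
print(torch.allclose(edge_out, model(x)))  # True
```

In practice the split point is chosen so the intermediate tensor is smaller than the raw input; otherwise offloading wastes more bandwidth than it saves.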

Semantic Communication: transmitting meaning, not bits

The revolutionary 6G idea: instead of transmitting every pixel of an image, transmit only its semantic features. The receiver reconstructs the image from them.

import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Tuple

class SemanticEncoder(nn.Module):
    """Encoder for semantic communication."""

    def __init__(
        self,
        input_channels: int = 3,
        semantic_dim: int = 128,
        compression_ratio: float = 0.01  # 100x compression!
    ):
        super().__init__()
        self.compression_ratio = compression_ratio

        # CNN encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 64, 3, 2, 1),
            nn.ReLU(),
            nn.Conv2d(64, 128, 3, 2, 1),
            nn.ReLU(),
            nn.Conv2d(128, 256, 3, 2, 1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((4, 4)),
            nn.Flatten()
        )

        self.semantic_projector = nn.Linear(256 * 16, semantic_dim)

        # Channel coding for transmission
        self.channel_encoder = nn.Sequential(
            nn.Linear(semantic_dim, semantic_dim * 2),
            nn.Tanh()  # Bounded output for transmission
        )

    def forward(self, image: torch.Tensor) -> torch.Tensor:
        """Encode image to semantic representation."""
        features = self.encoder(image)
        semantic = self.semantic_projector(features)
        channel_symbols = self.channel_encoder(semantic)
        return channel_symbols


class SemanticDecoder(nn.Module):
    """Decoder for semantic communication."""

    def __init__(
        self,
        semantic_dim: int = 128,
        output_channels: int = 3,
        output_size: Tuple[int, int] = (256, 256)
    ):
        super().__init__()
        self.output_size = output_size

        # Channel decoding
        self.channel_decoder = nn.Sequential(
            nn.Linear(semantic_dim * 2, semantic_dim),
            nn.ReLU()
        )

        # Semantic to features
        self.semantic_decoder = nn.Linear(semantic_dim, 256 * 16)

        # CNN decoder (generator)
        self.decoder = nn.Sequential(
            nn.Unflatten(1, (256, 4, 4)),
            nn.ConvTranspose2d(256, 128, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, output_channels, 4, 2, 1),
            nn.Sigmoid()
        )

    def forward(self, channel_symbols: torch.Tensor) -> torch.Tensor:
        """Decode semantic representation back to image."""
        semantic = self.channel_decoder(channel_symbols)
        features = self.semantic_decoder(semantic)
        image = self.decoder(features)
        return F.interpolate(image, self.output_size)


class SemanticCommunicationSystem:
    """End-to-end semantic communication system."""

    def __init__(self, semantic_dim: int = 128):
        self.encoder = SemanticEncoder(semantic_dim=semantic_dim)
        self.decoder = SemanticDecoder(semantic_dim=semantic_dim)
        self.snr_db = 10.0  # Signal-to-noise ratio

    def simulate_channel(
        self,
        symbols: torch.Tensor,
        snr_db: float
    ) -> torch.Tensor:
        """Simulate wireless channel with AWGN noise."""
        # Calculate noise power
        signal_power = torch.mean(symbols ** 2)
        snr_linear = 10 ** (snr_db / 10)
        noise_power = signal_power / snr_linear

        # Add Gaussian noise
        noise = torch.randn_like(symbols) * torch.sqrt(noise_power)
        received = symbols + noise

        return received

    def transmit(
        self,
        image: torch.Tensor,
        snr_db: float = None
    ) -> Tuple[torch.Tensor, dict]:
        """Full transmission pipeline."""
        if snr_db is None:
            snr_db = self.snr_db

        # Encode
        symbols = self.encoder(image)

        # Transmit through noisy channel
        received_symbols = self.simulate_channel(symbols, snr_db)

        # Decode
        reconstructed = self.decoder(received_symbols)

        # Calculate metrics
        original_bits = image.numel() * 8  # Assuming 8 bits per pixel
        transmitted_bits = symbols.numel() * 32  # Float32 symbols

        metrics = {
            'compression_ratio': original_bits / transmitted_bits,
            'psnr': self._calculate_psnr(image, reconstructed),
            'bandwidth_saved': 1 - (transmitted_bits / original_bits)
        }

        return reconstructed, metrics

    def _calculate_psnr(self, original: torch.Tensor, reconstructed: torch.Tensor) -> float:
        mse = F.mse_loss(original, reconstructed)
        if mse == 0:
            return float('inf')
        return 10 * torch.log10(1.0 / mse).item()


class TextSemanticCommunication(nn.Module):
    """Semantic communication for text."""

    def __init__(
        self,
        vocab_size: int = 50000,
        embedding_dim: int = 256,
        semantic_dim: int = 64
    ):
        super().__init__()

        # Text encoder (lightweight transformer)
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embedding_dim,
            nhead=4,
            dim_feedforward=512,
            batch_first=True
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=2)

        # Semantic compression
        self.semantic_projector = nn.Linear(embedding_dim, semantic_dim)

        # Decoder
        self.semantic_expander = nn.Linear(semantic_dim, embedding_dim)
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=embedding_dim,
            nhead=4,
            dim_feedforward=512,
            batch_first=True
        )
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=2)
        self.output_projection = nn.Linear(embedding_dim, vocab_size)

    def encode(self, text_tokens: torch.Tensor) -> torch.Tensor:
        """Encode text to semantic representation."""
        embedded = self.embedding(text_tokens)
        encoded = self.encoder(embedded)
        # Global pooling to fixed-size semantic vector
        semantic = self.semantic_projector(encoded.mean(dim=1))
        return semantic

    def decode(
        self,
        semantic: torch.Tensor,
        max_length: int = 100
    ) -> torch.Tensor:
        """Decode semantic back to text tokens."""
        # Expand semantic to sequence
        memory = self.semantic_expander(semantic).unsqueeze(1)
        memory = memory.expand(-1, max_length, -1)

        # Autoregressive decoding
        # (simplified - would need proper autoregressive loop in production)
        decoded = self.decoder(memory, memory)
        logits = self.output_projection(decoded)

        return logits.argmax(dim=-1)
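Two quick sanity checks on the system above, as a standalone sketch (NumPy instead of torch, same math): the AWGN model really produces the requested SNR, and the default sizes (a 3×256×256 8-bit image vs. 256 float32 channel symbols, i.e. `semantic_dim * 2`) imply roughly 192x compression.

```python
import numpy as np

rng = np.random.default_rng(0)

def awgn(symbols: np.ndarray, snr_db: float) -> np.ndarray:
    """Add white Gaussian noise at the target signal-to-noise ratio."""
    signal_power = np.mean(symbols ** 2)
    noise_power = signal_power / (10 ** (snr_db / 10))
    return symbols + rng.normal(scale=np.sqrt(noise_power), size=symbols.shape)

symbols = rng.standard_normal(100_000)
received = awgn(symbols, snr_db=10.0)
noise = received - symbols
empirical_snr_db = 10 * np.log10(np.mean(symbols ** 2) / np.mean(noise ** 2))
print(f"Empirical SNR: {empirical_snr_db:.2f} dB")   # ~10 dB

# Compression ratio for the default sizes used above
original_bits = 3 * 256 * 256 * 8      # 8-bit RGB image
transmitted_bits = 256 * 32            # 256 float32 channel symbols
print(f"Compression: {original_bits / transmitted_bits:.0f}x")  # 192x
```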

Integrated Sensing and Communication (ISAC)

6G base stations = Communication + Radar:

import numpy as np
from scipy import signal
from typing import Tuple, List

class ISACSensor:
    """Integrated Sensing and Communication for 6G."""

    def __init__(
        self,
        carrier_frequency: float = 100e9,  # 100 GHz (mmWave/THz)
        bandwidth: float = 1e9,            # 1 GHz bandwidth
        num_antennas: int = 256            # Massive MIMO
    ):
        self.fc = carrier_frequency
        self.bandwidth = bandwidth
        self.num_antennas = num_antennas
        self.c = 3e8  # Speed of light

        # Waveform parameters
        self.subcarrier_spacing = bandwidth / 1024
        self.symbol_duration = 1 / self.subcarrier_spacing

    def generate_ofdm_waveform(
        self,
        data_symbols: np.ndarray,
        sensing_pilots: bool = True
    ) -> np.ndarray:
        """Generate OFDM waveform for both communication and sensing."""
        num_subcarriers = 1024

        # Allocate subcarriers
        if sensing_pilots:
            # Interleaved pilot pattern for sensing
            pilot_indices = np.arange(0, num_subcarriers, 4)
            data_indices = np.setdiff1d(np.arange(num_subcarriers), pilot_indices)

            # Create frequency domain signal
            freq_signal = np.zeros(num_subcarriers, dtype=complex)
            freq_signal[pilot_indices] = 1.0  # Known pilots for sensing
            freq_signal[data_indices[:len(data_symbols)]] = data_symbols
        else:
            freq_signal = data_symbols

        # IFFT to time domain
        time_signal = np.fft.ifft(freq_signal)

        return time_signal

    def estimate_range_velocity(
        self,
        received_signal: np.ndarray,
        transmitted_signal: np.ndarray
    ) -> Tuple[float, float]:
        """Estimate target range and velocity from reflected signal."""

        # Cross-correlation for range estimation
        correlation = signal.correlate(received_signal, transmitted_signal, mode='same')
        delay_samples = np.argmax(np.abs(correlation)) - len(correlation) // 2

        # Convert delay to range
        delay_seconds = delay_samples / self.bandwidth
        range_m = self.c * delay_seconds / 2  # Round trip

        # Doppler estimation for velocity
        # Using phase difference between consecutive symbols
        phase_diff = np.angle(received_signal[1:] * np.conj(received_signal[:-1]))
        doppler_shift = np.mean(phase_diff) / (2 * np.pi * self.symbol_duration)

        velocity_mps = (doppler_shift * self.c) / (2 * self.fc)

        return range_m, velocity_mps

    def beamform_and_sense(
        self,
        antenna_signals: np.ndarray,  # [num_antennas, num_samples]
        beam_directions: List[float]  # Angles in degrees
    ) -> dict:
        """Beamforming for both communication and sensing."""

        results = {}

        for angle_deg in beam_directions:
            angle_rad = np.deg2rad(angle_deg)

            # Steering vector for uniform linear array
            antenna_spacing = self.c / (2 * self.fc)
            steering_vector = np.exp(
                1j * 2 * np.pi * self.fc * antenna_spacing *
                np.sin(angle_rad) * np.arange(self.num_antennas) / self.c
            )

            # Beamformed signal
            beamformed = np.dot(steering_vector.conj(), antenna_signals)

            # Signal power in this direction
            power = np.mean(np.abs(beamformed) ** 2)

            results[angle_deg] = {
                'beamformed_signal': beamformed,
                'power': power,
                'snr_estimate': 10 * np.log10(power / 1e-10)  # Relative to noise floor
            }

        return results

    def detect_objects(
        self,
        range_doppler_map: np.ndarray,
        cfar_threshold: float = 10.0
    ) -> List[dict]:
        """CFAR detection for object identification."""
        # Constant False Alarm Rate detector

        guard_cells = 2
        training_cells = 10

        detected_objects = []

        for range_idx in range(training_cells, len(range_doppler_map) - training_cells):
            for doppler_idx in range(training_cells, len(range_doppler_map[0]) - training_cells):
                cell_under_test = range_doppler_map[range_idx, doppler_idx]

                # Calculate noise estimate from training cells
                training_region = range_doppler_map[
                    range_idx - training_cells:range_idx + training_cells + 1,
                    doppler_idx - training_cells:doppler_idx + training_cells + 1
                ]

                # Exclude guard cells
                mask = np.ones_like(training_region, dtype=bool)
                center = training_cells
                mask[center-guard_cells:center+guard_cells+1,
                     center-guard_cells:center+guard_cells+1] = False

                noise_estimate = np.mean(training_region[mask])

                # Detection
                if cell_under_test > cfar_threshold * noise_estimate:
                    detected_objects.append({
                        'range_bin': range_idx,
                        'doppler_bin': doppler_idx,
                        'power': cell_under_test,
                        'snr': cell_under_test / noise_estimate
                    })

        return detected_objects
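The range-from-delay logic in `estimate_range_velocity` can be checked standalone: synthesize a wideband probe, delay it by the round-trip time of a target at a known range, add noise, and recover the range from the correlation peak. Numbers are illustrative; with 1 GHz bandwidth the range resolution is c/2B = 0.15 m.

```python
import numpy as np

c = 3e8
bandwidth = 1e9                      # 1 GHz => 1 ns sample period
rng = np.random.default_rng(1)

tx = rng.standard_normal(4096)       # wideband probe signal
true_range_m = 45.0
delay_samples = int(round(2 * true_range_m / c * bandwidth))  # round trip: 300 samples
rx = np.roll(tx, delay_samples) + 0.1 * rng.standard_normal(4096)  # echo + noise

corr = np.correlate(rx, tx, mode='full')
lag = np.argmax(np.abs(corr)) - (len(tx) - 1)   # positive lag = rx delayed vs tx
est_range_m = c * (lag / bandwidth) / 2
print(f"Estimated range: {est_range_m:.2f} m")  # 45.00 m
```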

Intelligent Resource Allocation with RL

import torch
import torch.nn as nn
import numpy as np
from typing import Dict, List, Tuple

class NetworkState:
    """Network state exposed to the RL agent."""

    def __init__(
        self,
        num_users: int,
        num_base_stations: int,
        num_resource_blocks: int = 100
    ):
        self.num_users = num_users
        self.num_bs = num_base_stations
        self.num_rb = num_resource_blocks

        # State tensors (randomly initialised here; a real simulator would update them)
        self.channel_quality = np.random.rand(num_users, num_base_stations)
        self.traffic_demand = np.random.rand(num_users)
        self.interference_matrix = np.random.rand(num_base_stations, num_base_stations)
        self.buffer_status = np.random.rand(num_users)
        self.power_consumption = np.random.rand(num_base_stations)

    def get_observation(self) -> np.ndarray:
        """Flatten the full state into one observation vector."""
        return np.concatenate([
            self.channel_quality.flatten(),      # CSI
            self.traffic_demand.flatten(),       # QoS requirements
            self.interference_matrix.flatten(),  # Inter-cell interference
            self.buffer_status.flatten(),        # Queue lengths
            self.power_consumption.flatten()     # Energy state
        ])

    @property
    def observation_dim(self) -> int:
        return (
            self.num_users * self.num_bs +    # Channel quality
            self.num_users +                   # Traffic demand
            self.num_bs * self.num_bs +       # Interference
            self.num_users +                   # Buffers
            self.num_bs                        # Power
        )


class SpectrumAllocationAgent(nn.Module):
    """RL agent for allocating spectrum resources."""

    def __init__(
        self,
        state_dim: int,
        num_users: int,
        num_resource_blocks: int,
        hidden_dim: int = 256
    ):
        super().__init__()
        self.num_users = num_users
        self.num_rb = num_resource_blocks

        # Actor network (policy)
        self.actor = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, num_users * num_resource_blocks)
        )

        # Critic network (value function)
        self.critic = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

        # Power allocation head
        self.power_head = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, num_users),
            nn.Sigmoid()  # Normalized power [0, 1]
        )

    def forward(
        self,
        state: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Returns:
            rb_allocation: [batch, num_users, num_rb] - Resource block assignment
            power: [batch, num_users] - Power allocation
            value: [batch, 1] - State value
        """
        # Resource block allocation probabilities
        rb_logits = self.actor(state)
        rb_probs = rb_logits.view(-1, self.num_users, self.num_rb)
        rb_probs = torch.softmax(rb_probs, dim=-1)

        # Power allocation
        power = self.power_head(state)

        # State value
        value = self.critic(state)

        return rb_probs, power, value

    def get_action(
        self,
        state: torch.Tensor,
        explore: bool = True
    ) -> Dict[str, np.ndarray]:
        """Sample action from policy."""
        with torch.no_grad():
            rb_probs, power, _ = self.forward(state)

            if explore:
                # Sample from distribution
                rb_allocation = torch.distributions.Categorical(rb_probs).sample()
            else:
                # Greedy selection
                rb_allocation = rb_probs.argmax(dim=-1)

        return {
            'rb_allocation': rb_allocation.numpy(),
            'power': power.numpy()
        }


class NetworkEnvironment:
    """Simulation environment for RL training."""

    def __init__(self, state: NetworkState):
        self.state = state
        self.max_power_dbm = 23  # Mobile max power

    def step(
        self,
        action: Dict[str, np.ndarray]
    ) -> Tuple[np.ndarray, float, bool, dict]:
        """Execute action and compute reward."""

        rb_allocation = action['rb_allocation']
        power = action['power']

        # Calculate throughput (Shannon capacity)
        throughput = self._calculate_throughput(rb_allocation, power)

        # Calculate QoS satisfaction
        qos_satisfaction = self._calculate_qos(throughput)

        # Calculate power efficiency
        power_efficiency = throughput.sum() / (power.sum() + 1e-6)

        # Reward function
        reward = (
            0.4 * throughput.sum() / 1e9 +       # Throughput (normalized)
            0.4 * qos_satisfaction.mean() +       # QoS satisfaction
            0.2 * np.log1p(power_efficiency)      # Power efficiency
        )

        # Update state
        self._update_state()

        return self.state.get_observation(), reward, False, {
            'throughput': throughput,
            'qos': qos_satisfaction,
            'power_efficiency': power_efficiency
        }

    def _calculate_throughput(
        self,
        rb_allocation: np.ndarray,
        power: np.ndarray
    ) -> np.ndarray:
        """Calculate user throughput based on allocation."""
        bandwidth_per_rb = 180e3  # 180 kHz per RB
        noise_power = -174 + 10 * np.log10(bandwidth_per_rb)  # dBm

        throughputs = []
        for user in range(self.state.num_users):
            # Get allocated RBs
            user_rbs = np.where(rb_allocation == user)[0]
            if len(user_rbs) == 0:
                throughputs.append(0)
                continue

            # Calculate SINR
            signal_power = power[user] * self.state.channel_quality[user].mean()
            interference = self._calculate_interference(user, rb_allocation, power)
            sinr = signal_power / (interference + 10 ** (noise_power / 10))

            # Shannon capacity
            capacity = len(user_rbs) * bandwidth_per_rb * np.log2(1 + sinr)
            throughputs.append(capacity)

        return np.array(throughputs)

    def _calculate_interference(self, user, rb_allocation, power):
        """Simplified inter-user interference: a fraction of the others' received power."""
        others = [u for u in range(self.state.num_users) if u != user]
        return 0.01 * sum(power[u] * self.state.channel_quality[u].mean() for u in others)

    def _calculate_qos(self, throughput):
        """Fraction of an illustrative 1 Mbps per-user demand that is actually served."""
        return np.clip(throughput / 1e6, 0.0, 1.0)

    def _update_state(self):
        """Placeholder channel evolution: a small random walk on channel quality."""
        self.state.channel_quality = np.clip(
            self.state.channel_quality
            + 0.01 * np.random.randn(*self.state.channel_quality.shape),
            0.0, 1.0
        )


class FederatedEdgeLearning:
    """Federated Learning on edge nodes."""

    def __init__(
        self,
        num_edge_nodes: int,
        model: nn.Module,
        aggregation_strategy: str = 'fedavg'
    ):
        self.num_nodes = num_edge_nodes
        self.global_model = model
        self.local_models = [
            self._clone_model(model) for _ in range(num_edge_nodes)
        ]
        self.aggregation_strategy = aggregation_strategy

    @staticmethod
    def _clone_model(model: nn.Module) -> nn.Module:
        """Deep-copy the model so each edge node trains its own replica."""
        import copy
        return copy.deepcopy(model)

    def local_train(
        self,
        node_id: int,
        local_data: torch.utils.data.DataLoader,
        epochs: int = 1
    ) -> Dict[str, float]:
        """Train on a local edge node."""
        model = self.local_models[node_id]
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
        criterion = nn.CrossEntropyLoss()

        model.train()
        total_loss = 0
        num_batches = 0

        for _ in range(epochs):
            for batch_x, batch_y in local_data:
                optimizer.zero_grad()
                output = model(batch_x)
                loss = criterion(output, batch_y)
                loss.backward()
                optimizer.step()

                total_loss += loss.item()
                num_batches += 1

        return {'loss': total_loss / num_batches}

    def aggregate(self, participating_nodes: List[int] = None):
        """Aggregate models from participating edge nodes."""
        if participating_nodes is None:
            participating_nodes = list(range(self.num_nodes))

        if self.aggregation_strategy == 'fedavg':
            self._fedavg_aggregate(participating_nodes)
        else:
            raise NotImplementedError(
                f"Strategy '{self.aggregation_strategy}' (e.g. FedProx) is not implemented here"
            )

    def _fedavg_aggregate(self, nodes: List[int]):
        """FedAvg: element-wise mean of model parameters."""
        global_state = self.global_model.state_dict()

        for key in global_state:
            global_state[key] = torch.stack([
                self.local_models[i].state_dict()[key].float()
                for i in nodes
            ]).mean(dim=0)

        self.global_model.load_state_dict(global_state)

        # Sync back to local models
        for node in nodes:
            self.local_models[node].load_state_dict(global_state)

    def over_the_air_aggregation(
        self,
        gradients: List[torch.Tensor],
        channel_gains: np.ndarray
    ) -> torch.Tensor:
        """Over-the-Air Federated Learning (AirComp).

        channel_gains are power gains; each node pre-scales its gradient by the
        inverse amplitude gain so that all contributions arrive equally weighted.
        """
        received_terms = []
        for grad, gain in zip(gradients, channel_gains):
            amplitude_gain = np.sqrt(gain + 1e-6)
            pre_scaled = grad / amplitude_gain                  # transmit power control
            received_terms.append(pre_scaled * amplitude_gain)  # channel re-applies the gain

        # The wireless medium superimposes (sums) the aligned signals "for free"
        aggregated = torch.stack(received_terms).sum(dim=0)

        # Receiver normalizes to obtain the gradient average
        return aggregated / len(gradients)
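In the noiseless case this pre-scaling makes the superimposed signal collapse to the plain FedAvg mean, which a few lines of NumPy can confirm (the gains and gradients are arbitrary toy values):

```python
import numpy as np

rng = np.random.default_rng(0)
grads = [rng.standard_normal(4) for _ in range(3)]   # per-node gradients
power_gains = np.array([0.5, 1.0, 2.0])              # per-node channel power gains

# Node side: invert the amplitude gain; the air: re-apply it and sum
received = sum((g / np.sqrt(h)) * np.sqrt(h) for g, h in zip(grads, power_gains))
aircomp_avg = received / len(grads)

print(np.allclose(aircomp_avg, np.mean(grads, axis=0)))  # True
```

With channel noise the sum is perturbed, but the aggregation cost stays constant in the number of nodes, which is the whole point of AirComp.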

Edge Inference Optimization

import torch
import torch.nn as nn
from typing import Tuple, List, Optional

class EarlyExitNetwork(nn.Module):
    """A neural network with early exits for edge inference."""

    def __init__(
        self,
        num_classes: int = 1000,
        confidence_threshold: float = 0.9
    ):
        super().__init__()
        self.confidence_threshold = confidence_threshold

        # Stage 1 (can run on the device)
        self.stage1 = nn.Sequential(
            nn.Conv2d(3, 64, 3, 1, 1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.exit1 = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(64, num_classes)
        )

        # Stage 2 (base station edge)
        self.stage2 = nn.Sequential(
            nn.Conv2d(64, 128, 3, 1, 1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.exit2 = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(128, num_classes)
        )

        # Stage 3 (regional edge)
        self.stage3 = nn.Sequential(
            nn.Conv2d(128, 256, 3, 1, 1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.exit3 = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(256, num_classes)
        )

        # Stage 4 (cloud)
        self.stage4 = nn.Sequential(
            nn.Conv2d(256, 512, 3, 1, 1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten()
        )
        self.final_classifier = nn.Linear(512, num_classes)

    def forward(
        self,
        x: torch.Tensor,
        return_all_exits: bool = False
    ) -> Tuple[torch.Tensor, int, Optional[List[torch.Tensor]]]:
        """
        Forward pass with early exits.

        Returns:
            output: Final logits
            exit_point: Which exit was used (1-4)
            all_exits: All exit logits if requested
        """
        all_outputs = []

        # Stage 1
        x = self.stage1(x)
        out1 = self.exit1(x)
        all_outputs.append(out1)

        if not return_all_exits:
            conf1 = torch.softmax(out1, dim=1).max(dim=1)[0]
            if (conf1 > self.confidence_threshold).all():
                return out1, 1, None

        # Stage 2
        x = self.stage2(x)
        out2 = self.exit2(x)
        all_outputs.append(out2)

        if not return_all_exits:
            conf2 = torch.softmax(out2, dim=1).max(dim=1)[0]
            if (conf2 > self.confidence_threshold).all():
                return out2, 2, None

        # Stage 3
        x = self.stage3(x)
        out3 = self.exit3(x)
        all_outputs.append(out3)

        if not return_all_exits:
            conf3 = torch.softmax(out3, dim=1).max(dim=1)[0]
            if (conf3 > self.confidence_threshold).all():
                return out3, 3, None

        # Stage 4 (full network)
        x = self.stage4(x)
        out4 = self.final_classifier(x)
        all_outputs.append(out4)

        if return_all_exits:
            return out4, 4, all_outputs

        return out4, 4, None
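Early-exit networks are usually trained with a joint loss over all exits, so that every exit head learns to classify on its own. A minimal self-contained sketch of that recipe on a toy two-exit MLP (the tiny architecture and the 0.3/0.7 exit weights are illustrative assumptions, not part of the class above):

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Hypothetical two-exit MLP, much smaller than the CNN above
class TinyEarlyExit(nn.Module):
    def __init__(self, num_classes: int = 10):
        super().__init__()
        self.stage1 = nn.Sequential(nn.Linear(8, 16), nn.ReLU())
        self.exit1 = nn.Linear(16, num_classes)
        self.stage2 = nn.Sequential(nn.Linear(16, 32), nn.ReLU())
        self.exit2 = nn.Linear(32, num_classes)

    def forward(self, x):
        h1 = self.stage1(x)
        h2 = self.stage2(h1)
        return self.exit1(h1), self.exit2(h2)

model = TinyEarlyExit()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

x = torch.randn(32, 8)
y = torch.randint(0, 10, (32,))

out1, out2 = model(x)
# Joint loss: every exit receives a gradient; the deeper exit is weighted
# higher (the 0.3 / 0.7 weights are an assumption, tune per deployment)
loss = 0.3 * criterion(out1, y) + 0.7 * criterion(out2, y)

optimizer.zero_grad()
loss.backward()
loss_value = loss.item()
optimizer.step()
```

At inference time the confidence-threshold logic from the class above decides which exit's prediction is actually used; training all exits jointly is what makes the shallow exits worth trusting.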


class ModelSplitter:
    """Splits a model between device and edge."""

    def __init__(self, model: nn.Module):
        self.model = model
        self.layers = list(model.children())

    def find_optimal_split(
        self,
        input_shape: Tuple[int, ...],
        device_compute: float,  # FLOPS available on device
        edge_compute: float,    # FLOPS available on edge
        bandwidth_bps: float    # Bandwidth for intermediate transfer
    ) -> int:
        """Finds the split point with minimum end-to-end latency."""

        total_flops = self._count_flops(input_shape)
        current_shape = input_shape
        cumulative_flops = 0

        best_split = 0
        min_latency = float('inf')

        for i, layer in enumerate(self.layers):
            layer_flops = self._layer_flops(layer, current_shape)
            cumulative_flops += layer_flops

            # Compute latencies on each side of the split
            device_time = cumulative_flops / device_compute
            edge_time = (total_flops - cumulative_flops) / edge_compute

            # Transfer latency for the intermediate activation
            output_shape = self._get_output_shape(layer, current_shape)
            transfer_bits = np.prod(output_shape) * 32  # float32
            transfer_time = transfer_bits / bandwidth_bps

            total_latency = device_time + transfer_time + edge_time

            if total_latency < min_latency:
                min_latency = total_latency
                best_split = i + 1

            current_shape = output_shape

        return best_split

    def split_model(
        self,
        split_point: int
    ) -> Tuple[nn.Sequential, nn.Sequential]:
        """Splits the model into a device part and an edge part."""
        device_part = nn.Sequential(*self.layers[:split_point])
        edge_part = nn.Sequential(*self.layers[split_point:])
        return device_part, edge_part

    def _get_output_shape(
        self, layer: nn.Module, input_shape: Tuple[int, ...]
    ) -> Tuple[int, ...]:
        """Output shape (without batch dim) via a dummy forward pass."""
        with torch.no_grad():
            out = layer(torch.zeros(1, *input_shape))
        return tuple(out.shape[1:])

    def _layer_flops(
        self, layer: nn.Module, input_shape: Tuple[int, ...]
    ) -> int:
        """Rough FLOP estimate for a single layer."""
        output_shape = self._get_output_shape(layer, input_shape)
        if isinstance(layer, nn.Conv2d):
            kh, kw = layer.kernel_size
            return 2 * layer.in_channels * kh * kw * int(np.prod(output_shape))
        if isinstance(layer, nn.Linear):
            return 2 * layer.in_features * layer.out_features
        # Activations, pooling etc.: roughly one op per output element
        return int(np.prod(output_shape))

    def _count_flops(self, input_shape: Tuple[int, ...]) -> int:
        """Total FLOPs of the whole model."""
        total, shape = 0, input_shape
        for layer in self.layers:
            total += self._layer_flops(layer, shape)
            shape = self._get_output_shape(layer, shape)
        return total
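The trade-off this search navigates can be illustrated with plain numbers, independent of the class itself. All figures below (device and edge FLOPS, uplink bandwidth, per-block FLOPs and activation sizes) are assumed for illustration only:

```python
# Assumed hardware: device 10 GFLOPS, edge 1 TFLOPS, 1 Gbit/s uplink
DEVICE_FLOPS = 10e9
EDGE_FLOPS = 1e12
BANDWIDTH_BPS = 1e9

# (cumulative FLOPs up to the candidate split, activation size in floats)
blocks = [
    (0.2e9, 64 * 112 * 112),  # early conv block: cheap, but bulky output
    (1.5e9, 128 * 56 * 56),
    (3.0e9, 256 * 28 * 28),
    (4.0e9, 512),             # after global pooling: tiny output, all on device
]
TOTAL_FLOPS = 4.0e9

def split_latency(cum_flops: float, n_floats: int) -> float:
    device_t = cum_flops / DEVICE_FLOPS
    transfer_t = n_floats * 32 / BANDWIDTH_BPS  # float32 activations
    edge_t = (TOTAL_FLOPS - cum_flops) / EDGE_FLOPS
    return device_t + transfer_t + edge_t

latencies = [split_latency(f, s) for f, s in blocks]
best = min(range(len(blocks)), key=latencies.__getitem__)
for i, t in enumerate(latencies):
    print(f"split after block {i + 1}: {t * 1000:.1f} ms")
print("best split: after block", best + 1)
```

With these numbers the early split wins: the fast edge absorbs most of the compute, and the 1 Gbit/s uplink keeps the bulky activation transfer tolerable. Shrink the bandwidth and the optimum drifts toward later splits.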

Hardware for Edge AI

| Platform | TOPS | Power (W) | Use Case |
|----------|------|-----------|----------|
| Apple Neural Engine | 17 | 8 | Mobile |
| NVIDIA Jetson Orin | 275 | 60 | Edge Server |
| Google Edge TPU | 4 | 2 | IoT |
| Qualcomm Hexagon | 26 | 7 | Mobile |
| Intel Movidius | 4 | 1.5 | Camera |
| Hailo-8 | 26 | 2.5 | Automotive |
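Raw TOPS is only half the story at the edge: power-constrained deployments care about TOPS per watt. Deriving that metric from the table above:

```python
# TOPS and watts taken from the hardware table above
platforms = {
    "Apple Neural Engine": (17, 8),
    "NVIDIA Jetson Orin": (275, 60),
    "Google Edge TPU": (4, 2),
    "Qualcomm Hexagon": (26, 7),
    "Intel Movidius": (4, 1.5),
    "Hailo-8": (26, 2.5),
}

# Energy efficiency in TOPS per watt
efficiency = {name: tops / watts for name, (tops, watts) in platforms.items()}

for name, eff in sorted(efficiency.items(), key=lambda kv: -kv[1]):
    print(f"{name}: {eff:.2f} TOPS/W")
```

On these figures the automotive-focused Hailo-8 leads at over 10 TOPS/W, which is why dedicated inference ASICs dominate battery- and thermally-constrained edge nodes.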


Practical Use Cases

1. Autonomous Vehicles

  • V2X communication: < 5ms latency required
  • Real-time sensor fusion at edge
  • Cooperative perception through base stations
  • Safety-critical decision making

2. Industrial IoT (Industry 4.0)

  • Predictive maintenance and < 1ms closed-loop control
  • Real-time quality inspection
  • Robot coordination through 5G/6G
  • Digital twin synchronization

3. AR/VR (Metaverse)

  • Offload rendering to edge (< 20ms motion-to-photon)
  • Pose estimation on edge servers
  • Spatial mapping and anchoring
  • Multi-user synchronization

4. Smart Healthcare

  • Remote surgery assistance (< 10ms haptic feedback)
  • Real-time patient monitoring
  • Privacy-preserving edge analytics
  • Emergency response optimization
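The four use cases above boil down to matching a latency budget to a compute tier. A toy feasibility check (all per-tier latencies and per-task budgets are assumed, rounded figures):

```python
# Assumed one-way latency per compute tier, in milliseconds
TIER_LATENCY_MS = {
    "on-device": 0.1,
    "base-station edge": 1.0,
    "regional edge": 5.0,
    "cloud": 30.0,
}

# Assumed budgets distilled from the use cases above
REQUIREMENTS_MS = {
    "V2X decision": 5.0,
    "industrial control loop": 1.0,
    "AR motion-to-photon": 20.0,
    "remote surgery haptics": 10.0,
}

def feasible_tiers(budget_ms: float) -> list:
    """Tiers whose latency fits within the given budget."""
    return [tier for tier, lat in TIER_LATENCY_MS.items() if lat <= budget_ms]

for task, budget in REQUIREMENTS_MS.items():
    print(f"{task} ({budget} ms): {feasible_tiers(budget)}")
```

The pattern is consistent: under these assumptions none of the four tasks can tolerate a cloud round-trip, which is the core argument for pushing inference to the edge.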

Research Ideas

For a bachelor's thesis:

  • Edge inference deployment on a Raspberry Pi + 5G modem
  • Latency comparison: cloud vs edge processing
  • Simple federated learning demo with mobile devices

For a master's thesis:

  • Model partitioning optimization algorithms
  • RL-based resource allocation for edge networks
  • Semantic communication prototype implementation

For PhD research:

  • Theoretical bounds on edge-cloud latency trade-offs
  • Novel architectures for ISAC systems
  • Foundation models for network optimization

Conclusion

5G did not deliver on all of its promises, and the reason is architectural rather than technological. The cloud-centric approach hits a physical limit: you cannot cheat the speed of light. Tens of milliseconds to a data center are an eternity for autonomous driving.

6G + Edge AI is an architectural shift:

  • Intelligence distributed, not centralized
  • Compute at the data, not data to compute
  • Network as intelligent computing fabric
  • Communication and sensing unified

Those who understand this paradigm will build the infrastructure of the next decade.

If you are planning research in edge computing, 6G networks, or distributed AI, the SKP-Degree team is ready to help with topic selection, prototype implementation, and academic write-up. Reach out at skp-degree.com.ua or on Telegram: @kursovi_diplomy, from idea to successful defense.

Keywords: 6G networks, edge computing, edge AI, latency optimization, semantic communication, ISAC, federated learning, resource allocation, reinforcement learning, distributed inference, mobile networks, IoT, bachelor's thesis, master's thesis, AI research.

About the Author

SKP-Degree Team

Verified author

AI developers and researchers · Python, TensorFlow, PyTorch · Industrial development experience

The SKP-Degree team consists of professional developers with 7+ years of industrial experience, having completed 1000+ projects for students from Ukraine, Poland, and the Baltic states.
