5G promised 1 ms latency. In theory. In practice you get 10-30 ms to the cloud, and for autonomous driving, where decisions are needed within a few milliseconds, that is too slow.
The solution? Don't wait for 6G alone to fix it. Bring AI closer to the edge and compute where the data is born.
6G edge networks are not just faster internet. They are a distributed intelligent system: a network that thinks, base stations that run inference, latency below human reaction time. This is the architectural shift that will define the next decade of technology.
5G vs 6G: evolution or revolution?
5G (current, 2020+):
- Latency: 1-10 ms (radio interface) + 10-50 ms (to the cloud)
- Peak rate: 20 Gbps in theory, 1-2 Gbps in practice
- Edge computing: optional, separate from the network
- AI: in the cloud, not integrated
6G (projected 2030+):
- Latency: < 0.1 ms (air interface), < 1 ms end-to-end
- Peak rate: 1 Tbps (terabit per second)
- AI-native architecture (AI is the foundation, not an option)
- Integrated Sensing and Communication (ISAC)
- Semantic communication
- Terahertz spectrum (300 GHz - 3 THz)
The key difference: 5G is faster pipes; 6G is an intelligent fabric.
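A back-of-envelope check shows why the terahertz item in that list pushes compute toward the edge: free-space path loss grows with frequency, so THz cells must be small and dense, and every one of those cells becomes a natural place to put compute. A sketch (Friis free-space model only, illustrative frequencies, no atmospheric absorption):

```python
import math

def fspl_db(freq_hz: float, distance_m: float) -> float:
    """Free-space path loss (Friis formula), in dB."""
    c = 3e8  # speed of light, m/s
    return 20 * math.log10(4 * math.pi * distance_m * freq_hz / c)

# Wavelength shrinks ~100x from mid-band 5G (3.5 GHz) to 300 GHz
wl_5g = 3e8 / 3.5e9   # ~8.6 cm
wl_6g = 3e8 / 300e9   # ~1 mm

# Extra loss at 100 m when moving from 3.5 GHz to 300 GHz: ~38.7 dB
extra_loss = fspl_db(300e9, 100.0) - fspl_db(3.5e9, 100.0)
```

Roughly 38.7 dB is a ~7,000x power penalty at the same distance, which is why THz links imply dense small cells, and dense small cells imply compute next to the radio.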
5G Architecture:

Device ─────── Base Station ─────── Core Network ─────── Cloud
                (dumb relay)          (routing)       (intelligence)

6G Architecture:

Device ──────── Intelligent BS ──── Distributed Core ──── Cloud
  AI               AI                   AI                  AI
  (on-device)      (edge inference)     (regional           (global
                                         intelligence)       models)
The problem: why the cloud is too far away
import numpy as np
from dataclasses import dataclass
from typing import Dict, Tuple
from enum import Enum

class LatencyComponent(Enum):
    RADIO_ACCESS = "Radio Access Network"
    BACKHAUL = "Backhaul Network"
    CORE_NETWORK = "Core Network"
    INTERNET = "Internet Transit"
    CLOUD_PROCESSING = "Cloud Processing"
    RETURN_PATH = "Return Path"

@dataclass
class LatencyBreakdown:
    """Latency breakdown for different architectures."""
    components: Dict[LatencyComponent, float]  # ms

    @property
    def total(self) -> float:
        return sum(self.components.values())

    def meets_requirement(self, max_latency_ms: float) -> bool:
        return self.total <= max_latency_ms

class LatencyAnalyzer:
    """Latency analysis for different use cases."""

    # Use case latency requirements (ms)
    REQUIREMENTS = {
        'ar_vr': 20.0,               # Motion sickness threshold
        'autonomous_driving': 10.0,  # Safety critical
        'industrial_control': 1.0,   # Real-time PLC
        'gaming': 50.0,              # Playable threshold
        'video_call': 150.0,         # Acceptable quality
        'voice_call': 100.0,         # Toll quality
        'haptic_feedback': 5.0,      # Tactile perception
    }

    @staticmethod
    def cloud_centric_latency() -> LatencyBreakdown:
        """Typical cloud-centric architecture."""
        return LatencyBreakdown({
            LatencyComponent.RADIO_ACCESS: 5.0,       # 5ms RAN
            LatencyComponent.BACKHAUL: 5.0,           # 5ms backhaul
            LatencyComponent.CORE_NETWORK: 10.0,      # 10ms core
            LatencyComponent.INTERNET: 20.0,          # 20ms transit
            LatencyComponent.CLOUD_PROCESSING: 15.0,  # 15ms inference
            LatencyComponent.RETURN_PATH: 40.0,       # similar return
        })  # Total: ~95ms

    @staticmethod
    def edge_optimized_latency() -> LatencyBreakdown:
        """Edge-optimized architecture."""
        return LatencyBreakdown({
            LatencyComponent.RADIO_ACCESS: 2.0,      # 2ms optimized RAN
            LatencyComponent.BACKHAUL: 0.0,          # No backhaul (local)
            LatencyComponent.CORE_NETWORK: 0.0,      # Bypassed
            LatencyComponent.INTERNET: 0.0,          # Not needed
            LatencyComponent.CLOUD_PROCESSING: 3.0,  # 3ms edge inference
            LatencyComponent.RETURN_PATH: 2.0,       # Fast return
        })  # Total: ~7ms

    @staticmethod
    def analyze_feasibility():
        """Check which use cases each architecture can serve."""
        cloud = LatencyAnalyzer.cloud_centric_latency()
        edge = LatencyAnalyzer.edge_optimized_latency()
        print("Latency Analysis:")
        print(f"  Cloud-centric total: {cloud.total:.1f}ms")
        print(f"  Edge-optimized total: {edge.total:.1f}ms")
        print()
        print("Use Case Feasibility:")
        for use_case, requirement in LatencyAnalyzer.REQUIREMENTS.items():
            cloud_ok = "✓" if cloud.meets_requirement(requirement) else "✗"
            edge_ok = "✓" if edge.meets_requirement(requirement) else "✗"
            print(f"  {use_case:25} ({requirement:5.1f}ms): "
                  f"Cloud {cloud_ok} | Edge {edge_ok}")

# Result:
#   autonomous_driving        ( 10.0ms): Cloud ✗ | Edge ✓
#   haptic_feedback           (  5.0ms): Cloud ✗ | Edge ✓
#   ar_vr                     ( 20.0ms): Cloud ✗ | Edge ✓
Multi-Tier Edge Architecture
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import torch
import torch.nn as nn

@dataclass
class ComputeTier:
    """Characteristics of one compute tier."""
    name: str
    latency_ms: float             # Network latency to reach
    compute_capacity_tops: float  # Tera Operations Per Second
    memory_gb: float
    power_budget_watts: float
    models_supported: List[str]

class EdgeComputeHierarchy:
    """Edge computing hierarchy."""

    def __init__(self):
        self.tiers = {
            'device': ComputeTier(
                name="On-Device",
                latency_ms=0.0,
                compute_capacity_tops=5.0,  # Mobile NPU
                memory_gb=4.0,
                power_budget_watts=3.0,
                models_supported=['tiny', 'mobile']
            ),
            'base_station': ComputeTier(
                name="Base Station Edge",
                latency_ms=2.0,
                compute_capacity_tops=100.0,  # Edge GPU
                memory_gb=32.0,
                power_budget_watts=200.0,
                models_supported=['tiny', 'mobile', 'medium']
            ),
            'regional': ComputeTier(
                name="Regional Edge",
                latency_ms=10.0,
                compute_capacity_tops=500.0,  # Multiple GPUs
                memory_gb=256.0,
                power_budget_watts=2000.0,
                models_supported=['tiny', 'mobile', 'medium', 'large']
            ),
            'cloud': ComputeTier(
                name="Cloud",
                latency_ms=50.0,
                compute_capacity_tops=10000.0,  # GPU cluster
                memory_gb=1024.0,
                power_budget_watts=50000.0,
                models_supported=['tiny', 'mobile', 'medium', 'large', 'foundation']
            )
        }

    def select_optimal_tier(
        self,
        latency_requirement_ms: float,
        model_size: str,
        power_available_watts: float
    ) -> str:
        """Select the optimal tier for inference."""
        for tier_name, tier in self.tiers.items():
            # The power budget only constrains on-device execution;
            # offloaded tiers draw from their own power supply
            power_ok = (tier_name != 'device'
                        or tier.power_budget_watts <= power_available_watts)
            if (tier.latency_ms <= latency_requirement_ms and
                    model_size in tier.models_supported and
                    power_ok):
                return tier_name
        return 'cloud'  # Fallback

class DynamicTaskOffloader:
    """Dynamic offloading of tasks between tiers."""

    def __init__(self, hierarchy: EdgeComputeHierarchy):
        self.hierarchy = hierarchy
        self.task_queue = []
        self.metrics_history = []

    def _estimate_device_latency(self, task_complexity: float) -> float:
        """Processing time if the task stays on the device (ms)."""
        device = self.hierarchy.tiers['device']
        return task_complexity * 100 / device.compute_capacity_tops

    def offload_decision(
        self,
        task_complexity: float,  # 0-1, relative complexity
        deadline_ms: float,
        current_network_latency: Dict[str, float],
        current_device_load: float
    ) -> Tuple[str, float]:
        """
        Offloading decision that accounts for current conditions.

        Returns:
            Tuple[tier_name, expected_latency]
        """
        decisions = []
        for tier_name, tier in self.hierarchy.tiers.items():
            # Estimate processing time from complexity and compute power
            processing_time = task_complexity * 100 / tier.compute_capacity_tops
            # Actual network latency (may differ from nominal)
            network_latency = current_network_latency.get(tier_name, tier.latency_ms)
            # Total expected latency
            total_latency = network_latency + processing_time
            # Can we meet the deadline?
            if total_latency <= deadline_ms:
                decisions.append((tier_name, total_latency, tier.power_budget_watts))
        if not decisions:
            # No tier can meet the deadline; run locally as best effort
            return ('device', self._estimate_device_latency(task_complexity))
        # Sort by latency, then by power
        decisions.sort(key=lambda x: (x[1], x[2]))
        return decisions[0][0], decisions[0][1]

    def split_computation(
        self,
        model: nn.Module,
        input_data: torch.Tensor,
        split_point: int
    ) -> Tuple[torch.Tensor, List[str]]:
        """
        Split model execution across tiers:
        early layers on the device, later layers on the edge.
        """
        layers = list(model.children())
        # Execute the early layers on the device
        x = input_data
        device_layers = layers[:split_point]
        for layer in device_layers:
            x = layer(x)
        # x now holds the intermediate features to send to the edge
        intermediate_features = x
        return intermediate_features, [l.__class__.__name__ for l in device_layers]
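The selection logic above can be boiled down to a standalone sketch (tier latencies and TOPS copied from the hierarchy above; the deadline and workload numbers are illustrative):

```python
# Simplified, standalone version of the tier-selection idea.
# (name, network latency in ms, compute in TOPS)
TIERS = [
    ("device",        0.0,     5.0),
    ("base_station",  2.0,   100.0),
    ("regional",     10.0,   500.0),
    ("cloud",        50.0, 10000.0),
]

def pick_tier(deadline_ms: float, work_tera_ops: float) -> str:
    """Return the first tier that finishes `work_tera_ops` within the deadline."""
    for name, net_ms, tops in TIERS:
        compute_ms = work_tera_ops / tops * 1000.0  # seconds -> ms
        if net_ms + compute_ms <= deadline_ms:
            return name
    return "cloud"  # best-effort fallback

pick_tier(deadline_ms=5.0, work_tera_ops=0.1)  # -> "base_station": on-device would take 20 ms
```

Note the shape of the trade-off: the device wins only when the model is tiny, and impossible deadlines fall through to a best-effort choice.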
Semantic Communication: transmitting meaning, not bits
The revolutionary 6G idea: instead of transmitting every pixel of an image, transmit only its semantic features and let the receiver reconstruct the image.
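The arithmetic behind that claim, as a rough count (the 16-bit symbol quantization is an assumption; the real ratio depends on `semantic_dim` and how the channel symbols are quantized):

```python
# Raw 8-bit RGB image vs. a quantized semantic vector (illustrative figures)
H, W, C = 256, 256, 3
original_bits = H * W * C * 8                      # 1,572,864 bits

semantic_dim = 128
symbol_bits = 16                                   # assumed quantization per symbol
transmitted_bits = semantic_dim * 2 * symbol_bits  # the encoder doubles the dim: 4,096 bits

compression = original_bits / transmitted_bits     # = 384.0
```

Even this crude count gives two orders of magnitude, which is the budget that makes Tbps-class applications plausible on finite spectrum.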
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Tuple

class SemanticEncoder(nn.Module):
    """Encoder for semantic communication."""

    def __init__(
        self,
        input_channels: int = 3,
        semantic_dim: int = 128,
        compression_ratio: float = 0.01  # 100x compression!
    ):
        super().__init__()
        self.compression_ratio = compression_ratio
        # CNN encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 64, 3, 2, 1),
            nn.ReLU(),
            nn.Conv2d(64, 128, 3, 2, 1),
            nn.ReLU(),
            nn.Conv2d(128, 256, 3, 2, 1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((4, 4)),
            nn.Flatten()
        )
        self.semantic_projector = nn.Linear(256 * 16, semantic_dim)
        # Channel coding for transmission
        self.channel_encoder = nn.Sequential(
            nn.Linear(semantic_dim, semantic_dim * 2),
            nn.Tanh()  # Bounded output for transmission
        )

    def forward(self, image: torch.Tensor) -> torch.Tensor:
        """Encode an image into a semantic representation."""
        features = self.encoder(image)
        semantic = self.semantic_projector(features)
        channel_symbols = self.channel_encoder(semantic)
        return channel_symbols

class SemanticDecoder(nn.Module):
    """Decoder for semantic communication."""

    def __init__(
        self,
        semantic_dim: int = 128,
        output_channels: int = 3,
        output_size: Tuple[int, int] = (256, 256)
    ):
        super().__init__()
        self.output_size = output_size
        # Channel decoding
        self.channel_decoder = nn.Sequential(
            nn.Linear(semantic_dim * 2, semantic_dim),
            nn.ReLU()
        )
        # Semantic vector to feature map
        self.semantic_decoder = nn.Linear(semantic_dim, 256 * 16)
        # CNN decoder (generator)
        self.decoder = nn.Sequential(
            nn.Unflatten(1, (256, 4, 4)),
            nn.ConvTranspose2d(256, 128, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, 4, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, output_channels, 4, 2, 1),
            nn.Sigmoid()
        )

    def forward(self, channel_symbols: torch.Tensor) -> torch.Tensor:
        """Decode a semantic representation back into an image."""
        semantic = self.channel_decoder(channel_symbols)
        features = self.semantic_decoder(semantic)
        image = self.decoder(features)
        return F.interpolate(image, self.output_size)
class SemanticCommunicationSystem:
    """End-to-end semantic communication system."""

    def __init__(self, semantic_dim: int = 128):
        self.encoder = SemanticEncoder(semantic_dim=semantic_dim)
        self.decoder = SemanticDecoder(semantic_dim=semantic_dim)
        self.snr_db = 10.0  # Signal-to-noise ratio

    def simulate_channel(
        self,
        symbols: torch.Tensor,
        snr_db: float
    ) -> torch.Tensor:
        """Simulate a wireless channel with AWGN noise."""
        # Calculate the noise power
        signal_power = torch.mean(symbols ** 2)
        snr_linear = 10 ** (snr_db / 10)
        noise_power = signal_power / snr_linear
        # Add Gaussian noise
        noise = torch.randn_like(symbols) * torch.sqrt(noise_power)
        received = symbols + noise
        return received

    def transmit(
        self,
        image: torch.Tensor,
        snr_db: float = None
    ) -> Tuple[torch.Tensor, dict]:
        """Full transmission pipeline."""
        if snr_db is None:
            snr_db = self.snr_db
        # Encode
        symbols = self.encoder(image)
        # Transmit through the noisy channel
        received_symbols = self.simulate_channel(symbols, snr_db)
        # Decode
        reconstructed = self.decoder(received_symbols)
        # Calculate metrics
        original_bits = image.numel() * 8        # Assuming 8 bits per pixel
        transmitted_bits = symbols.numel() * 32  # Float32 symbols
        metrics = {
            'compression_ratio': original_bits / transmitted_bits,
            'psnr': self._calculate_psnr(image, reconstructed),
            'bandwidth_saved': 1 - (transmitted_bits / original_bits)
        }
        return reconstructed, metrics

    def _calculate_psnr(self, original: torch.Tensor, reconstructed: torch.Tensor) -> float:
        mse = F.mse_loss(original, reconstructed)
        if mse == 0:
            return float('inf')
        return 10 * torch.log10(1.0 / mse).item()
class TextSemanticCommunication(nn.Module):
    """Semantic communication for text."""

    def __init__(
        self,
        vocab_size: int = 50000,
        embedding_dim: int = 256,
        semantic_dim: int = 64
    ):
        super().__init__()
        # Text encoder (lightweight transformer)
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embedding_dim,
            nhead=4,
            dim_feedforward=512,
            batch_first=True
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=2)
        # Semantic compression
        self.semantic_projector = nn.Linear(embedding_dim, semantic_dim)
        # Decoder
        self.semantic_expander = nn.Linear(semantic_dim, embedding_dim)
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=embedding_dim,
            nhead=4,
            dim_feedforward=512,
            batch_first=True
        )
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=2)
        self.output_projection = nn.Linear(embedding_dim, vocab_size)

    def encode(self, text_tokens: torch.Tensor) -> torch.Tensor:
        """Encode text into a semantic representation."""
        embedded = self.embedding(text_tokens)
        encoded = self.encoder(embedded)
        # Global pooling into a fixed-size semantic vector
        semantic = self.semantic_projector(encoded.mean(dim=1))
        return semantic

    def decode(
        self,
        semantic: torch.Tensor,
        max_length: int = 100
    ) -> torch.Tensor:
        """Decode the semantic vector back into text tokens."""
        # Expand the semantic vector into a sequence
        memory = self.semantic_expander(semantic).unsqueeze(1)
        memory = memory.expand(-1, max_length, -1)
        # Autoregressive decoding
        # (simplified: production code needs a proper autoregressive loop)
        decoded = self.decoder(memory, memory)
        logits = self.output_projection(decoded)
        return logits.argmax(dim=-1)
Integrated Sensing and Communication (ISAC)
6G base stations = Communication + Radar:
import numpy as np
from scipy import signal
from typing import Tuple, List

class ISACSensor:
    """Integrated Sensing and Communication for 6G."""

    def __init__(
        self,
        carrier_frequency: float = 100e9,  # 100 GHz (mmWave/THz)
        bandwidth: float = 1e9,            # 1 GHz bandwidth
        num_antennas: int = 256            # Massive MIMO
    ):
        self.fc = carrier_frequency
        self.bandwidth = bandwidth
        self.num_antennas = num_antennas
        self.c = 3e8  # Speed of light
        # Waveform parameters
        self.subcarrier_spacing = bandwidth / 1024
        self.symbol_duration = 1 / self.subcarrier_spacing

    def generate_ofdm_waveform(
        self,
        data_symbols: np.ndarray,
        sensing_pilots: bool = True
    ) -> np.ndarray:
        """Generate an OFDM waveform for both communication and sensing."""
        num_subcarriers = 1024
        # Allocate subcarriers
        if sensing_pilots:
            # Interleaved pilot pattern for sensing
            pilot_indices = np.arange(0, num_subcarriers, 4)
            data_indices = np.setdiff1d(np.arange(num_subcarriers), pilot_indices)
            # Create the frequency-domain signal
            freq_signal = np.zeros(num_subcarriers, dtype=complex)
            freq_signal[pilot_indices] = 1.0  # Known pilots for sensing
            freq_signal[data_indices[:len(data_symbols)]] = data_symbols
        else:
            freq_signal = data_symbols
        # IFFT to the time domain
        time_signal = np.fft.ifft(freq_signal)
        return time_signal

    def estimate_range_velocity(
        self,
        received_signal: np.ndarray,
        transmitted_signal: np.ndarray
    ) -> Tuple[float, float]:
        """Estimate target range and velocity from the reflected signal."""
        # Cross-correlation for range estimation
        correlation = signal.correlate(received_signal, transmitted_signal, mode='same')
        delay_samples = np.argmax(np.abs(correlation)) - len(correlation) // 2
        # Convert the delay to range
        delay_seconds = delay_samples / self.bandwidth
        range_m = self.c * delay_seconds / 2  # Round trip
        # Doppler estimation for velocity,
        # using the phase difference between consecutive symbols
        phase_diff = np.angle(received_signal[1:] * np.conj(received_signal[:-1]))
        doppler_shift = np.mean(phase_diff) / (2 * np.pi * self.symbol_duration)
        velocity_mps = (doppler_shift * self.c) / (2 * self.fc)
        return range_m, velocity_mps

    def beamform_and_sense(
        self,
        antenna_signals: np.ndarray,  # [num_antennas, num_samples]
        beam_directions: List[float]  # Angles in degrees
    ) -> dict:
        """Beamforming for both communication and sensing."""
        results = {}
        for angle_deg in beam_directions:
            angle_rad = np.deg2rad(angle_deg)
            # Steering vector for a uniform linear array
            antenna_spacing = self.c / (2 * self.fc)
            steering_vector = np.exp(
                1j * 2 * np.pi * self.fc * antenna_spacing *
                np.sin(angle_rad) * np.arange(self.num_antennas) / self.c
            )
            # Beamformed signal
            beamformed = np.dot(steering_vector.conj(), antenna_signals)
            # Signal power in this direction
            power = np.mean(np.abs(beamformed) ** 2)
            results[angle_deg] = {
                'beamformed_signal': beamformed,
                'power': power,
                'snr_estimate': 10 * np.log10(power / 1e-10)  # Relative to an assumed noise floor
            }
        return results

    def detect_objects(
        self,
        range_doppler_map: np.ndarray,
        cfar_threshold: float = 10.0
    ) -> List[dict]:
        """CFAR detection for object identification."""
        # Constant False Alarm Rate detector
        guard_cells = 2
        training_cells = 10
        detected_objects = []
        for range_idx in range(training_cells, len(range_doppler_map) - training_cells):
            for doppler_idx in range(training_cells, len(range_doppler_map[0]) - training_cells):
                cell_under_test = range_doppler_map[range_idx, doppler_idx]
                # Estimate the noise from the training cells
                training_region = range_doppler_map[
                    range_idx - training_cells:range_idx + training_cells + 1,
                    doppler_idx - training_cells:doppler_idx + training_cells + 1
                ]
                # Exclude the guard cells
                mask = np.ones_like(training_region, dtype=bool)
                center = training_cells
                mask[center - guard_cells:center + guard_cells + 1,
                     center - guard_cells:center + guard_cells + 1] = False
                noise_estimate = np.mean(training_region[mask])
                # Detection
                if cell_under_test > cfar_threshold * noise_estimate:
                    detected_objects.append({
                        'range_bin': range_idx,
                        'doppler_bin': doppler_idx,
                        'power': cell_under_test,
                        'snr': cell_under_test / noise_estimate
                    })
        return detected_objects
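Two sensing figures follow directly from these parameters via textbook radar relations (idealized, ignoring processing gain and hardware impairments):

```python
c = 3e8            # m/s
bandwidth = 1e9    # 1 GHz, as configured in ISACSensor above
fc = 100e9         # 100 GHz carrier, as above

# Range resolution: targets closer together than this merge into one detection
range_resolution_m = c / (2 * bandwidth)   # = 0.15 m

# Doppler shift of a 30 m/s (108 km/h) car at this carrier frequency
doppler_hz = 2 * 30.0 * fc / c             # = 20 kHz
```

A 15 cm range resolution and easily measurable 20 kHz Doppler shifts are what make "base station as radar" practical at these frequencies.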
Intelligent Resource Allocation with RL
import torch
import torch.nn as nn
import numpy as np
from typing import Dict, Tuple

class NetworkState:
    """Network state fed to the RL agent."""

    def __init__(
        self,
        num_users: int,
        num_base_stations: int,
        num_resource_blocks: int = 100
    ):
        self.num_users = num_users
        self.num_bs = num_base_stations
        self.num_rb = num_resource_blocks
        # State arrays (randomly initialized here; updated by the environment)
        self.channel_quality = np.random.rand(num_users, num_base_stations)  # CSI
        self.traffic_demand = np.random.rand(num_users)                      # QoS requirements
        self.interference_matrix = np.random.rand(num_base_stations, num_base_stations)
        self.buffer_status = np.zeros(num_users)                             # Queue lengths
        self.power_consumption = np.zeros(num_base_stations)                 # Energy state

    def get_observation(self) -> np.ndarray:
        """Flatten the state into an observation vector for RL."""
        return np.concatenate([
            self.channel_quality.flatten(),      # CSI
            self.traffic_demand.flatten(),       # QoS requirements
            self.interference_matrix.flatten(),  # Inter-cell interference
            self.buffer_status.flatten(),        # Queue lengths
            self.power_consumption.flatten()     # Energy state
        ])

    @property
    def observation_dim(self) -> int:
        return (
            self.num_users * self.num_bs +  # Channel quality
            self.num_users +                # Traffic demand
            self.num_bs * self.num_bs +     # Interference
            self.num_users +                # Buffers
            self.num_bs                     # Power
        )
class SpectrumAllocationAgent(nn.Module):
    """RL agent for allocating spectrum resources."""

    def __init__(
        self,
        state_dim: int,
        num_users: int,
        num_resource_blocks: int,
        hidden_dim: int = 256
    ):
        super().__init__()
        self.num_users = num_users
        self.num_rb = num_resource_blocks
        # Actor network (policy)
        self.actor = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, num_users * num_resource_blocks)
        )
        # Critic network (value function)
        self.critic = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        # Power allocation head
        self.power_head = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, num_users),
            nn.Sigmoid()  # Normalized power [0, 1]
        )

    def forward(
        self,
        state: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Returns:
            rb_allocation: [batch, num_users, num_rb] - Resource block assignment
            power: [batch, num_users] - Power allocation
            value: [batch, 1] - State value
        """
        # Resource block allocation probabilities
        rb_logits = self.actor(state)
        rb_probs = rb_logits.view(-1, self.num_users, self.num_rb)
        rb_probs = torch.softmax(rb_probs, dim=-1)
        # Power allocation
        power = self.power_head(state)
        # State value
        value = self.critic(state)
        return rb_probs, power, value

    def get_action(
        self,
        state: torch.Tensor,
        explore: bool = True
    ) -> Dict[str, np.ndarray]:
        """Sample an action from the policy."""
        with torch.no_grad():
            rb_probs, power, _ = self.forward(state)
            if explore:
                # Sample from the distribution
                rb_allocation = torch.distributions.Categorical(rb_probs).sample()
            else:
                # Greedy selection
                rb_allocation = rb_probs.argmax(dim=-1)
        return {
            'rb_allocation': rb_allocation.numpy(),
            'power': power.numpy()
        }
class NetworkEnvironment:
    """Simulation environment for RL training."""

    def __init__(self, state: NetworkState):
        self.state = state
        self.max_power_dbm = 23  # Mobile max power

    def step(
        self,
        action: Dict[str, np.ndarray]
    ) -> Tuple[np.ndarray, float, bool, dict]:
        """Execute an action and compute the reward."""
        rb_allocation = action['rb_allocation']
        power = action['power']
        # Calculate throughput (Shannon capacity)
        throughput = self._calculate_throughput(rb_allocation, power)
        # Calculate QoS satisfaction
        qos_satisfaction = self._calculate_qos(throughput)
        # Calculate power efficiency
        power_efficiency = throughput.sum() / (power.sum() + 1e-6)
        # Reward function
        reward = (
            0.4 * throughput.sum() / 1e9 +    # Throughput (normalized)
            0.4 * qos_satisfaction.mean() +   # QoS satisfaction
            0.2 * np.log1p(power_efficiency)  # Power efficiency
        )
        # Update the state
        self._update_state()
        return self.state.get_observation(), reward, False, {
            'throughput': throughput,
            'qos': qos_satisfaction,
            'power_efficiency': power_efficiency
        }

    def _calculate_throughput(
        self,
        rb_allocation: np.ndarray,
        power: np.ndarray
    ) -> np.ndarray:
        """Calculate per-user throughput from the allocation."""
        bandwidth_per_rb = 180e3  # 180 kHz per RB
        noise_power = -174 + 10 * np.log10(bandwidth_per_rb)  # dBm
        throughputs = []
        for user in range(self.state.num_users):
            # Get the allocated RBs
            user_rbs = np.where(rb_allocation == user)[0]
            if len(user_rbs) == 0:
                throughputs.append(0)
                continue
            # Calculate SINR
            signal_power = power[user] * self.state.channel_quality[user].mean()
            interference = self._calculate_interference(user, rb_allocation, power)
            sinr = signal_power / (interference + 10 ** (noise_power / 10))
            # Shannon capacity
            capacity = len(user_rbs) * bandwidth_per_rb * np.log2(1 + sinr)
            throughputs.append(capacity)
        return np.array(throughputs)

    def _calculate_qos(self, throughput: np.ndarray) -> np.ndarray:
        """Fraction of demanded rate delivered, per user (simplified model)."""
        demanded_bps = self.state.traffic_demand * 1e8  # demand rescaled to bps
        return np.clip(throughput / (demanded_bps + 1e-6), 0.0, 1.0)

    def _calculate_interference(
        self,
        user: int,
        rb_allocation: np.ndarray,
        power: np.ndarray
    ) -> float:
        """Simplified inter-user interference: fixed fractional leakage."""
        return float(sum(power[u] * 0.01 for u in range(self.state.num_users)
                         if u != user))

    def _update_state(self):
        """Evolve the channel between steps (simple block-fading model)."""
        self.state.channel_quality = (
            0.9 * self.state.channel_quality +
            0.1 * np.random.rand(self.state.num_users, self.state.num_bs)
        )
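A quick sanity check of the Shannon step in `_calculate_throughput`: one resource block at an SINR of 20 dB (values illustrative):

```python
import math

bandwidth_per_rb = 180e3                  # Hz, matching the code above
sinr_db = 20.0
sinr = 10 ** (sinr_db / 10)               # = 100 (linear)

capacity_bps = bandwidth_per_rb * math.log2(1 + sinr)  # ~1.2 Mbit/s per RB
```

So a user holding all 100 resource blocks at this SINR sees on the order of 120 Mbit/s, which is the scale the reward normalization by 1e9 above assumes.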
import copy

class FederatedEdgeLearning:
    """Federated Learning across edge nodes."""

    def __init__(
        self,
        num_edge_nodes: int,
        model: nn.Module,
        aggregation_strategy: str = 'fedavg'
    ):
        self.num_nodes = num_edge_nodes
        self.global_model = model
        self.local_models = [
            self._clone_model(model) for _ in range(num_edge_nodes)
        ]
        self.aggregation_strategy = aggregation_strategy

    @staticmethod
    def _clone_model(model: nn.Module) -> nn.Module:
        """Independent deep copy of the model for a local node."""
        return copy.deepcopy(model)

    def local_train(
        self,
        node_id: int,
        local_data: torch.utils.data.DataLoader,
        epochs: int = 1
    ) -> Dict[str, float]:
        """Training on a local edge node."""
        model = self.local_models[node_id]
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
        criterion = nn.CrossEntropyLoss()
        model.train()
        total_loss = 0
        num_batches = 0
        for _ in range(epochs):
            for batch_x, batch_y in local_data:
                optimizer.zero_grad()
                output = model(batch_x)
                loss = criterion(output, batch_y)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
                num_batches += 1
        return {'loss': total_loss / num_batches}

    def aggregate(self, participating_nodes: List[int] = None):
        """Aggregate the models from the edge nodes."""
        if participating_nodes is None:
            participating_nodes = list(range(self.num_nodes))
        if self.aggregation_strategy == 'fedavg':
            self._fedavg_aggregate(participating_nodes)
        elif self.aggregation_strategy == 'fedprox':
            # FedProx changes the *local* objective (proximal term);
            # the server-side aggregation is the same weighted average
            self._fedavg_aggregate(participating_nodes)

    def _fedavg_aggregate(self, nodes: List[int]):
        """FedAvg aggregation."""
        global_state = self.global_model.state_dict()
        for key in global_state:
            global_state[key] = torch.stack([
                self.local_models[i].state_dict()[key].float()
                for i in nodes
            ]).mean(dim=0)
        self.global_model.load_state_dict(global_state)
        # Sync back to the local models
        for node in nodes:
            self.local_models[node].load_state_dict(global_state)

    def over_the_air_aggregation(
        self,
        gradients: List[torch.Tensor],
        channel_gains: np.ndarray
    ) -> torch.Tensor:
        """Over-the-Air Federated Learning (AirComp)."""
        # Scale the gradients by the inverse channel gain
        scaled_grads = []
        for grad, gain in zip(gradients, channel_gains):
            # Power control for alignment
            scaling = 1.0 / np.sqrt(gain + 1e-6)
            scaled_grads.append(grad * scaling)
        # The signals superimpose over the air
        aggregated = torch.stack(scaled_grads).sum(dim=0)
        # Channel effect (all signals arrive aligned)
        aggregated = aggregated / len(gradients)
        return aggregated
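The AirComp idea can be verified in a few lines of NumPy, assuming ideal channel inversion (a sketch; real channels add noise and imperfect channel knowledge):

```python
import numpy as np

rng = np.random.default_rng(0)
grads = [rng.normal(size=4) for _ in range(3)]  # local gradients from 3 nodes
gains = [0.5, 1.0, 2.0]                         # channel power gains

# Each transmitter pre-scales by 1/sqrt(gain); the channel multiplies the
# amplitude by sqrt(gain); the air sums the aligned signals for free.
received = sum((g / np.sqrt(h)) * np.sqrt(h) for g, h in zip(grads, gains))
aggregated = received / len(grads)

assert np.allclose(aggregated, np.mean(grads, axis=0))  # equals plain FedAvg
```

The point: the sum needed by FedAvg is computed by the physics of superposition, so upload cost stays constant as the number of nodes grows.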
Edge Inference Optimization
import torch
import torch.nn as nn
from typing import Tuple, List, Optional

class EarlyExitNetwork(nn.Module):
    """Neural network with early exits for edge inference."""

    def __init__(
        self,
        num_classes: int = 1000,
        confidence_threshold: float = 0.9
    ):
        super().__init__()
        self.confidence_threshold = confidence_threshold
        # Stage 1 (can run on the device)
        self.stage1 = nn.Sequential(
            nn.Conv2d(3, 64, 3, 1, 1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.exit1 = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(64, num_classes)
        )
        # Stage 2 (base station edge)
        self.stage2 = nn.Sequential(
            nn.Conv2d(64, 128, 3, 1, 1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.exit2 = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(128, num_classes)
        )
        # Stage 3 (regional edge)
        self.stage3 = nn.Sequential(
            nn.Conv2d(128, 256, 3, 1, 1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.exit3 = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(256, num_classes)
        )
        # Stage 4 (cloud)
        self.stage4 = nn.Sequential(
            nn.Conv2d(256, 512, 3, 1, 1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten()
        )
        self.final_classifier = nn.Linear(512, num_classes)

    def forward(
        self,
        x: torch.Tensor,
        return_all_exits: bool = False
    ) -> Tuple[torch.Tensor, int, Optional[List[torch.Tensor]]]:
        """
        Forward pass with early exits.

        Returns:
            output: Final logits
            exit_point: Which exit was used (1-4)
            all_exits: All exit logits if requested
        """
        all_outputs = []
        # Stage 1
        x = self.stage1(x)
        out1 = self.exit1(x)
        all_outputs.append(out1)
        if not return_all_exits:
            conf1 = torch.softmax(out1, dim=1).max(dim=1)[0]
            if (conf1 > self.confidence_threshold).all():
                return out1, 1, None
        # Stage 2
        x = self.stage2(x)
        out2 = self.exit2(x)
        all_outputs.append(out2)
        if not return_all_exits:
            conf2 = torch.softmax(out2, dim=1).max(dim=1)[0]
            if (conf2 > self.confidence_threshold).all():
                return out2, 2, None
        # Stage 3
        x = self.stage3(x)
        out3 = self.exit3(x)
        all_outputs.append(out3)
        if not return_all_exits:
            conf3 = torch.softmax(out3, dim=1).max(dim=1)[0]
            if (conf3 > self.confidence_threshold).all():
                return out3, 3, None
        # Stage 4 (full network)
        x = self.stage4(x)
        out4 = self.final_classifier(x)
        all_outputs.append(out4)
        if return_all_exits:
            return out4, 4, all_outputs
        return out4, 4, None
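What early exits buy: if most inputs are easy, the expected latency collapses toward the first exit. A sketch with assumed exit probabilities and per-exit latencies (both illustrative; real numbers come from profiling):

```python
# Per-exit end-to-end latency (ms): device, base-station edge, regional edge, cloud
exit_latency_ms = [1.0, 3.0, 12.0, 55.0]
# Assumed fraction of inputs confident enough to stop at each exit
exit_prob = [0.55, 0.25, 0.15, 0.05]

expected_ms = sum(p * l for p, l in zip(exit_prob, exit_latency_ms))  # = 5.85 ms
worst_case_ms = exit_latency_ms[-1]  # 55 ms if every input went to the cloud
```

Under these assumptions the average request is ~9x faster than always running the full network, while hard inputs still get the full model's accuracy.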
class ModelSplitter:
    """Split a model between device and edge."""

    def __init__(self, model: nn.Module):
        self.model = model
        self.layers = list(model.children())

    def _get_output_shape(self, layer: nn.Module, input_shape: Tuple[int, ...]) -> Tuple[int, ...]:
        """Infer the output shape via a dry run on zeros (shapes exclude the batch dim)."""
        with torch.no_grad():
            return tuple(layer(torch.zeros(1, *input_shape)).shape[1:])

    def _layer_flops(self, layer: nn.Module, input_shape: Tuple[int, ...]) -> float:
        """FLOP estimate: 2 x parameters x output spatial positions
        (exact for conv/linear without bias, a rough proxy otherwise)."""
        params = sum(p.numel() for p in layer.parameters())
        out_shape = self._get_output_shape(layer, input_shape)
        spatial = int(np.prod(out_shape[1:])) if len(out_shape) > 1 else 1
        return 2.0 * params * spatial

    def _count_flops(self, input_shape: Tuple[int, ...]) -> float:
        """Total FLOPs across all layers."""
        total, shape = 0.0, input_shape
        for layer in self.layers:
            total += self._layer_flops(layer, shape)
            shape = self._get_output_shape(layer, shape)
        return total

    def find_optimal_split(
        self,
        input_shape: Tuple[int, ...],
        device_compute: float,  # FLOPS available on the device
        edge_compute: float,    # FLOPS available on the edge
        bandwidth_bps: float    # Bandwidth for the intermediate transfer
    ) -> int:
        """Find the optimal split point."""
        total_flops = self._count_flops(input_shape)
        current_shape = input_shape
        cumulative_flops = 0
        best_split = 0
        min_latency = float('inf')
        for i, layer in enumerate(self.layers):
            layer_flops = self._layer_flops(layer, current_shape)
            cumulative_flops += layer_flops
            # Compute latencies
            device_time = cumulative_flops / device_compute
            edge_time = (total_flops - cumulative_flops) / edge_compute
            # Transfer latency
            output_shape = self._get_output_shape(layer, current_shape)
            transfer_bits = np.prod(output_shape) * 32  # float32
            transfer_time = transfer_bits / bandwidth_bps
            total_latency = device_time + transfer_time + edge_time
            if total_latency < min_latency:
                min_latency = total_latency
                best_split = i + 1
            current_shape = output_shape
        return best_split

    def split_model(
        self,
        split_point: int
    ) -> Tuple[nn.Sequential, nn.Sequential]:
        """Split the model into two parts."""
        device_part = nn.Sequential(*self.layers[:split_point])
        edge_part = nn.Sequential(*self.layers[split_point:])
        return device_part, edge_part
Hardware for Edge AI
| Platform | TOPS | Power (W) | Use Case |
|----------|------|-----------|----------|
| Apple Neural Engine | 17 | 8 | Mobile |
| NVIDIA Jetson Orin | 275 | 60 | Edge Server |
| Google Edge TPU | 4 | 2 | IoT |
| Qualcomm Hexagon | 26 | 7 | Mobile |
| Intel Movidius | 4 | 1.5 | Camera |
| Hailo-8 | 26 | 2.5 | Automotive |
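For battery- and thermally-constrained deployments, the metric that matters in this table is TOPS per watt rather than raw TOPS; ranking takes a few lines (vendor TOPS figures mix numeric precisions, so treat the ordering as indicative):

```python
platforms = {  # name: (TOPS, watts), from the table above
    "Apple Neural Engine": (17, 8),
    "NVIDIA Jetson Orin": (275, 60),
    "Google Edge TPU": (4, 2),
    "Qualcomm Hexagon": (26, 7),
    "Intel Movidius": (4, 1.5),
    "Hailo-8": (26, 2.5),
}

tops_per_watt = {name: t / w for name, (t, w) in platforms.items()}
best = max(tops_per_watt, key=tops_per_watt.get)  # "Hailo-8" at ~10.4 TOPS/W
```

By this metric the small accelerators beat the edge server: Jetson Orin delivers the most absolute compute, but Hailo-8 delivers more than twice the inference per joule.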
Practical Use Cases
1. Autonomous Vehicles
- V2X communication: < 5ms latency required
- Real-time sensor fusion at edge
- Cooperative perception through base stations
- Safety-critical decision making
2. Industrial IoT (Industry 4.0)
- Predictive maintenance with < 1ms control loops
- Real-time quality inspection
- Robot coordination through 5G/6G
- Digital twin synchronization
3. AR/VR (Metaverse)
- Offload rendering to edge (< 20ms motion-to-photon)
- Pose estimation on edge servers
- Spatial mapping and anchoring
- Multi-user synchronization
4. Smart Healthcare
- Remote surgery assistance (< 10ms haptic feedback)
- Real-time patient monitoring
- Privacy-preserving edge analytics
- Emergency response optimization
Ideas for Research Projects
For a bachelor's thesis:
- Edge inference deployment on a Raspberry Pi + 5G modem
- Latency comparison: cloud vs edge processing
- Simple federated learning demo with mobile devices
For a master's thesis:
- Model partitioning optimization algorithms
- RL-based resource allocation for edge networks
- Semantic communication prototype implementation
For PhD research:
- Theoretical bounds on edge-cloud latency trade-offs
- Novel architectures for ISAC systems
- Foundation models for network optimization
Conclusion
5G did not deliver on all of its promises, and the reason is architecture, not technology. The cloud-centric approach hits physical limits: you cannot cheat the speed of light, and ~100 ms round trip to a data center is an eternity for autonomous driving.
6G + Edge AI is an architectural shift:
- Intelligence distributed, not centralized
- Compute at the data, not data to compute
- Network as an intelligent computing fabric
- Communication and sensing unified
Whoever internalizes this paradigm will build the infrastructure of the next decade.
If you are planning research in edge computing, 6G networks, or distributed AI, the SKP-Degree team is ready to help with choosing a topic, implementing a prototype, and writing it up academically. Visit skp-degree.com.ua or message us on Telegram: @kursovi_diplomy, from idea to a successful defense.
Keywords: 6G networks, edge computing, edge AI, latency optimization, semantic communication, ISAC, federated learning, resource allocation, reinforcement learning, distributed inference, mobile networks, IoT, bachelor's thesis, master's thesis, AI research.