Training GPT-4: an estimated ~50,000 MWh, roughly the annual consumption of 5,000 households. The carbon footprint: thousands of New York to San Francisco flights' worth (the oft-cited 300-flight figure belongs to GPT-3; see the table below).
And that is only training. Aggregate inference is widely estimated to have overtaken GPT-4's training energy long ago, and the gap widens every day. Microsoft, Google, and Amazon are signing nuclear power deals to feed their AI data centers. This is not a joke.
Sustainable AI is not about "planting a tree for every query". It is about architectural decisions: smaller models, more efficient algorithms, smarter hardware. Edge AI is part of the solution, because less data transfer means less energy.
The scale of the problem: sobering numbers
AI energy consumption:
- 2020: 0.1% of global electricity consumption
- 2024: ~1% (estimate)
- 2030: 3-5% (IEA forecast)
- Data centers globally: ~400 TWh/year (more than the United Kingdom)
Comparing operations (the conversion sketch below shows where the CO2 numbers come from):
- Google search: 0.0003 kWh ≈ 0.2 g CO2
- GPT-4 query: 0.001-0.01 kWh ≈ 0.5-5 g CO2
- Image generation: 0.01-0.1 kWh ≈ 5-50 g CO2
- Training GPT-4: ~50,000,000 kWh ≈ 25,000 tonnes CO2
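The kWh-to-CO2 conversion behind these figures is a single multiplication by the grid's carbon intensity. A minimal sketch, assuming a world-average intensity of ~500 gCO2/kWh (an assumption; real grids range from under 50 to over 800):

```python
GRID_INTENSITY = 500.0  # gCO2 per kWh; assumed world-average value

def co2_grams(energy_kwh: float, intensity: float = GRID_INTENSITY) -> float:
    """Grams of CO2 emitted for a given energy use."""
    return energy_kwh * intensity

for name, kwh in [("Google search", 0.0003),
                  ("GPT-4 query (upper bound)", 0.01),
                  ("Image generation (upper bound)", 0.1)]:
    print(f"{name}: {co2_grams(kwh):.2f} g CO2")
```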
For context:
- One ChatGPT query ≈ charging a smartphone
- A million queries a day ≈ a small town
- Training a foundation model ≈ a year of flights for every Google employee
Training carbon footprints (the estimation recipe follows the table):
| Model | CO2 (tonnes) | Equivalent |
|-------|-------------|------------|
| BERT | 1.5 | 1 transatlantic flight |
| GPT-3 | 552 | 300 NYC-SF flights |
| GPT-4 | ~25,000 | Annual emissions of ~2,000 people |
| LLaMA-2 70B | 290 | 60 cars driven for a year |
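Where do such table entries come from? The standard back-of-the-envelope formula is GPU-hours × average power draw × datacenter PUE × grid carbon intensity. A minimal sketch; the GPT-3 inputs below are commonly cited approximations, not official figures:

```python
def training_co2_tonnes(gpu_hours: float, gpu_watts: float,
                        pue: float = 1.1,                # datacenter overhead factor
                        intensity_g_kwh: float = 430.0   # grid carbon intensity
                        ) -> float:
    """Back-of-the-envelope training emissions: energy (kWh) x intensity."""
    energy_kwh = gpu_hours * gpu_watts / 1000.0 * pue
    return energy_kwh * intensity_g_kwh / 1e6  # grams -> tonnes

# GPT-3-scale inputs: ~3.55M V100-hours at ~330 W (approximation)
print(f"GPT-3: ~{training_co2_tonnes(gpu_hours=3.55e6, gpu_watts=330):.0f} t CO2")
```

This prints roughly 554 t, which lands near the 552 t in the table above.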
Where the energy goes: a breakdown

```python
from dataclasses import dataclass
from typing import Dict

@dataclass
class EnergyBreakdown:
    """Energy breakdown of an ML system."""
    gpu_compute: float    # Core computation
    memory_access: float  # DRAM reads/writes
    network_io: float     # Data transfer
    cooling: float        # Cooling
    storage: float        # SSD/HDD
    overhead: float       # Power conversion, etc.
@property
def total(self) -> float:
return (self.gpu_compute + self.memory_access +
self.network_io + self.cooling +
self.storage + self.overhead)
def to_dict(self) -> Dict[str, float]:
return {
'GPU Compute': self.gpu_compute,
'Memory Access': self.memory_access,
'Network I/O': self.network_io,
'Cooling': self.cooling,
'Storage': self.storage,
'Overhead': self.overhead
}
class EnergyProfiler:
"""Профілювання енергоспоживання ML operations."""
# Typical energy costs (Joules)
GPU_FLOP_ENERGY = 1e-12 # ~1 pJ per FLOP (modern GPU)
DRAM_ACCESS_ENERGY = 1e-9 # ~1 nJ per byte
NETWORK_BIT_ENERGY = 1e-9 # ~1 nJ per bit
SSD_READ_ENERGY = 1e-6 # ~1 µJ per byte
@classmethod
def estimate_training_energy(
cls,
model_params: int, # Number of parameters
dataset_size: int, # Number of samples
batch_size: int,
epochs: int,
flops_per_sample: int, # FLOPs per forward+backward pass
gpu_efficiency: float = 0.3 # Actual vs theoretical FLOPS
) -> EnergyBreakdown:
"""Estimate energy for training."""
total_batches = (dataset_size * epochs) // batch_size
# GPU compute
total_flops = flops_per_sample * dataset_size * epochs
gpu_energy = total_flops * cls.GPU_FLOP_ENERGY / gpu_efficiency
# Memory access (weights + activations + gradients)
bytes_per_batch = (
model_params * 4 * 3 + # weights, gradients, optimizer state
flops_per_sample // 100 # rough activation estimate
) * batch_size
memory_energy = bytes_per_batch * total_batches * cls.DRAM_ACCESS_ENERGY
# Network (distributed training)
gradient_sync_bytes = model_params * 4 * total_batches * 0.1 # Assume 10% sync
network_energy = gradient_sync_bytes * 8 * cls.NETWORK_BIT_ENERGY
# Storage (checkpoints, logs)
storage_bytes = model_params * 4 * epochs # Save each epoch
storage_energy = storage_bytes * cls.SSD_READ_ENERGY
# Cooling (~30% of compute)
compute_total = gpu_energy + memory_energy
cooling_energy = compute_total * 0.3
# Overhead (~10%)
overhead = (gpu_energy + memory_energy + network_energy) * 0.1
return EnergyBreakdown(
gpu_compute=gpu_energy,
memory_access=memory_energy,
network_io=network_energy,
cooling=cooling_energy,
storage=storage_energy,
overhead=overhead
)
@classmethod
def estimate_inference_energy(
cls,
model_params: int,
flops_per_inference: int,
batch_size: int = 1,
include_network: bool = True,
input_bytes: int = 1000,
output_bytes: int = 100
) -> float:
"""Estimate energy for single inference."""
# GPU compute
        gpu_energy = flops_per_inference * cls.GPU_FLOP_ENERGY / 0.3  # assume ~30% GPU utilization
# Memory (load weights + activations)
memory_bytes = model_params * 2 + flops_per_inference // 100 # FP16 weights
memory_energy = memory_bytes * cls.DRAM_ACCESS_ENERGY
# Network transfer (if cloud)
if include_network:
            network_energy = (input_bytes + output_bytes) * 8 * cls.NETWORK_BIT_ENERGY * 1000  # x1000: end-to-end internet path costs far more than one NIC hop (rough assumption)
else:
network_energy = 0
total = gpu_energy + memory_energy + network_energy
return total
# Example: comparing cloud vs edge inference
def compare_cloud_vs_edge():
    """Compare the energy efficiency of cloud and edge inference."""
    model_params = 7e9  # 7B-parameter model
    flops = 14e9        # ~2 FLOPs per parameter per token for a transformer
# Cloud inference (with network)
cloud_energy = EnergyProfiler.estimate_inference_energy(
model_params=model_params,
flops_per_inference=flops,
include_network=True,
input_bytes=2000, # Prompt
output_bytes=1000 # Response
)
# Edge inference (quantized, no network)
edge_energy = EnergyProfiler.estimate_inference_energy(
model_params=model_params // 4, # INT4 quantization
flops_per_inference=flops // 4, # Reduced precision
include_network=False
)
print(f"Cloud inference energy: {cloud_energy:.2e} J")
print(f"Edge inference energy: {edge_energy:.2e} J")
print(f"Edge is {cloud_energy/edge_energy:.1f}x more efficient")
    return cloud_energy, edge_energy
```
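A note on the constants above: 1 pJ/FLOP, 1 nJ/byte and friends are order-of-magnitude assumptions for modern accelerators. They are fine for comparing architectures against each other, but calibrate absolute values against measured data, as with the profiling tools later in this article.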
Green AI strategies: the architecture level
1. Efficient Architectures

```python
import torch
import torch.nn as nn
from typing import Dict, List, Tuple
class MixtureOfExperts(nn.Module):
    """
    Mixture of Experts: only a fraction of the parameters is active per token.

    8 experts x 1B params each = 8B total params,
    but only 2 experts active = 2B params of compute,
    roughly 4x less compute than a dense model of the same size.
    """
def __init__(
self,
input_dim: int,
expert_dim: int,
num_experts: int = 8,
top_k: int = 2
):
super().__init__()
self.num_experts = num_experts
self.top_k = top_k
# Router (gating network)
self.router = nn.Linear(input_dim, num_experts)
# Expert networks
self.experts = nn.ModuleList([
nn.Sequential(
nn.Linear(input_dim, expert_dim),
nn.GELU(),
nn.Linear(expert_dim, input_dim)
)
for _ in range(num_experts)
])
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
batch_size, seq_len, dim = x.shape
# Compute routing probabilities
router_logits = self.router(x) # [batch, seq, num_experts]
router_probs = torch.softmax(router_logits, dim=-1)
# Select top-k experts
top_k_probs, top_k_indices = torch.topk(router_probs, self.top_k, dim=-1)
top_k_probs = top_k_probs / top_k_probs.sum(dim=-1, keepdim=True) # Renormalize
# Compute expert outputs (only for selected experts)
output = torch.zeros_like(x)
for i in range(self.top_k):
expert_idx = top_k_indices[:, :, i] # [batch, seq]
expert_weight = top_k_probs[:, :, i:i+1] # [batch, seq, 1]
for e in range(self.num_experts):
mask = (expert_idx == e)
if mask.any():
expert_input = x[mask]
expert_output = self.experts[e](expert_input)
output[mask] += expert_weight[mask] * expert_output
# Load balancing loss (encourage even expert usage)
expert_usage = router_probs.mean(dim=[0, 1])
load_balance_loss = self.num_experts * (expert_usage ** 2).sum()
return output, load_balance_loss
class EarlyExitTransformer(nn.Module):
    """
    Transformer with early exits.

    Easy inputs: exit after 2 layers.
    Medium inputs: exit after 6 layers.
    Hard inputs: all 12 layers.
    On mixed traffic this often averages out to ~50% less compute.
    """
def __init__(
self,
d_model: int = 768,
num_layers: int = 12,
exit_points: List[int] = [2, 6, 12],
confidence_threshold: float = 0.9
):
super().__init__()
self.num_layers = num_layers
self.exit_points = exit_points
self.confidence_threshold = confidence_threshold
# Transformer layers
self.layers = nn.ModuleList([
            nn.TransformerEncoderLayer(d_model, nhead=12, batch_first=True)  # batch_first: x is [batch, seq, dim]
for _ in range(num_layers)
])
# Exit classifiers at each exit point
self.exit_classifiers = nn.ModuleDict({
str(ep): nn.Linear(d_model, 1000) # ImageNet classes
for ep in exit_points
})
def forward(
self,
x: torch.Tensor,
allow_early_exit: bool = True
) -> Tuple[torch.Tensor, int]:
"""
Returns:
output: Classification logits
exit_layer: Which layer was used for exit
"""
for i, layer in enumerate(self.layers):
x = layer(x)
layer_num = i + 1
if layer_num in self.exit_points:
classifier = self.exit_classifiers[str(layer_num)]
logits = classifier(x.mean(dim=1)) # Global average pooling
if allow_early_exit and layer_num < self.num_layers:
confidence = torch.softmax(logits, dim=-1).max(dim=-1)[0]
                    # Exit only when the whole batch is confident
                    # (in deployment early exit typically runs with batch_size=1)
                    if (confidence > self.confidence_threshold).all():
return logits, layer_num
# Final exit
final_logits = self.exit_classifiers[str(self.num_layers)](x.mean(dim=1))
return final_logits, self.num_layers
def compute_energy_savings(
self,
exit_distribution: Dict[int, float]
) -> float:
"""Compute energy savings from early exits."""
total_energy = 0
full_energy = self.num_layers
for exit_layer, fraction in exit_distribution.items():
total_energy += exit_layer * fraction
savings = 1 - (total_energy / full_energy)
        return savings
```
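A quick usage sketch for both modules. The exit distribution is hypothetical, the kind you would measure offline on a validation set:

```python
# Mixture of Experts: verify shapes and get the auxiliary loss
moe = MixtureOfExperts(input_dim=768, expert_dim=2048, num_experts=8, top_k=2)
x = torch.randn(2, 16, 768)
out, aux_loss = moe(x)  # out: [2, 16, 768]; aux_loss: add to the training loss

# Early exit: if 60% of inputs exit at layer 2, 30% at layer 6, 10% at layer 12
model = EarlyExitTransformer(d_model=768, num_layers=12)
exit_dist = {2: 0.6, 6: 0.3, 12: 0.1}
print(f"Estimated compute savings: {model.compute_energy_savings(exit_dist):.0%}")  # 65%
```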
2. Quantization

```python
import torch
import torch.nn as nn
from torch.ao.quantization import quantize_dynamic, get_default_qconfig
from typing import Tuple
class QuantizationOptimizer:
"""Оптимізація через quantization."""
@staticmethod
def dynamic_quantization(model: nn.Module) -> nn.Module:
"""
Dynamic INT8 quantization.
Weights: INT8, Activations: FP32 (quantized at runtime)
~2x memory reduction, ~1.5x speedup
"""
quantized = quantize_dynamic(
model,
{nn.Linear, nn.LSTM, nn.GRU},
dtype=torch.qint8
)
return quantized
@staticmethod
def static_quantization(
model: nn.Module,
calibration_data: torch.Tensor
) -> nn.Module:
"""
Static INT8 quantization.
Both weights and activations: INT8
~4x memory reduction, ~2-3x speedup
"""
model.eval()
        # Prepare for quantization (assumes the model wraps its quantized region in QuantStub/DeQuantStub)
model.qconfig = get_default_qconfig('fbgemm')
torch.ao.quantization.prepare(model, inplace=True)
# Calibration
with torch.no_grad():
model(calibration_data)
# Convert
torch.ao.quantization.convert(model, inplace=True)
return model
@staticmethod
def quantize_to_int4(
weight: torch.Tensor,
group_size: int = 128
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
INT4 quantization with groupwise scaling.
~8x memory reduction vs FP32
"""
# Reshape for groupwise quantization
original_shape = weight.shape
weight_flat = weight.view(-1, group_size)
# Compute scales per group
max_vals = weight_flat.abs().max(dim=1, keepdim=True)[0]
scales = max_vals / 7 # INT4 range: -8 to 7
# Quantize
quantized = torch.round(weight_flat / (scales + 1e-8))
quantized = quantized.clamp(-8, 7).to(torch.int8)
return quantized.view(original_shape), scales.view(-1)
@staticmethod
def estimate_memory_reduction(
original_dtype: str,
quantized_dtype: str
) -> float:
"""Estimate memory reduction from quantization."""
bits = {
'fp32': 32,
'fp16': 16,
'bf16': 16,
'int8': 8,
'int4': 4,
'int2': 2
}
return bits[original_dtype] / bits[quantized_dtype]
class QLoRA(nn.Module):
    """
    Quantized Low-Rank Adaptation (sketch).

    Base model: INT4 (frozen)
    Adapters: FP16 (trainable)
    Train with ~0.1% of the parameters and ~4x less memory!
    """
def __init__(
self,
base_model: nn.Module,
rank: int = 8,
alpha: float = 16
):
super().__init__()
self.rank = rank
self.alpha = alpha
self.scaling = alpha / rank
# Quantize base model to INT4
self.base_model = self._quantize_base(base_model)
# Add LoRA adapters to linear layers
self.adapters = nn.ModuleDict()
for name, module in base_model.named_modules():
if isinstance(module, nn.Linear):
in_features = module.in_features
out_features = module.out_features
self.adapters[name.replace('.', '_')] = nn.ModuleDict({
'lora_A': nn.Linear(in_features, rank, bias=False),
'lora_B': nn.Linear(rank, out_features, bias=False)
})
# Initialize B to zero
nn.init.zeros_(self.adapters[name.replace('.', '_')]['lora_B'].weight)
    def _quantize_base(self, model: nn.Module) -> nn.Module:
        """Freeze base model weights.

        Sketch only: real QLoRA stores weights in 4-bit NF4 (e.g. via
        bitsandbytes); here we just freeze them to keep the example small.
        """
        for param in model.parameters():
            param.requires_grad = False
        return model
def forward(self, x: torch.Tensor) -> torch.Tensor:
# Base model forward (INT4)
base_output = self.base_model(x)
# Add LoRA contribution
# (simplified - in practice would hook into each layer)
return base_output
@property
def trainable_params(self) -> int:
return sum(p.numel() for p in self.parameters() if p.requires_grad)
@property
def total_params(self) -> int:
return sum(p.numel() for p in self.parameters())
@property
def trainable_ratio(self) -> float:
        return self.trainable_params / self.total_params
```
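A usage sketch for the dynamic path: quantize a small linear stack (arbitrary layer sizes) and compare serialized sizes. Saving the state_dict to a buffer is the standard way to compare quantized model sizes in PyTorch:

```python
import io
import torch
import torch.nn as nn

model = nn.Sequential(nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10))
quantized = QuantizationOptimizer.dynamic_quantization(model)

def model_size_mb(m: nn.Module) -> float:
    """Serialized state_dict size in MB."""
    buf = io.BytesIO()
    torch.save(m.state_dict(), buf)
    return buf.getbuffer().nbytes / 1e6

print(f"FP32: {model_size_mb(model):.2f} MB, dynamic INT8: {model_size_mb(quantized):.2f} MB")
print(f"Theoretical fp32 -> int8: {QuantizationOptimizer.estimate_memory_reduction('fp32', 'int8'):.0f}x")
```

The measured ratio is usually below the theoretical 4x, because biases, scales, and non-quantized layers stay in full precision.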
3. Knowledge Distillation

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
class DistillationTrainer:
    """
    Knowledge distillation: transfer knowledge from a large model to a small one.

    Teacher (GPT-3, 175B) -> Student (GPT-2, 1.5B):
    as a rule of thumb, the student keeps ~80% of the teacher's quality
    at ~1% of the teacher's compute.
    """
def __init__(
self,
teacher: nn.Module,
student: nn.Module,
temperature: float = 4.0,
alpha: float = 0.5 # Balance between soft and hard labels
):
self.teacher = teacher
self.student = student
self.temperature = temperature
self.alpha = alpha
# Freeze teacher
for param in self.teacher.parameters():
param.requires_grad = False
self.teacher.eval()
def distillation_loss(
self,
student_logits: torch.Tensor,
teacher_logits: torch.Tensor,
labels: torch.Tensor
) -> torch.Tensor:
"""
Combined loss:
- Soft targets: KL divergence with temperature
- Hard targets: Cross entropy with true labels
"""
# Soft targets (from teacher)
soft_teacher = F.softmax(teacher_logits / self.temperature, dim=-1)
soft_student = F.log_softmax(student_logits / self.temperature, dim=-1)
soft_loss = F.kl_div(
soft_student,
soft_teacher,
reduction='batchmean'
) * (self.temperature ** 2)
# Hard targets (true labels)
hard_loss = F.cross_entropy(student_logits, labels)
# Combined loss
total_loss = self.alpha * soft_loss + (1 - self.alpha) * hard_loss
return total_loss
def train_step(
self,
inputs: torch.Tensor,
labels: torch.Tensor,
optimizer: torch.optim.Optimizer
) -> float:
"""Single training step."""
optimizer.zero_grad()
# Get teacher predictions
with torch.no_grad():
teacher_logits = self.teacher(inputs)
# Get student predictions
student_logits = self.student(inputs)
# Compute loss
loss = self.distillation_loss(student_logits, teacher_logits, labels)
loss.backward()
optimizer.step()
return loss.item()
class ProgressiveDistillation:
    """
    Progressive distillation for diffusion models.

    Original: 1000 sampling steps
    After distillation: 4 steps
    ~250x fewer steps per sample!
    """
    def __init__(
        self,
        teacher_model: nn.Module,
        num_stages: int = 8  # each stage halves the step count: 1000 -> 500 -> ... -> ~4
):
self.teacher = teacher_model
self.num_stages = num_stages
self.students = []
def distill_stage(
self,
current_steps: int,
target_steps: int,
training_data: torch.utils.data.DataLoader,
epochs: int = 100
) -> nn.Module:
"""Distill model to use fewer diffusion steps."""
        # Clone the teacher's architecture and weights
        # (assumes the model class has a no-argument constructor)
        student = type(self.teacher)()
        student.load_state_dict(self.teacher.state_dict())
optimizer = torch.optim.AdamW(student.parameters(), lr=1e-5)
for epoch in range(epochs):
for batch in training_data:
                # Teacher: 2 denoising steps; student: 1 step matching them.
                # (`.step(batch, t_from, t_to)` is an assumed diffusion-model API)
                with torch.no_grad():
                    t1 = current_steps
                    t2 = current_steps // 2
                    teacher_out = self.teacher.step(batch, t1, t2)
# Student learns single step
student_out = student.step(batch, t1, t2, single_step=True)
# Match outputs
loss = F.mse_loss(student_out, teacher_out)
optimizer.zero_grad()
loss.backward()
optimizer.step()
return student
def full_distillation(
self,
training_data: torch.utils.data.DataLoader
) -> nn.Module:
"""Run full progressive distillation."""
current_model = self.teacher
current_steps = 1000
for stage in range(self.num_stages):
target_steps = current_steps // 2
print(f"Stage {stage + 1}: {current_steps} → {target_steps} steps")
student = self.distill_stage(
current_steps, target_steps, training_data
)
self.students.append(student)
current_model = student
current_steps = target_steps
        return current_model
```
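A minimal end-to-end check of the distillation loop with two toy MLPs and a random batch (all shapes and sizes here are illustrative):

```python
import torch
import torch.nn as nn

teacher = nn.Sequential(nn.Linear(32, 256), nn.ReLU(), nn.Linear(256, 10))
student = nn.Sequential(nn.Linear(32, 64), nn.ReLU(), nn.Linear(64, 10))

trainer = DistillationTrainer(teacher, student, temperature=4.0, alpha=0.5)
optimizer = torch.optim.AdamW(student.parameters(), lr=1e-3)

x = torch.randn(16, 32)          # random inputs
y = torch.randint(0, 10, (16,))  # random labels
loss = trainer.train_step(x, y, optimizer)
print(f"Distillation loss: {loss:.3f}")
```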
Measuring energy: practical tools

```python
from codecarbon import OfflineEmissionsTracker
import torch
from torch.profiler import profile, ProfilerActivity
import time
from typing import Any, Dict, Tuple
class GreenAIProfiler:
"""Профілювання energy та carbon footprint."""
def __init__(
self,
project_name: str = "ml_experiment",
country_iso_code: str = "UKR" # Україна
):
self.project_name = project_name
self.country_code = country_iso_code
# Carbon tracker
self.tracker = OfflineEmissionsTracker(
project_name=project_name,
country_iso_code=country_iso_code,
log_level="warning"
)
self.metrics = {}
def start_tracking(self):
"""Start energy tracking."""
self.tracker.start()
self.start_time = time.time()
def stop_tracking(self) -> Dict[str, float]:
"""Stop tracking and return metrics."""
emissions = self.tracker.stop()
duration = time.time() - self.start_time
self.metrics = {
'emissions_kg_co2': emissions,
'emissions_g_co2': emissions * 1000,
'duration_seconds': duration,
'energy_kwh': self.tracker.final_emissions_data.energy_consumed,
'power_watts': (self.tracker.final_emissions_data.energy_consumed * 1000 * 3600) / duration if duration > 0 else 0
}
return self.metrics
def profile_inference(
self,
model: torch.nn.Module,
input_data: torch.Tensor,
num_runs: int = 100
) -> Dict[str, Any]:
"""Profile inference energy and latency."""
model.eval()
device = next(model.parameters()).device
# Warmup
with torch.no_grad():
for _ in range(10):
_ = model(input_data)
# Profile with PyTorch profiler
with profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
with_flops=True,
record_shapes=True
) as prof:
self.start_tracking()
with torch.no_grad():
for _ in range(num_runs):
_ = model(input_data)
if device.type == 'cuda':
torch.cuda.synchronize()
energy_metrics = self.stop_tracking()
# Extract profiler metrics
key_averages = prof.key_averages()
total_flops = sum(
item.flops for item in key_averages
if item.flops is not None and item.flops > 0
)
total_cuda_time = sum(
item.cuda_time_total for item in key_averages
)
return {
**energy_metrics,
'total_flops': total_flops,
'flops_per_inference': total_flops / num_runs,
'cuda_time_ms': total_cuda_time / 1000,
'latency_ms': (energy_metrics['duration_seconds'] * 1000) / num_runs,
'energy_per_inference_j': (energy_metrics['energy_kwh'] * 3600 * 1000) / num_runs,
'co2_per_inference_mg': energy_metrics['emissions_g_co2'] * 1000 / num_runs
}
def compare_models(
self,
models: Dict[str, torch.nn.Module],
input_data: torch.Tensor
) -> Dict[str, Dict]:
"""Compare energy efficiency of multiple models."""
results = {}
for name, model in models.items():
print(f"Profiling {name}...")
results[name] = self.profile_inference(model, input_data)
# Compute relative efficiency
baseline = list(results.values())[0]
for name, metrics in results.items():
metrics['relative_energy'] = metrics['energy_per_inference_j'] / baseline['energy_per_inference_j']
metrics['relative_latency'] = metrics['latency_ms'] / baseline['latency_ms']
return results
class CarbonAwareScheduler:
"""Carbon-aware job scheduling."""
def __init__(self, region: str = "DE"): # Germany has good API coverage
self.region = region
self.api_url = "https://api.electricitymap.org/v3"
def get_carbon_intensity(self) -> float:
"""Get current carbon intensity (gCO2/kWh)."""
        import requests
        try:
            response = requests.get(
                f"{self.api_url}/carbon-intensity/latest",
                params={"zone": self.region},
                headers={"auth-token": "YOUR_API_KEY"},
                timeout=10
            )
            return response.json().get('carbonIntensity', 500)
        except Exception:
            return 500  # Fallback value (gCO2/kWh)
def should_run_now(
self,
threshold_gco2_kwh: float = 200,
job_urgency: str = "low" # low, medium, high
) -> bool:
"""Decide if job should run now based on carbon intensity."""
current_intensity = self.get_carbon_intensity()
if job_urgency == "high":
return True # Always run urgent jobs
if job_urgency == "medium":
threshold_gco2_kwh *= 1.5 # More lenient
return current_intensity < threshold_gco2_kwh
def find_green_window(
self,
hours_ahead: int = 24
) -> Dict[str, Any]:
"""Find best time window with lowest carbon intensity."""
        import requests
        from datetime import datetime
        try:
            response = requests.get(
                f"{self.api_url}/carbon-intensity/forecast",
                params={"zone": self.region},
                headers={"auth-token": "YOUR_API_KEY"},
                timeout=10
            )
forecast = response.json().get('forecast', [])
if not forecast:
return {'best_time': datetime.now(), 'intensity': 500}
# Find minimum intensity
best = min(forecast[:hours_ahead], key=lambda x: x['carbonIntensity'])
return {
'best_time': best['datetime'],
'intensity': best['carbonIntensity'],
'current_intensity': forecast[0]['carbonIntensity'],
'potential_savings': 1 - (best['carbonIntensity'] / forecast[0]['carbonIntensity'])
}
        except Exception:
            return {'best_time': datetime.now(), 'intensity': 500}
class AdaptiveInference:
"""Adaptive compute для energy optimization."""
def __init__(
self,
models: Dict[str, torch.nn.Module], # {'tiny': ..., 'small': ..., 'large': ...}
complexity_estimator: torch.nn.Module = None
):
self.models = models
self.complexity_estimator = complexity_estimator
# Energy per inference (Joules) - calibrated
self.energy_per_model = {
'tiny': 0.001,
'small': 0.01,
'large': 0.1
}
def estimate_complexity(self, x: torch.Tensor) -> float:
"""Estimate input complexity (0-1)."""
if self.complexity_estimator is not None:
with torch.no_grad():
return self.complexity_estimator(x).item()
else:
            # Heuristic: input std as a cheap complexity proxy,
            # normalized by a typical std of 0.5 and clipped to [0, 1]
            return min(torch.std(x).item() / 0.5, 1.0)
def select_model(
self,
x: torch.Tensor,
energy_budget: float = None,
quality_threshold: float = 0.9
) -> str:
"""Select optimal model based on input and constraints."""
complexity = self.estimate_complexity(x)
if energy_budget is not None:
# Select best model within budget
for name in ['large', 'small', 'tiny']:
if self.energy_per_model[name] <= energy_budget:
return name
return 'tiny'
# Select based on complexity
if complexity < 0.3:
return 'tiny'
elif complexity < 0.7:
return 'small'
else:
return 'large'
def infer(
self,
x: torch.Tensor,
**kwargs
) -> Tuple[torch.Tensor, Dict]:
"""Run inference with optimal model."""
model_name = self.select_model(x, **kwargs)
model = self.models[model_name]
with torch.no_grad():
output = model(x)
return output, {
'model_used': model_name,
'estimated_energy_j': self.energy_per_model[model_name],
'complexity': self.estimate_complexity(x)
        }
```
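How the scheduler is meant to be used: a small launcher that defers a non-urgent training job to a low-carbon window. A sketch assuming you have an electricityMap API key configured and supply your own train function:

```python
import time

def launch_when_green(train_fn, max_wait_hours: int = 24):
    """Start training once grid carbon intensity drops below the threshold."""
    scheduler = CarbonAwareScheduler(region="DE")
    for _ in range(max_wait_hours):
        if scheduler.should_run_now(threshold_gco2_kwh=200, job_urgency="low"):
            return train_fn()
        time.sleep(3600)  # re-check hourly
    return train_fn()  # deadline reached: run regardless
```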
Hardware for Sustainable AI
| Platform | TOPS | Power (W) | TOPS/W | Use Case |
|----------|------|-----------|--------|----------|
| NVIDIA H100 | 1,979 | 700 | 2.8 | Training |
| Apple M3 Max NPU | 18 | 15 | 1.2 | Edge |
| Google TPU v5e | 197 | 170 | 1.2 | Inference |
| Qualcomm Hexagon | 75 | 15 | 5.0 | Mobile |
| Groq LPU | 750 | 300 | 2.5 | Inference |
| Cerebras CS-2 | 850 | 20,000 | 0.04 | Training |
Edge vs Cloud comparison:

```
Cloud A100 inference:   400 W x 50 ms  = 20 J  = 0.0056 Wh per query
Edge Hexagon inference:  15 W x 100 ms = 1.5 J = 0.0004 Wh per query
```

Edge is roughly 14x more energy-efficient per query, but it requires model optimization (quantization, pruning).
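The same arithmetic as a reusable helper; power draw and latency are the only inputs, and the figures above are rough assumptions, not benchmarks:

```python
def wh_per_query(power_watts: float, latency_ms: float) -> float:
    """Energy per query in watt-hours: power x time."""
    return power_watts * (latency_ms / 1000.0) / 3600.0

cloud = wh_per_query(400, 50)   # A100-class server
edge = wh_per_query(15, 100)    # mobile NPU
print(f"cloud={cloud:.4f} Wh, edge={edge:.4f} Wh, ratio={cloud/edge:.1f}x")
```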
Reporting and Compliance

```python
from dataclasses import dataclass, asdict
from datetime import datetime
import json
@dataclass
class MLCarbonReport:
"""Carbon footprint report для ML project."""
project_name: str
report_date: str
total_emissions_kg: float
energy_consumed_kwh: float
compute_hours: float
gpu_type: str
region: str
carbon_intensity_avg: float
# Breakdown
training_emissions_kg: float
inference_emissions_kg: float
# Comparison
equivalent_car_km: float
equivalent_flights_hours: float
equivalent_smartphone_charges: float
def to_json(self) -> str:
return json.dumps(asdict(self), indent=2)
@classmethod
def compute_equivalents(cls, emissions_kg: float) -> dict:
"""Compute human-understandable equivalents."""
return {
'car_km': emissions_kg / 0.12, # 120g CO2 per km
'flight_hours': emissions_kg / 90, # 90kg CO2 per hour
'smartphone_charges': emissions_kg / 0.008, # 8g CO2 per charge
'trees_year': emissions_kg / 21, # 21kg CO2 absorbed per tree per year
}
def generate_sustainability_report(
project_name: str,
training_metrics: dict,
inference_metrics: dict,
region: str = "DE"
) -> MLCarbonReport:
"""Generate comprehensive sustainability report."""
total_emissions = (
training_metrics.get('emissions_kg', 0) +
inference_metrics.get('emissions_kg', 0)
)
equivalents = MLCarbonReport.compute_equivalents(total_emissions)
return MLCarbonReport(
project_name=project_name,
report_date=datetime.now().isoformat(),
total_emissions_kg=total_emissions,
energy_consumed_kwh=training_metrics.get('energy_kwh', 0) + inference_metrics.get('energy_kwh', 0),
compute_hours=training_metrics.get('hours', 0),
gpu_type=training_metrics.get('gpu_type', 'Unknown'),
region=region,
carbon_intensity_avg=training_metrics.get('carbon_intensity', 500),
training_emissions_kg=training_metrics.get('emissions_kg', 0),
inference_emissions_kg=inference_metrics.get('emissions_kg', 0),
equivalent_car_km=equivalents['car_km'],
equivalent_flights_hours=equivalents['flight_hours'],
equivalent_smartphone_charges=equivalents['smartphone_charges']
    )
```
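Putting the reporting pieces together: feed metrics collected by GreenAIProfiler (or entered by hand, as in this made-up example) into the generator:

```python
training_metrics = {
    'emissions_kg': 120.0, 'energy_kwh': 280.0,
    'hours': 340.0, 'gpu_type': 'A100', 'carbon_intensity': 430.0
}
inference_metrics = {'emissions_kg': 15.0, 'energy_kwh': 35.0}

report = generate_sustainability_report(
    "my_project", training_metrics, inference_metrics, region="DE"
)
print(report.to_json())
```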
Ideas for research projects
For a bachelor's thesis:
- Comparing the energy efficiency of different quantization levels (INT8, INT4, INT2)
- Carbon footprint measurement for a typical inference pipeline
- Edge vs cloud energy comparison for a specific task
For a master's thesis:
- Adaptive compute policies for energy optimization
- Green federated learning with carbon-aware scheduling
- Efficient knowledge distillation for edge deployment
For PhD research:
- Theoretical bounds on the compute-energy-accuracy trade-off
- Novel energy-efficient architectures
- System-level optimization across the hardware-software stack
AI regulation is coming: the EU AI Act, California bills. Carbon reporting requirements are already mandatory for large companies.
Sustainable AI is not activism. It is compliance, cost reduction, and competitive advantage. By one common estimate, a GPT-4 query costs Microsoft about $0.01 in electricity. A billion queries a day is roughly $10M per day on energy alone; a 10% optimization saves $1M a day.
A developer who can write energy-efficient ML is worth more on the market, because it is a rare skill with growing demand. If you are planning research in sustainable computing, green AI, or energy-efficient ML, the SKP-Degree specialists will help you formulate a topic, run the experiments, and write up the thesis. Visit skp-degree.com.ua or write on Telegram: @kursovi_diplomy, from the idea to a successful defense.
Keywords: sustainable AI, green AI, carbon footprint, energy efficiency, edge computing, quantization, knowledge distillation, CodeCarbon, mixture of experts, early exit, thesis project, master's thesis, AI research.