Три гіганти. Три протоколи. Одна мета — змусити AI-агентів розмовляти між собою.
Anthropic випустив MCP. Google відповів A2A. IBM не відстає з ACP. І тепер кожен, хто будує мультиагентні системи, стоїть перед вибором: який протокол обрати?
Це не просто технічне питання. Це питання архітектури, масштабування, і — давай чесно — політики. Бо хто контролює протокол, той контролює екосистему. Як TCP/IP визначив інтернет, так ці протоколи визначать майбутнє AI-інфраструктури.
Чому міжагентна комунікація — це складно
Коли в тебе один агент — все просто. Input → Processing → Output. Як виклик функції.
Коли агентів кілька — починається розподілений хаос:
Проблема синхронізації:
- Як агент A передає результат агенту B?
- В якому форматі? JSON? Protobuf? Власний бінарний?
- Як B розуміє, що A закінчив?
- Що робити, якщо A впав посеред роботи?
- Як синхронізувати state між агентами?
Проблема discovery:
- Як агент знаходить інших агентів?
- Як дізнатися, які capabilities має кожен?
- Як балансувати навантаження?
Проблема security:
- Як автентифікувати агентів?
- Як запобігти man-in-the-middle атакам?
- Як забезпечити конфіденційність даних?
Протоколи комунікації — це стандарти, які вирішують ці питання. І від вибору протоколу залежить, чи буде твоя система працювати надійно, чи розвалиться під навантаженням.
MCP (Model Context Protocol) — Anthropic
Філософія: Стандартизований спосіб надання контексту моделям. MCP — це не просто протокол передачі повідомлень, це спосіб інтеграції зовнішніх інструментів і даних з LLM.
Архітектура MCP:
┌─────────────────────────────────────────────────────────┐
│ MCP HOST │
│ (Claude Desktop, IDE, Custom Application) │
└─────────────────────────┬───────────────────────────────┘
│ JSON-RPC over stdio/HTTP
▼
┌─────────────────────────────────────────────────────────┐
│ MCP SERVER │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Tools │ │ Resources │ │ Prompts │ │
│ │ (actions) │ │ (data) │ │ (templates) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
Ключові компоненти:
- Tools — функції, які агент може викликати:
from mcp.server import Server
from mcp.types import Tool, TextContent
server = Server("database-agent")
@server.tool()
async def search_database(query: str, limit: int = 10) -> list[TextContent]:
"""Пошук у базі даних за запитом."""
results = await db.search(query, limit=limit)
return [TextContent(type="text", text=json.dumps(results))]
@server.tool()
async def insert_record(table: str, data: dict) -> TextContent:
"""Вставка запису в таблицю."""
record_id = await db.insert(table, data)
return TextContent(type="text", text=f"Inserted record {record_id}")
- Resources — доступ до даних:
@server.resource("db://schema/{table_name}")
async def get_table_schema(table_name: str) -> str:
"""Повертає схему таблиці."""
schema = await db.get_schema(table_name)
return json.dumps(schema, indent=2)
@server.resource("db://stats")
async def get_database_stats() -> str:
"""Статистика бази даних."""
stats = await db.get_statistics()
return json.dumps(stats)
- Prompts — шаблони для типових задач:
@server.prompt()
async def sql_expert_prompt(context: str) -> str:
"""Промпт для SQL-експерта."""
return f"""You are a SQL expert. Given the following context:
{context}
Generate efficient, secure SQL queries. Always use parameterized queries.
Explain your reasoning step by step."""
Повний приклад MCP сервера:
import asyncio
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, Resource, TextContent
class DatabaseMCPServer:
def __init__(self, connection_string: str):
self.server = Server("database-mcp")
self.db = DatabaseConnection(connection_string)
self._register_handlers()
def _register_handlers(self):
@self.server.tool()
async def execute_query(sql: str, params: list = None) -> TextContent:
"""Виконує SQL запит з параметрами."""
try:
result = await self.db.execute(sql, params or [])
return TextContent(
type="text",
text=json.dumps({
"status": "success",
"rows": result.rows,
"affected": result.rowcount
})
)
except Exception as e:
return TextContent(
type="text",
text=json.dumps({
"status": "error",
"message": str(e)
})
)
@self.server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="execute_query",
description="Execute SQL query with parameters",
inputSchema={
"type": "object",
"properties": {
"sql": {"type": "string"},
"params": {"type": "array", "items": {}}
},
"required": ["sql"]
}
)
]
async def run(self):
async with stdio_server() as (read_stream, write_stream):
await self.server.run(read_stream, write_stream)
if __name__ == "__main__":
server = DatabaseMCPServer("postgresql://localhost/mydb")
asyncio.run(server.run())
Сильні сторони MCP:
- Нативна інтеграція з Claude
- Простий JSON-RPC протокол
- Відмінна документація та приклади
- Активна open source спільнота
- Легко розширювати
Слабкі сторони:
- Оптимізований під Anthropic ecosystem
- Менша гнучкість для P2P комунікації
- Не найкраща продуктивність для high-throughput сценаріїв
A2A (Agent-to-Agent Protocol) — Google
Філософія: Децентралізована комунікація між автономними агентами на enterprise-масштабі. A2A проектувався для Google Cloud, де тисячі агентів повинні спілкуватися з мінімальною latency.
Архітектура A2A:
┌─────────────────────────────────────────────────────────┐
│ AGENT REGISTRY │
│ (Discovery, Agent Cards, Capabilities) │
└────────────────────────┬────────────────────────────────┘
│
┌────────────────┼────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Agent A │ │ Agent B │ │ Agent C │
│ ┌────────┐ │ │ ┌────────┐ │ │ ┌────────┐ │
│ │gRPC │◄─┼──┼─►│gRPC │◄─┼──┼─►│gRPC │ │
│ │Server │ │ │ │Server │ │ │ │Server │ │
│ └────────┘ │ │ └────────┘ │ │ └────────┘ │
│ Agent Card │ │ Agent Card │ │ Agent Card │
└──────────────┘ └──────────────┘ └──────────────┘
Agent Card — ідентичність агента:
{
"agent_id": "research-agent-001",
"name": "Research Assistant",
"version": "2.1.0",
"capabilities": [
{
"name": "web_search",
"input_schema": {"query": "string", "max_results": "int"},
"output_schema": {"results": "array"}
},
{
"name": "summarize_document",
"input_schema": {"document_url": "string"},
"output_schema": {"summary": "string"}
}
],
"endpoint": "grpc://research-agent.example.com:443",
"authentication": {
"type": "mTLS",
"cert_fingerprint": "sha256:abc123..."
},
"rate_limits": {
"requests_per_minute": 1000,
"concurrent_connections": 100
}
}
gRPC Service Definition:
syntax = "proto3";
package a2a.v1;
service AgentService {
// Синхронний виклик capability
rpc InvokeCapability(CapabilityRequest) returns (CapabilityResponse);
// Стрімінг для довгих операцій
rpc StreamCapability(CapabilityRequest) returns (stream CapabilityChunk);
// Двонаправлений стрім для діалогу
rpc Conversation(stream Message) returns (stream Message);
// Health check
rpc HealthCheck(HealthRequest) returns (HealthResponse);
}
message CapabilityRequest {
string capability_name = 1;
bytes input_data = 2; // JSON-encoded
map<string, string> metadata = 3;
string trace_id = 4; // Для distributed tracing
}
message CapabilityResponse {
bool success = 1;
bytes output_data = 2;
string error_message = 3;
int64 processing_time_ms = 4;
}
message CapabilityChunk {
bytes data = 1;
bool is_final = 2;
float progress = 3; // 0.0 - 1.0
}
Python клієнт для A2A:
import grpc
from a2a.v1 import agent_pb2, agent_pb2_grpc
from google.auth.transport.grpc import secure_authorized_channel
class A2AClient:
def __init__(self, agent_card: dict):
self.agent_card = agent_card
self.channel = self._create_secure_channel()
self.stub = agent_pb2_grpc.AgentServiceStub(self.channel)
def _create_secure_channel(self):
"""Створює mTLS-захищений канал."""
credentials = grpc.ssl_channel_credentials(
root_certificates=load_ca_cert(),
private_key=load_client_key(),
certificate_chain=load_client_cert()
)
return grpc.secure_channel(
self.agent_card["endpoint"].replace("grpc://", ""),
credentials
)
async def invoke(self, capability: str, input_data: dict,
timeout: float = 30.0) -> dict:
"""Викликає capability агента."""
request = agent_pb2.CapabilityRequest(
capability_name=capability,
input_data=json.dumps(input_data).encode(),
trace_id=generate_trace_id()
)
response = await self.stub.InvokeCapability(
request,
timeout=timeout
)
if not response.success:
raise A2AError(response.error_message)
return json.loads(response.output_data)
async def stream_invoke(self, capability: str, input_data: dict):
"""Стрімінг виклик для довгих операцій."""
request = agent_pb2.CapabilityRequest(
capability_name=capability,
input_data=json.dumps(input_data).encode()
)
async for chunk in self.stub.StreamCapability(request):
yield {
"data": json.loads(chunk.data) if chunk.data else None,
"progress": chunk.progress,
"is_final": chunk.is_final
}
# Використання
async def research_with_a2a():
# Отримуємо Agent Card з registry
registry = A2ARegistry("https://registry.example.com")
agent_card = await registry.find_agent(capability="web_search")
client = A2AClient(agent_card)
# Стрімінг пошук
async for chunk in client.stream_invoke(
"web_search",
{"query": "latest AI research", "max_results": 20}
):
print(f"Progress: {chunk['progress']*100:.1f}%")
if chunk["is_final"]:
return chunk["data"]
Сильні сторони A2A:
- Висока продуктивність завдяки gRPC/Protobuf
- Вбудований mTLS для enterprise security
- Agent Cards для автоматичного discovery
- Стрімінг з коробки
- Distributed tracing підтримка
Слабкі сторони:
- Складніший setup порівняно з MCP
- Вимагає більше інфраструктури (registry, certificates)
- Тісна інтеграція з Google Cloud ecosystem
ACP (Agent Communication Protocol) — IBM
Філософія: Enterprise-grade протокол для критичних систем з вимогами compliance. ACP розроблявся для банків, страхових компаній, медичних установ — там, де кожна транзакція має бути задокументована і аудитована.
Архітектура ACP:
┌─────────────────────────────────────────────────────────┐
│ COMPLIANCE GATEWAY │
│ (Audit, Policy Enforcement, Data Classification) │
└────────────────────────┬────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────┐
│ MESSAGE BROKER (encrypted) │
│ (IBM MQ / Apache Kafka with encryption) │
└────────────────────────┬────────────────────────────────┘
│
┌────────────────┼────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Agent A │ │ Agent B │ │ Agent C │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │Policy │ │ │ │Policy │ │ │ │Policy │ │
│ │Enforcer │ │ │ │Enforcer │ │ │ │Enforcer │ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
│ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │
│ │Audit Log │ │ │ │Audit Log │ │ │ │Audit Log │ │
│ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │
└──────────────┘ └──────────────┘ └──────────────┘
ACP Message Format:
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import hashlib
class DataClassification(Enum):
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted" # PII, PHI, финансові дані
@dataclass
class ACPMessage:
message_id: str
sender_id: str
recipient_id: str
timestamp: datetime
# Payload
action: str
payload: dict
# Compliance fields
data_classification: DataClassification
retention_period_days: int
consent_reference: str # Посилання на згоду користувача
# Audit trail
parent_message_id: str = None # Для ланцюжка повідомлень
signature: str = None # Криптографічний підпис
def sign(self, private_key):
"""Підписує повідомлення."""
content = f"{self.message_id}:{self.sender_id}:{self.payload}"
self.signature = sign_with_key(content, private_key)
def verify(self, public_key) -> bool:
"""Верифікує підпис."""
content = f"{self.message_id}:{self.sender_id}:{self.payload}"
return verify_signature(content, self.signature, public_key)
def get_audit_record(self) -> dict:
"""Генерує запис для аудиту."""
return {
"message_id": self.message_id,
"sender": self.sender_id,
"recipient": self.recipient_id,
"timestamp": self.timestamp.isoformat(),
"action": self.action,
"classification": self.data_classification.value,
"payload_hash": hashlib.sha256(
json.dumps(self.payload).encode()
).hexdigest(),
"signature_valid": self.signature is not None
}
ACP Agent Implementation:
class ACPAgent:
def __init__(self, agent_id: str, config: ACPConfig):
self.agent_id = agent_id
self.config = config
self.policy_enforcer = PolicyEnforcer(config.policies)
self.audit_logger = AuditLogger(config.audit_endpoint)
self.message_queue = IBMMessageQueue(config.mq_connection)
async def send_message(self, recipient: str, action: str,
payload: dict, classification: DataClassification):
"""Відправляє повідомлення з compliance перевірками."""
# 1. Policy check
policy_result = await self.policy_enforcer.check(
sender=self.agent_id,
recipient=recipient,
action=action,
classification=classification
)
if not policy_result.allowed:
await self.audit_logger.log_violation(
action=action,
reason=policy_result.reason
)
raise PolicyViolationError(policy_result.reason)
# 2. Data masking для sensitive fields
masked_payload = self._mask_sensitive_data(payload, classification)
# 3. Create message
message = ACPMessage(
message_id=generate_uuid(),
sender_id=self.agent_id,
recipient_id=recipient,
timestamp=datetime.utcnow(),
action=action,
payload=masked_payload,
data_classification=classification,
retention_period_days=self._get_retention(classification),
consent_reference=self.config.consent_id
)
# 4. Sign message
message.sign(self.config.private_key)
# 5. Audit log
await self.audit_logger.log(message.get_audit_record())
# 6. Send via encrypted channel
await self.message_queue.send(
queue=f"acp.{recipient}",
message=message.serialize(),
encryption="AES-256-GCM"
)
return message.message_id
async def receive_message(self) -> ACPMessage:
"""Отримує та верифікує повідомлення."""
raw = await self.message_queue.receive(f"acp.{self.agent_id}")
message = ACPMessage.deserialize(raw)
# Verify signature
sender_key = await self.get_public_key(message.sender_id)
if not message.verify(sender_key):
await self.audit_logger.log_security_event(
"invalid_signature",
message.message_id
)
raise SecurityError("Invalid message signature")
# Audit log receipt
await self.audit_logger.log({
"event": "message_received",
**message.get_audit_record()
})
return message
def _mask_sensitive_data(self, payload: dict,
classification: DataClassification) -> dict:
"""Маскує PII та інші sensitive дані."""
if classification in [DataClassification.CONFIDENTIAL,
DataClassification.RESTRICTED]:
return self._apply_masking_rules(payload)
return payload
Сильні сторони ACP:
- Повна відповідність HIPAA, SOC2, GDPR, PCI-DSS
- Криптографічний audit trail
- Вбудована класифікація даних
- Підтримка legacy IBM систем
- Формальна верифікація протоколу
Слабкі сторони:
- Значний overhead для простих use cases
- Дорога IBM інфраструктура
- Менша спільнота та ecosystem
- Повільніший development cycle
Порівняльна таблиця
| Критерій | MCP | A2A | ACP |
|----------|-----|-----|-----|
| Швидкість | ⭐⭐⭐ (JSON-RPC) | ⭐⭐⭐⭐⭐ (gRPC) | ⭐⭐⭐ (overhead) |
| Простота | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ |
| Security | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Compliance | ⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Ecosystem | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ |
| Open Source | ✅ Повністю | Частково | ❌ Пропрієтарний |
| Найкраще для | Стартапи, інтеграції | Enterprise scale | Regulated industries |
Interoperability: Bridge між протоколами
Реальність така, що в enterprise середовищі часто потрібно підтримувати кілька протоколів. Ось приклад bridge-сервісу:
class ProtocolBridge:
"""Bridge для конвертації між MCP, A2A та ACP."""
def __init__(self):
self.mcp_server = MCPServer()
self.a2a_client = A2AClient()
self.acp_agent = ACPAgent()
async def mcp_to_a2a(self, mcp_request: dict) -> dict:
"""Конвертує MCP tool call в A2A capability invocation."""
return {
"capability_name": mcp_request["params"]["name"],
"input_data": mcp_request["params"]["arguments"],
"metadata": {
"source_protocol": "mcp",
"original_method": mcp_request["method"]
}
}
async def a2a_to_acp(self, a2a_response: dict,
classification: DataClassification) -> ACPMessage:
"""Обгортає A2A response в ACP message з compliance."""
return ACPMessage(
message_id=generate_uuid(),
sender_id="bridge-service",
recipient_id=a2a_response.get("recipient", "default"),
timestamp=datetime.utcnow(),
action="bridged_response",
payload=a2a_response,
data_classification=classification,
retention_period_days=90
)
async def handle_cross_protocol_request(self, source_protocol: str,
target_protocol: str,
request: dict) -> dict:
"""Маршрутизує запити між протоколами."""
converters = {
("mcp", "a2a"): self.mcp_to_a2a,
("a2a", "acp"): self.a2a_to_acp,
# ... інші комбінації
}
converter = converters.get((source_protocol, target_protocol))
if not converter:
raise UnsupportedConversionError(
f"No converter for {source_protocol} -> {target_protocol}"
)
return await converter(request)
Performance Benchmarking
Для наукової роботи важливо коректно порівнювати продуктивність:
import asyncio
import time
from dataclasses import dataclass
from statistics import mean, stdev
@dataclass
class BenchmarkResult:
protocol: str
operation: str
latency_ms: list[float]
throughput_rps: float
error_rate: float
@property
def p50_latency(self) -> float:
return sorted(self.latency_ms)[len(self.latency_ms)//2]
@property
def p99_latency(self) -> float:
return sorted(self.latency_ms)[int(len(self.latency_ms)*0.99)]
async def benchmark_protocol(protocol_client, num_requests: int = 1000,
concurrency: int = 10) -> BenchmarkResult:
"""Бенчмарк протоколу."""
latencies = []
errors = 0
semaphore = asyncio.Semaphore(concurrency)
async def single_request():
nonlocal errors
async with semaphore:
start = time.perf_counter()
try:
await protocol_client.ping() # Стандартна операція
except Exception:
errors += 1
return None
return (time.perf_counter() - start) * 1000
start_time = time.perf_counter()
results = await asyncio.gather(*[
single_request() for _ in range(num_requests)
])
total_time = time.perf_counter() - start_time
latencies = [r for r in results if r is not None]
return BenchmarkResult(
protocol=protocol_client.name,
operation="ping",
latency_ms=latencies,
throughput_rps=num_requests / total_time,
error_rate=errors / num_requests
)
# Запуск бенчмарків
async def run_comparison():
protocols = [
MCPClient("localhost:8080"),
A2AClient("localhost:8081"),
ACPClient("localhost:8082")
]
results = []
for client in protocols:
result = await benchmark_protocol(client)
results.append(result)
print(f"{result.protocol}:")
print(f" P50 Latency: {result.p50_latency:.2f}ms")
print(f" P99 Latency: {result.p99_latency:.2f}ms")
print(f" Throughput: {result.throughput_rps:.0f} req/s")
print(f" Error Rate: {result.error_rate*100:.2f}%")
Ідеї для дослідження
Для бакалавра:
- Реалізація простого multi-agent system на MCP з 3-5 агентами
- Порівняння latency MCP vs A2A на синтетичних тестах (1000+ запитів)
- Створення візуалізації комунікації між агентами
- Імплементація базового service discovery для MCP
Для магістра:
- Розробка production-ready bridge між MCP та A2A
- Аналіз security vulnerabilities кожного протоколу (penetration testing)
- Оптимізація протоколу для edge devices з обмеженими ресурсами
- Дослідження scalability: як протоколи поводяться при 100+ агентів
- Формальна специфікація протоколу в TLA+ або Alloy
Для PhD:
- Формальна верифікація протоколів (model checking, theorem proving)
- Розробка нового протоколу з доведеними властивостями
- Теоретичний аналіз оптимальної комунікації (game theory, mechanism design)
- Cross-protocol consensus algorithms для гетерогенних систем
Якщо вас цікавить реалізація будь-якої з цих тем для курсової чи дипломної роботи, фахівці SKP-Degree допоможуть з повним циклом — від вибору теми до захисту. Звертайтесь на skp-degree.com.ua або пишіть у Telegram: @kursovi_diplomy.
Де брати матеріал
Офіційні джерела:
- MCP: modelcontextprotocol.io — повна документація та SDK
- A2A: Google AI blog, cloud.google.com/agent-builder
- ACP: IBM Developer, IBM Watson documentation
Код та приклади:
- github.com/anthropics/anthropic-cookbook — MCP examples
- github.com/modelcontextprotocol — офіційні SDK
- Google Cloud samples — A2A implementations
- IBM Watson SDK — ACP інтеграції
Наукові статті:
- arXiv: "agent communication protocols", "multi-agent systems"
- ACM Digital Library: distributed AI systems
- IEEE: autonomous agents communication
Складність: ? Бакалаврський+ рівень
Потрібні знання:
- Networking basics (HTTP, gRPC, WebSocket, message queues)
- Python (asyncio, dataclasses)
- Базове розуміння LLM та AI agents
- JSON, Protobuf serialization
Чому підходить для бакалавра:
- Практично-орієнтована тема з конкретними результатами
- Можна зробити якісний benchmarking без глибокої теорії
- Актуальність гарантує інтерес комісії
- Є готові open source інструменти для експериментів
Висновок: який протокол обрати?
Вибір протоколу — це не питання "який кращий", а "який підходить для вашого use case":
Обирайте MCP, якщо:
- Працюєте з Claude або Anthropic ecosystem
- Потрібна швидка інтеграція без overhead
- Стартап або невеликий проект
- Open source важливий
Обирайте A2A, якщо:
- Enterprise-масштаб з тисячами агентів
- Критична продуктивність (low latency)
- Вже використовуєте Google Cloud
- Потрібен автоматичний service discovery
Обирайте ACP, якщо:
- Регульована індустрія (фінанси, медицина)
- Вимоги compliance (HIPAA, SOC2, GDPR)
- Потрібен повний audit trail
- Вже є IBM інфраструктура
Протоколи — це інфраструктура. Хто контролює інфраструктуру, той контролює ринок. Розуміння цих протоколів зараз — це як розуміти HTTP у 1995. Ті, хто розберуться першими, матимуть перевагу на роки.
Ключові слова: MCP, Model Context Protocol, A2A, Agent-to-Agent, ACP, Agent Communication Protocol, протоколи міжагентної комунікації, мультиагентні системи, Anthropic, Google, IBM, gRPC, JSON-RPC, distributed AI, enterprise AI, compliance, дипломна робота, курсова робота, магістерська.