Skip to content

๐Ÿค Agent2Agent Protocol (A2A): Inter-Agent Communication

Label: Agents Collaborating with Agents

The Agent2Agent Protocol (A2A) is an open standard designed to enable direct communication and collaboration between AI agents. While MCP connects applications to models, A2A enables agents to discover, communicate with, and delegate tasks to other agents.

Without A2A: With A2A:
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Agent A โ”‚ โ”‚ Agent A โ”‚
โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”˜
โ”‚ โ”‚ A2A request
โ”‚ Manual โ–ผ
โ”‚ integration โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ–ผ โ”‚ A2A Registryโ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚ Agent B โ”‚ โ”‚ discover
โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”˜ โ–ผ
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Different โ”‚ Agent B โ”‚โ—„โ”€โ”€โ”
โ”‚ protocols โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ–ผ โ”‚ โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ A2A โ”‚ A2A
โ”‚ Agent C โ”‚ โ”‚ โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ–ผ โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
Agents can't โ”‚ Agent C โ”‚โ”€โ”€โ”€โ”˜
collaborate โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
Agents collaborate
seamlessly
class A2ARegistry:
"""
Registry for agent discovery
"""
def __init__(self):
self.agents = {}
def register_agent(self, agent_id: str, capabilities: List[str],
endpoint: str, metadata: Dict):
"""
Register an agent with its capabilities
"""
self.agents[agent_id] = {
'capabilities': capabilities,
'endpoint': endpoint,
'metadata': metadata,
'status': 'available',
'registered_at': datetime.now()
}
def find_agents(self, capability: str) -> List[Dict]:
"""
Find agents with specific capability
"""
matching_agents = []
for agent_id, info in self.agents.items():
if capability in info['capabilities']:
matching_agents.append({
'id': agent_id,
'endpoint': info['endpoint'],
'metadata': info['metadata']
})
return matching_agents
def get_agent(self, agent_id: str) -> Dict:
"""
Get specific agent information
"""
return self.agents.get(agent_id)
# Usage
registry = A2ARegistry()
# Register agents
registry.register_agent(
agent_id="research_agent_1",
capabilities=["web_search", "document_analysis", "summarization"],
endpoint="https://agent1.example.com/a2a",
metadata={
"name": "Research Specialist",
"description": "Expert in research and information gathering"
}
)
registry.register_agent(
agent_id="writer_agent_1",
capabilities=["content_generation", "editing", "formatting"],
endpoint="https://agent2.example.com/a2a",
metadata={
"name": "Content Writer",
"description": "Expert in content creation"
}
)
# Find agents
researchers = registry.find_agents("web_search")
print(f"Found {len(researchers)} research agents")
class A2AClient:
"""
Client for A2A communication
"""
def __init__(self, agent_id: str, registry: A2ARegistry):
self.agent_id = agent_id
self.registry = registry
async def send_request(self, target_agent_id: str,
task: Dict) -> Dict:
"""
Send task request to another agent
"""
# Get target agent info
target = self.registry.get_agent(target_agent_id)
if not target:
return {'error': 'Agent not found'}
# Build A2A request
request = {
'protocol': 'A2A/1.0',
'from': self.agent_id,
'to': target_agent_id,
'task': task,
'timestamp': datetime.now().isoformat()
}
# Send to target agent
response = await self.send_to_endpoint(
target['endpoint'],
request
)
return response
async def send_to_endpoint(self, endpoint: str,
request: Dict) -> Dict:
"""
Send HTTP request to agent endpoint
"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{endpoint}/task",
json=request
)
return response.json()
async def delegate_task(self, capability: str,
task_description: str,
context: Dict = None) -> Dict:
"""
Find suitable agent and delegate task
"""
# Find agents with required capability
agents = self.registry.find_agents(capability)
if not agents:
return {
'status': 'error',
'error': f'No agents found with capability: {capability}'
}
# Select best agent (can be more sophisticated)
selected_agent = agents[0]
# Send task
response = await self.send_request(
target_agent_id=selected_agent['id'],
task={
'type': capability,
'description': task_description,
'context': context or {}
}
)
return response
# Usage
client = A2AClient("coordinator_agent", registry)
# Delegate research task
result = await client.delegate_task(
capability="web_search",
task_description="Find latest AI research papers on RAG",
context={"max_results": 10, "recency": "last_month"}
)
print(result)
class A2AAgent:
"""
Agent with A2A support
"""
def __init__(self, agent_id: str, capabilities: List[str]):
self.agent_id = agent_id
self.capabilities = capabilities
self.task_handlers = {}
self.app = FastAPI()
self.setup_endpoints()
def setup_endpoints(self):
"""
Setup A2A endpoints
"""
@self.app.get("/a2a/capabilities")
async def get_capabilities():
return {
'agent_id': self.agent_id,
'capabilities': self.capabilities,
'protocol_version': '1.0'
}
@self.app.post("/a2a/task")
async def handle_task(request: Dict):
return await self.process_task(request)
async def process_task(self, request: Dict) -> Dict:
"""
Process incoming A2A task request
"""
task = request['task']
task_type = task['type']
# Verify capability
if task_type not in self.capabilities:
return {
'status': 'error',
'error': f'Capability {task_type} not supported'
}
# Execute task
if task_type in self.task_handlers:
try:
result = await self.task_handlers[task_type](task)
return {
'status': 'success',
'result': result,
'agent_id': self.agent_id
}
except Exception as e:
return {
'status': 'error',
'error': str(e)
}
return {
'status': 'error',
'error': 'No handler for task type'
}
def register_task_handler(self, capability: str, handler: callable):
"""
Register handler for a capability
"""
self.task_handlers[capability] = handler
# Example: Research Agent
class ResearchAgent(A2AAgent):
"""
Specialized research agent with A2A support
"""
def __init__(self):
super().__init__(
agent_id="research_agent_1",
capabilities=["web_search", "document_analysis"]
)
# Register handlers
self.register_task_handler("web_search", self.handle_web_search)
self.register_task_handler("document_analysis", self.handle_analysis)
async def handle_web_search(self, task: Dict) -> Dict:
"""
Handle web search requests
"""
query = task['description']
context = task.get('context', {})
max_results = context.get('max_results', 10)
# Perform search
results = await self.search_web(query, max_results)
return {
'query': query,
'results': results,
'count': len(results)
}
async def handle_analysis(self, task: Dict) -> Dict:
"""
Handle document analysis requests
"""
document = task.get('context', {}).get('document')
# Analyze document
analysis = await self.analyze_document(document)
return {
'summary': analysis['summary'],
'key_points': analysis['key_points'],
'sentiment': analysis['sentiment']
}
class MultiAgentCoordinator:
"""
Coordinates multiple agents for complex tasks
"""
def __init__(self, registry: A2ARegistry):
self.registry = registry
self.client = A2AClient("coordinator", registry)
async def execute_workflow(self, workflow_spec: Dict) -> Dict:
"""
Execute multi-agent workflow
"""
results = {}
for step in workflow_spec['steps']:
step_id = step['id']
capability = step['capability']
depends_on = step.get('depends_on', [])
# Wait for dependencies
context = {}
for dep_id in depends_on:
if dep_id in results:
context[dep_id] = results[dep_id]
# Execute step
result = await self.client.delegate_task(
capability=capability,
task_description=step['description'],
context=context
)
results[step_id] = result
return results
# Example workflow
workflow = {
'steps': [
{
'id': 'research',
'capability': 'web_search',
'description': 'Research AI agent architectures',
'depends_on': []
},
{
'id': 'analyze',
'capability': 'document_analysis',
'description': 'Analyze research findings',
'depends_on': ['research']
},
{
'id': 'write',
'capability': 'content_generation',
'description': 'Write summary article',
'depends_on': ['research', 'analyze']
}
]
}
coordinator = MultiAgentCoordinator(registry)
results = await coordinator.execute_workflow(workflow)
from fastapi import FastAPI
import httpx
class SimpleA2AAgent:
"""
Minimal A2A-compatible agent
"""
def __init__(self, name: str, skill: str):
self.name = name
self.skill = skill
self.app = FastAPI()
@self.app.post("/task")
async def handle(task: dict):
if task['type'] == self.skill:
result = await self.execute(task['data'])
return {'status': 'success', 'result': result}
return {'status': 'error', 'error': 'Unsupported task'}
async def execute(self, data):
# Implement skill
return f"{self.name} processed: {data}"
async def delegate(self, agent_url: str, task_type: str, data: str):
async with httpx.AsyncClient() as client:
response = await client.post(
f"{agent_url}/task",
json={'type': task_type, 'data': data}
)
return response.json()
# Create agents
researcher = SimpleA2AAgent("Researcher", "research")
writer = SimpleA2AAgent("Writer", "write")
# Researcher delegates to writer
result = await researcher.delegate(
"http://localhost:8001",
"write",
"Research findings about RAG systems"
)
class IntelligentA2ARouter:
"""
Smart routing for A2A requests
"""
def __init__(self, registry: A2ARegistry):
self.registry = registry
self.agent_metrics = {}
async def select_agent(self, capability: str,
requirements: Dict = None) -> str:
"""
Select best agent for task
"""
# Find capable agents
agents = self.registry.find_agents(capability)
if not agents:
return None
# Score each agent
scored_agents = []
for agent in agents:
score = await self.score_agent(agent, requirements or {})
scored_agents.append((score, agent))
# Select highest scoring
best_agent = max(scored_agents, key=lambda x: x[0])[1]
return best_agent['id']
async def score_agent(self, agent: Dict,
requirements: Dict) -> float:
"""
Score agent based on multiple factors
"""
score = 0.0
# Factor 1: Current load (prefer less busy agents)
load = await self.get_agent_load(agent['id'])
score += (1.0 - load) * 0.4
# Factor 2: Performance history
performance = self.agent_metrics.get(agent['id'], {}).get('success_rate', 0.5)
score += performance * 0.3
# Factor 3: Response time
avg_response = self.agent_metrics.get(agent['id'], {}).get('avg_response_ms', 1000)
score += (1.0 - min(avg_response / 2000, 1.0)) * 0.2
# Factor 4: Specialization match
if 'preferred_tasks' in requirements:
metadata = agent.get('metadata', {})
if requirements['preferred_tasks'] in metadata.get('specialties', []):
score += 0.1
return score
async def get_agent_load(self, agent_id: str) -> float:
"""
Get current agent load (0.0 = idle, 1.0 = full)
"""
# Query agent for current load
agent = self.registry.get_agent(agent_id)
async with httpx.AsyncClient() as client:
try:
response = await client.get(f"{agent['endpoint']}/metrics")
metrics = response.json()
return metrics.get('load', 0.5)
except:
return 0.5 # Default if unavailable
class PipelinePattern:
"""
Chain agents in a pipeline
"""
async def execute_pipeline(self, agents: List[str],
initial_data: Dict) -> Dict:
"""
Pass data through agent pipeline
"""
data = initial_data
for agent_id in agents:
response = await self.client.send_request(agent_id, {
'type': 'process',
'data': data
})
if response['status'] != 'success':
return response
# Output of one agent becomes input to next
data = response['result']
return {'status': 'success', 'result': data}
# Usage
pipeline = PipelinePattern()
result = await pipeline.execute_pipeline(
agents=['data_collector', 'data_processor', 'report_generator'],
initial_data={'query': 'Q4 sales'}
)
class CompetitivePattern:
"""
Multiple agents compete, best result wins
"""
async def compete(self, agents: List[str], task: Dict) -> Dict:
"""
Send task to multiple agents, return best result
"""
# Send to all agents in parallel
responses = await asyncio.gather(*[
self.client.send_request(agent_id, task)
for agent_id in agents
])
# Score responses
scored = []
for response in responses:
if response['status'] == 'success':
score = await self.score_response(response['result'])
scored.append((score, response))
# Return best
if scored:
best = max(scored, key=lambda x: x[0])[1]
return best
return {'status': 'error', 'error': 'All agents failed'}
class ConsensusPattern:
"""
Multiple agents vote on result
"""
async def reach_consensus(self, agents: List[str],
task: Dict,
threshold: float = 0.6) -> Dict:
"""
Agents vote, require threshold agreement
"""
# Get responses from all agents
responses = await asyncio.gather(*[
self.client.send_request(agent_id, task)
for agent_id in agents
])
# Collect results
results = [r['result'] for r in responses if r['status'] == 'success']
# Find consensus
consensus = self.find_most_common(results)
agreement = results.count(consensus) / len(results)
if agreement >= threshold:
return {
'status': 'success',
'result': consensus,
'consensus_level': agreement
}
return {
'status': 'no_consensus',
'results': results,
'agreement': agreement
}
  • A2A enables agent collaboration through standardized protocol
  • Discovery allows agents to find suitable collaborators
  • Delegation distributes work across specialist agents
  • Coordination enables complex multi-agent workflows
  • Patterns like pipeline, competitive, and consensus solve common problems
# โœ… Good: Specific capabilities
capabilities = ["web_search", "pdf_parsing", "entity_extraction"]
# โŒ Bad: Vague capabilities
capabilities = ["research", "analysis"]
# โœ… Good: Well-defined task
{
'type': 'web_search',
'description': 'Find research papers on GraphRAG from 2024',
'context': {
'max_results': 10,
'date_range': '2024-01-01,2024-12-31',
'sources': ['arxiv', 'acm']
}
}
# โŒ Bad: Ambiguous task
{
'type': 'search',
'data': 'GraphRAG papers'
}
# โœ… Good: Handle failures gracefully
async def delegate_with_fallback(self, task):
try:
return await self.primary_agent.execute(task)
except Exception:
return await self.fallback_agent.execute(task)
# โŒ Bad: No error handling
async def delegate(self, task):
return await self.agent.execute(task)

Continue to Tool Ecosystems to learn about integrating with existing frameworks like LangChain and CrewAI.


๐Ÿ’ก Remember: A2A transforms isolated agents into a collaborative team where specialized agents work together to solve complex problems.