๐ค Agent2Agent Protocol (A2A): Inter-Agent Communication
๐ฟ Growing Concept
Section titled โ๐ฟ Growing ConceptโLabel: Agents Collaborating with Agents
The Agent2Agent Protocol (A2A) is an open standard designed to enable direct communication and collaboration between AI agents. While MCP connects applications to models, A2A enables agents to discover, communicate with, and delegate tasks to other agents.
What is A2A?
Section titled โWhat is A2A?โThe Multi-Agent Challenge
Section titled โThe Multi-Agent ChallengeโWithout A2A: With A2A:โโโโโโโโโโโ โโโโโโโโโโโโ Agent A โ โ Agent A โโโโโโโฌโโโโโ โโโโโโฌโโโโโ โ โ A2A request โ Manual โผ โ integration โโโโโโโโโโโโโโโ โผ โ A2A Registryโโโโโโโโโโโโ โโโโโโโโฌโโโโโโโโ Agent B โ โ discoverโโโโโโฌโโโโโ โผ โ โโโโโโโโโโโ โ Different โ Agent B โโโโโ โ protocols โโโโโโโโโโโ โ โผ โ โโโโโโโโโโโโ โ A2A โ A2Aโ Agent C โ โ โโโโโโโโโโโโ โผ โ โโโโโโโโโโโ โAgents can't โ Agent C โโโโโcollaborate โโโโโโโโโโโ
Agents collaborate seamlessly๐ก A2A Core Concepts
Section titled โ๐ก A2A Core Conceptsโ1. Agent Discovery
Section titled โ1. Agent Discoveryโclass A2ARegistry: """ Registry for agent discovery """ def __init__(self): self.agents = {}
def register_agent(self, agent_id: str, capabilities: List[str], endpoint: str, metadata: Dict): """ Register an agent with its capabilities """ self.agents[agent_id] = { 'capabilities': capabilities, 'endpoint': endpoint, 'metadata': metadata, 'status': 'available', 'registered_at': datetime.now() }
def find_agents(self, capability: str) -> List[Dict]: """ Find agents with specific capability """ matching_agents = []
for agent_id, info in self.agents.items(): if capability in info['capabilities']: matching_agents.append({ 'id': agent_id, 'endpoint': info['endpoint'], 'metadata': info['metadata'] })
return matching_agents
def get_agent(self, agent_id: str) -> Dict: """ Get specific agent information """ return self.agents.get(agent_id)
# Usageregistry = A2ARegistry()
# Register agentsregistry.register_agent( agent_id="research_agent_1", capabilities=["web_search", "document_analysis", "summarization"], endpoint="https://agent1.example.com/a2a", metadata={ "name": "Research Specialist", "description": "Expert in research and information gathering" })
registry.register_agent( agent_id="writer_agent_1", capabilities=["content_generation", "editing", "formatting"], endpoint="https://agent2.example.com/a2a", metadata={ "name": "Content Writer", "description": "Expert in content creation" })
# Find agentsresearchers = registry.find_agents("web_search")print(f"Found {len(researchers)} research agents")2. Agent Communication
Section titled โ2. Agent Communicationโclass A2AClient: """ Client for A2A communication """ def __init__(self, agent_id: str, registry: A2ARegistry): self.agent_id = agent_id self.registry = registry
async def send_request(self, target_agent_id: str, task: Dict) -> Dict: """ Send task request to another agent """ # Get target agent info target = self.registry.get_agent(target_agent_id)
if not target: return {'error': 'Agent not found'}
# Build A2A request request = { 'protocol': 'A2A/1.0', 'from': self.agent_id, 'to': target_agent_id, 'task': task, 'timestamp': datetime.now().isoformat() }
# Send to target agent response = await self.send_to_endpoint( target['endpoint'], request )
return response
async def send_to_endpoint(self, endpoint: str, request: Dict) -> Dict: """ Send HTTP request to agent endpoint """ async with httpx.AsyncClient() as client: response = await client.post( f"{endpoint}/task", json=request ) return response.json()
async def delegate_task(self, capability: str, task_description: str, context: Dict = None) -> Dict: """ Find suitable agent and delegate task """ # Find agents with required capability agents = self.registry.find_agents(capability)
if not agents: return { 'status': 'error', 'error': f'No agents found with capability: {capability}' }
# Select best agent (can be more sophisticated) selected_agent = agents[0]
# Send task response = await self.send_request( target_agent_id=selected_agent['id'], task={ 'type': capability, 'description': task_description, 'context': context or {} } )
return response
# Usageclient = A2AClient("coordinator_agent", registry)
# Delegate research taskresult = await client.delegate_task( capability="web_search", task_description="Find latest AI research papers on RAG", context={"max_results": 10, "recency": "last_month"})
print(result)3. Task Execution
Section titled โ3. Task Executionโclass A2AAgent: """ Agent with A2A support """ def __init__(self, agent_id: str, capabilities: List[str]): self.agent_id = agent_id self.capabilities = capabilities self.task_handlers = {} self.app = FastAPI()
self.setup_endpoints()
def setup_endpoints(self): """ Setup A2A endpoints """ @self.app.get("/a2a/capabilities") async def get_capabilities(): return { 'agent_id': self.agent_id, 'capabilities': self.capabilities, 'protocol_version': '1.0' }
@self.app.post("/a2a/task") async def handle_task(request: Dict): return await self.process_task(request)
async def process_task(self, request: Dict) -> Dict: """ Process incoming A2A task request """ task = request['task'] task_type = task['type']
# Verify capability if task_type not in self.capabilities: return { 'status': 'error', 'error': f'Capability {task_type} not supported' }
# Execute task if task_type in self.task_handlers: try: result = await self.task_handlers[task_type](task) return { 'status': 'success', 'result': result, 'agent_id': self.agent_id } except Exception as e: return { 'status': 'error', 'error': str(e) }
return { 'status': 'error', 'error': 'No handler for task type' }
def register_task_handler(self, capability: str, handler: callable): """ Register handler for a capability """ self.task_handlers[capability] = handler
# Example: Research Agentclass ResearchAgent(A2AAgent): """ Specialized research agent with A2A support """ def __init__(self): super().__init__( agent_id="research_agent_1", capabilities=["web_search", "document_analysis"] )
# Register handlers self.register_task_handler("web_search", self.handle_web_search) self.register_task_handler("document_analysis", self.handle_analysis)
async def handle_web_search(self, task: Dict) -> Dict: """ Handle web search requests """ query = task['description'] context = task.get('context', {}) max_results = context.get('max_results', 10)
# Perform search results = await self.search_web(query, max_results)
return { 'query': query, 'results': results, 'count': len(results) }
async def handle_analysis(self, task: Dict) -> Dict: """ Handle document analysis requests """ document = task.get('context', {}).get('document')
# Analyze document analysis = await self.analyze_document(document)
return { 'summary': analysis['summary'], 'key_points': analysis['key_points'], 'sentiment': analysis['sentiment'] }๐ฌ Deep Dive: Multi-Agent Workflows
Section titled โ๐ฌ Deep Dive: Multi-Agent WorkflowsโCoordinated Multi-Agent System
Section titled โCoordinated Multi-Agent Systemโclass MultiAgentCoordinator: """ Coordinates multiple agents for complex tasks """ def __init__(self, registry: A2ARegistry): self.registry = registry self.client = A2AClient("coordinator", registry)
async def execute_workflow(self, workflow_spec: Dict) -> Dict: """ Execute multi-agent workflow """ results = {}
for step in workflow_spec['steps']: step_id = step['id'] capability = step['capability'] depends_on = step.get('depends_on', [])
# Wait for dependencies context = {} for dep_id in depends_on: if dep_id in results: context[dep_id] = results[dep_id]
# Execute step result = await self.client.delegate_task( capability=capability, task_description=step['description'], context=context )
results[step_id] = result
return results
# Example workflowworkflow = { 'steps': [ { 'id': 'research', 'capability': 'web_search', 'description': 'Research AI agent architectures', 'depends_on': [] }, { 'id': 'analyze', 'capability': 'document_analysis', 'description': 'Analyze research findings', 'depends_on': ['research'] }, { 'id': 'write', 'capability': 'content_generation', 'description': 'Write summary article', 'depends_on': ['research', 'analyze'] } ]}
coordinator = MultiAgentCoordinator(registry)results = await coordinator.execute_workflow(workflow)โก Quick Win: Simple A2A System
Section titled โโก Quick Win: Simple A2A Systemโ20-Minute Implementation
Section titled โ20-Minute Implementationโfrom fastapi import FastAPIimport httpx
class SimpleA2AAgent: """ Minimal A2A-compatible agent """ def __init__(self, name: str, skill: str): self.name = name self.skill = skill self.app = FastAPI()
@self.app.post("/task") async def handle(task: dict): if task['type'] == self.skill: result = await self.execute(task['data']) return {'status': 'success', 'result': result} return {'status': 'error', 'error': 'Unsupported task'}
async def execute(self, data): # Implement skill return f"{self.name} processed: {data}"
async def delegate(self, agent_url: str, task_type: str, data: str): async with httpx.AsyncClient() as client: response = await client.post( f"{agent_url}/task", json={'type': task_type, 'data': data} ) return response.json()
# Create agentsresearcher = SimpleA2AAgent("Researcher", "research")writer = SimpleA2AAgent("Writer", "write")
# Researcher delegates to writerresult = await researcher.delegate( "http://localhost:8001", "write", "Research findings about RAG systems")๐ณ Advanced: Intelligent Agent Selection
Section titled โ๐ณ Advanced: Intelligent Agent SelectionโCapability Matching and Load Balancing
Section titled โCapability Matching and Load Balancingโclass IntelligentA2ARouter: """ Smart routing for A2A requests """ def __init__(self, registry: A2ARegistry): self.registry = registry self.agent_metrics = {}
async def select_agent(self, capability: str, requirements: Dict = None) -> str: """ Select best agent for task """ # Find capable agents agents = self.registry.find_agents(capability)
if not agents: return None
# Score each agent scored_agents = [] for agent in agents: score = await self.score_agent(agent, requirements or {}) scored_agents.append((score, agent))
# Select highest scoring best_agent = max(scored_agents, key=lambda x: x[0])[1]
return best_agent['id']
async def score_agent(self, agent: Dict, requirements: Dict) -> float: """ Score agent based on multiple factors """ score = 0.0
# Factor 1: Current load (prefer less busy agents) load = await self.get_agent_load(agent['id']) score += (1.0 - load) * 0.4
# Factor 2: Performance history performance = self.agent_metrics.get(agent['id'], {}).get('success_rate', 0.5) score += performance * 0.3
# Factor 3: Response time avg_response = self.agent_metrics.get(agent['id'], {}).get('avg_response_ms', 1000) score += (1.0 - min(avg_response / 2000, 1.0)) * 0.2
# Factor 4: Specialization match if 'preferred_tasks' in requirements: metadata = agent.get('metadata', {}) if requirements['preferred_tasks'] in metadata.get('specialties', []): score += 0.1
return score
async def get_agent_load(self, agent_id: str) -> float: """ Get current agent load (0.0 = idle, 1.0 = full) """ # Query agent for current load agent = self.registry.get_agent(agent_id) async with httpx.AsyncClient() as client: try: response = await client.get(f"{agent['endpoint']}/metrics") metrics = response.json() return metrics.get('load', 0.5) except: return 0.5 # Default if unavailable๐ฏ Real-World A2A Patterns
Section titled โ๐ฏ Real-World A2A PatternsโPattern 1: Pipeline Pattern
Section titled โPattern 1: Pipeline Patternโclass PipelinePattern: """ Chain agents in a pipeline """ async def execute_pipeline(self, agents: List[str], initial_data: Dict) -> Dict: """ Pass data through agent pipeline """ data = initial_data
for agent_id in agents: response = await self.client.send_request(agent_id, { 'type': 'process', 'data': data })
if response['status'] != 'success': return response
# Output of one agent becomes input to next data = response['result']
return {'status': 'success', 'result': data}
# Usagepipeline = PipelinePattern()result = await pipeline.execute_pipeline( agents=['data_collector', 'data_processor', 'report_generator'], initial_data={'query': 'Q4 sales'})Pattern 2: Competitive Pattern
Section titled โPattern 2: Competitive Patternโclass CompetitivePattern: """ Multiple agents compete, best result wins """ async def compete(self, agents: List[str], task: Dict) -> Dict: """ Send task to multiple agents, return best result """ # Send to all agents in parallel responses = await asyncio.gather(*[ self.client.send_request(agent_id, task) for agent_id in agents ])
# Score responses scored = [] for response in responses: if response['status'] == 'success': score = await self.score_response(response['result']) scored.append((score, response))
# Return best if scored: best = max(scored, key=lambda x: x[0])[1] return best
return {'status': 'error', 'error': 'All agents failed'}Pattern 3: Consensus Pattern
Section titled โPattern 3: Consensus Patternโclass ConsensusPattern: """ Multiple agents vote on result """ async def reach_consensus(self, agents: List[str], task: Dict, threshold: float = 0.6) -> Dict: """ Agents vote, require threshold agreement """ # Get responses from all agents responses = await asyncio.gather(*[ self.client.send_request(agent_id, task) for agent_id in agents ])
# Collect results results = [r['result'] for r in responses if r['status'] == 'success']
# Find consensus consensus = self.find_most_common(results) agreement = results.count(consensus) / len(results)
if agreement >= threshold: return { 'status': 'success', 'result': consensus, 'consensus_level': agreement }
return { 'status': 'no_consensus', 'results': results, 'agreement': agreement }๐ฏ Key Takeaways
Section titled โ๐ฏ Key Takeawaysโ- A2A enables agent collaboration through standardized protocol
- Discovery allows agents to find suitable collaborators
- Delegation distributes work across specialist agents
- Coordination enables complex multi-agent workflows
- Patterns like pipeline, competitive, and consensus solve common problems
Best Practices
Section titled โBest Practicesโ1. Clear Capability Definitions
Section titled โ1. Clear Capability Definitionsโ# โ
Good: Specific capabilitiescapabilities = ["web_search", "pdf_parsing", "entity_extraction"]
# โ Bad: Vague capabilitiescapabilities = ["research", "analysis"]2. Structured Task Descriptions
Section titled โ2. Structured Task Descriptionsโ# โ
Good: Well-defined task{ 'type': 'web_search', 'description': 'Find research papers on GraphRAG from 2024', 'context': { 'max_results': 10, 'date_range': '2024-01-01,2024-12-31', 'sources': ['arxiv', 'acm'] }}
# โ Bad: Ambiguous task{ 'type': 'search', 'data': 'GraphRAG papers'}3. Error Handling and Fallbacks
Section titled โ3. Error Handling and Fallbacksโ# โ
Good: Handle failures gracefullyasync def delegate_with_fallback(self, task): try: return await self.primary_agent.execute(task) except Exception: return await self.fallback_agent.execute(task)
# โ Bad: No error handlingasync def delegate(self, task): return await self.agent.execute(task)Next Steps
Section titled โNext StepsโContinue to Tool Ecosystems to learn about integrating with existing frameworks like LangChain and CrewAI.
๐ก Remember: A2A transforms isolated agents into a collaborative team where specialized agents work together to solve complex problems.