๐ค Multi-Agent Systems: Orchestration Patterns
๐ณ Forest-Level Concept
Section titled โ๐ณ Forest-Level ConceptโLabel: Coordinating Multiple Agents for Complex Tasks
Multi-agent systems combine specialized agents to tackle problems too complex for any single agent. Understanding orchestration patterns is key to building scalable AI systems.
Multi-Agent Architecture Overview
Section titled โMulti-Agent Architecture Overviewโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ Multi-Agent System โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโคโ โโ Orchestrator โ Coordinates all agents โโ โ โโ โโโ Specialist Agent A (Domain Expert) โโ โโโ Specialist Agent B (Tool Expert) โโ โโโ Specialist Agent C (Analysis Expert) โโ โโโ Validator Agent (Quality Control) โโ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ๐ฏ When to Use Multi-Agent Systems
Section titled โ๐ฏ When to Use Multi-Agent SystemsโIdeal Scenarios
Section titled โIdeal Scenariosโ- Complex problem decomposition - Break large tasks into specialized subtasks
- Diverse expertise requirements - Different domains need different specialists
- Quality assurance - Separate agents for creation and validation
- Scalable workflows - Add agents as complexity grows
Consider Alternatives When
Section titled โConsider Alternatives Whenโ- Simple, single-domain tasks
- Latency is critical (each agent adds overhead)
- Budget constraints (multiple LLM calls)
๐ฑ Seedling: Basic Multi-Agent Pattern
Section titled โ๐ฑ Seedling: Basic Multi-Agent PatternโThe Simplest Multi-Agent Setup
Section titled โThe Simplest Multi-Agent Setupโfrom typing import Dict, Any, Listfrom dataclasses import dataclass
@dataclassclass AgentMessage: """Message passed between agents""" sender: str content: str metadata: Dict[str, Any] = None
class SimpleAgent: """Base agent class""" def __init__(self, name: str, role: str): self.name = name self.role = role
async def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]: """Process input and return output""" raise NotImplementedError
class MultiAgentOrchestrator: """Basic orchestrator for multiple agents"""
def __init__(self): self.agents: Dict[str, SimpleAgent] = {} self.message_history: List[AgentMessage] = []
def register_agent(self, agent: SimpleAgent): """Add agent to the system""" self.agents[agent.name] = agent
async def route_task(self, task: str) -> str: """Route task to appropriate agent""" # Simple routing based on keywords for name, agent in self.agents.items(): if agent.role.lower() in task.lower(): return name return list(self.agents.keys())[0] # Default to first agent
async def execute(self, task: str) -> Dict[str, Any]: """Execute task with appropriate agent""" agent_name = await self.route_task(task) agent = self.agents[agent_name]
result = await agent.process({"task": task})
self.message_history.append(AgentMessage( sender=agent_name, content=str(result) ))
return result๐ฟ Growing: Specialized Agent Patterns
Section titled โ๐ฟ Growing: Specialized Agent PatternsโResearch + Analysis + Report Pattern
Section titled โResearch + Analysis + Report Patternโclass ResearchAgent(SimpleAgent): """Gathers and synthesizes information"""
def __init__(self): super().__init__("Researcher", "research") self.tools = ["web_search", "document_reader", "database_query"]
async def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]: topic = input_data.get("task", "")
# Simulate research findings = { "sources": ["Source A", "Source B", "Source C"], "key_points": [ "Finding 1: Important discovery", "Finding 2: Supporting evidence", "Finding 3: Counterpoints to consider" ], "confidence": 0.85 }
return { "research_results": findings, "next_step": "analysis" }
class AnalysisAgent(SimpleAgent): """Analyzes research findings"""
def __init__(self): super().__init__("Analyst", "analysis")
async def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]: research = input_data.get("research_results", {})
analysis = { "patterns": ["Pattern A", "Pattern B"], "insights": [ "Insight 1: Correlation discovered", "Insight 2: Trend identified" ], "recommendations": [ "Recommendation 1: Action to take", "Recommendation 2: Further investigation" ] }
return { "research_results": research, "analysis_results": analysis, "next_step": "report" }
class ReportAgent(SimpleAgent): """Creates final report from analysis"""
def __init__(self): super().__init__("Reporter", "report")
async def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]: research = input_data.get("research_results", {}) analysis = input_data.get("analysis_results", {})
report = f"""# Research Report
## Sources Consulted{chr(10).join('- ' + s for s in research.get('sources', []))}
## Key Findings{chr(10).join('- ' + p for p in research.get('key_points', []))}
## Analysis### Patterns Identified{chr(10).join('- ' + p for p in analysis.get('patterns', []))}
### Insights{chr(10).join('- ' + i for i in analysis.get('insights', []))}
## Recommendations{chr(10).join('- ' + r for r in analysis.get('recommendations', []))} """
return { "final_report": report, "status": "complete" }๐ณ Forest: Advanced Orchestration
Section titled โ๐ณ Forest: Advanced OrchestrationโHierarchical Multi-Agent System
Section titled โHierarchical Multi-Agent Systemโfrom enum import Enumfrom typing import Optional
class AgentRole(Enum): ORCHESTRATOR = "orchestrator" SPECIALIST = "specialist" VALIDATOR = "validator" COORDINATOR = "coordinator"
class HierarchicalOrchestrator: """ Multi-level agent coordination with: - Top-level orchestrator - Team coordinators - Specialist agents - Cross-team validators """
def __init__(self): self.teams: Dict[str, Dict[str, SimpleAgent]] = {} self.coordinators: Dict[str, SimpleAgent] = {} self.validators: List[SimpleAgent] = []
def create_team(self, team_name: str, coordinator: SimpleAgent): """Create a new team with coordinator""" self.teams[team_name] = {} self.coordinators[team_name] = coordinator
def add_specialist(self, team_name: str, agent: SimpleAgent): """Add specialist to team""" if team_name not in self.teams: raise ValueError(f"Team {team_name} does not exist") self.teams[team_name][agent.name] = agent
def add_validator(self, agent: SimpleAgent): """Add cross-team validator""" self.validators.append(agent)
async def execute_complex_task( self, task: str, require_validation: bool = True ) -> Dict[str, Any]: """ Execute task across multiple teams """ results = {}
# Phase 1: Parallel team execution for team_name, team_agents in self.teams.items(): coordinator = self.coordinators[team_name]
# Coordinator delegates to specialists team_result = await self._execute_team_task( coordinator, team_agents, task ) results[team_name] = team_result
# Phase 2: Cross-team validation if require_validation and self.validators: for validator in self.validators: validation = await validator.process({ "results": results, "task": "validate" }) results["validation"] = validation
# Phase 3: Synthesis final_result = self._synthesize_results(results)
return final_result
async def _execute_team_task( self, coordinator: SimpleAgent, specialists: Dict[str, SimpleAgent], task: str ) -> Dict[str, Any]: """Execute task within a single team""" # Coordinator creates subtasks subtasks = await coordinator.process({ "task": task, "action": "decompose" })
# Specialists execute subtasks specialist_results = {} for subtask in subtasks.get("subtasks", []): agent_name = subtask.get("assigned_to") if agent_name in specialists: result = await specialists[agent_name].process(subtask) specialist_results[agent_name] = result
# Coordinator synthesizes results synthesis = await coordinator.process({ "action": "synthesize", "results": specialist_results })
return synthesis
def _synthesize_results( self, results: Dict[str, Any] ) -> Dict[str, Any]: """Combine results from all teams""" return { "status": "complete", "team_results": results, "summary": "Multi-agent task completed successfully" }๐ก Key Insights
Section titled โ๐ก Key InsightsโAgent Communication Patterns
Section titled โAgent Communication Patternsโ- Direct messaging - Agents communicate point-to-point
- Broadcast - One agent sends to all others
- Publish-subscribe - Agents subscribe to relevant topics
- Shared memory - Agents read/write common state
Conflict Resolution
Section titled โConflict ResolutionโWhen agents disagree:
- Voting - Majority wins
- Weighted consensus - Based on confidence scores
- Escalation - Human or supervisor agent decides
- Debate - Agents argue until consensus
โก Quick Win: Start Simple
Section titled โโก Quick Win: Start SimpleโBegin with a two-agent system:
# Minimal multi-agent setupcreator = CreatorAgent()critic = CriticAgent()
# Create โ Critique โ Refine loopdraft = await creator.create(task)feedback = await critic.review(draft)final = await creator.refine(draft, feedback)๐ฌ Deep Dive: Production Considerations
Section titled โ๐ฌ Deep Dive: Production ConsiderationsโMonitoring Multi-Agent Systems
Section titled โMonitoring Multi-Agent Systemsโclass AgentMonitor: """Track multi-agent system health"""
def __init__(self): self.metrics = { "agent_calls": {}, "latencies": {}, "errors": {}, "token_usage": {} }
def log_call( self, agent_name: str, latency_ms: float, tokens: int ): """Log an agent call""" if agent_name not in self.metrics["agent_calls"]: self.metrics["agent_calls"][agent_name] = 0 self.metrics["latencies"][agent_name] = [] self.metrics["token_usage"][agent_name] = 0
self.metrics["agent_calls"][agent_name] += 1 self.metrics["latencies"][agent_name].append(latency_ms) self.metrics["token_usage"][agent_name] += tokens
def get_summary(self) -> Dict[str, Any]: """Get monitoring summary""" return { "total_calls": sum(self.metrics["agent_calls"].values()), "avg_latency": self._avg_latency(), "total_tokens": sum(self.metrics["token_usage"].values()), "by_agent": self.metrics }๐ Related Resources
Section titled โ๐ Related Resourcesโ- Agent Types - Foundation for multi-agent design
- Evaluation & Testing - Testing multi-agent systems
- Production Deployment - Scaling multi-agent systems
- Tool Ecosystems - Integration patterns
Built with โค๏ธ by HUB for learners who want to master AI ร Human collaboration