Multi-Agent Systems: Orchestration and Communication
Multi-agent systems distribute complex tasks across specialized LLM agents. Production deployment requires robust orchestration, message passing, and consensus mechanisms to ensure reliability.
Multi-Agent Architecture
Orchestration Patterns
1. Orchestrator-Worker Pattern
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from enum import Enum
class AgentRole(Enum):
ORCHESTRATOR = "orchestrator"
WORKER = "worker"
class TaskStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Task:
task_id: str
agent_id: str
description: str
input_data: Dict[str, Any]
status: TaskStatus = TaskStatus.PENDING
result: Optional[Any] = None
dependencies: List[str] = field(default_factory=list)
@dataclass
class AgentMessage:
sender: str
receiver: str
content: Any
message_type: str
timestamp: float
correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
class LLMAgent:
def __init__(self, agent_id: str, role: AgentRole, capabilities: List[str]):
self.agent_id = agent_id
self.role = role
self.capabilities = capabilities
self.inbox: List[AgentMessage] = []
self.tasks: List[Task] = []
def receive_message(self, message: AgentMessage):
self.inbox.append(message)
def create_task(self, description: str, input_data: Dict,
dependencies: Optional[List[str]] = None) -> Task:
task = Task(
task_id=str(uuid.uuid4()),
agent_id=self.agent_id,
description=description,
input_data=input_data,
dependencies=dependencies or []
)
self.tasks.append(task)
return task
class OrchestratorAgent:
def __init__(self):
self.workers: Dict[str, LLMAgent] = {}
self.task_graph: Dict[str, Task] = {}
self.message_bus: List[AgentMessage] = []
def register_worker(self, worker: LLMAgent):
self.workers[worker.agent_id] = worker
def decompose_task(self, task: str) -> List[Dict]:
subtasks = [
{"description": f"Step 1: Analyze {task}", "agent": "researcher"},
{"description": f"Step 2: Implement for {task}", "agent": "coder"},
{"description": f"Step 3: Review {task}", "agent": "reviewer"},
]
return subtasks
def assign_tasks(self, subtasks: List[Dict]) -> List[Task]:
assigned = []
prev_task_id = None
for subtask in subtasks:
worker_id = subtask.get("agent", "")
worker = self.workers.get(worker_id)
if worker:
deps = [prev_task_id] if prev_task_id else []
task = worker.create_task(subtask["description"], {}, deps)
self.task_graph[task.task_id] = task
assigned.append(task)
prev_task_id = task.task_id
return assigned
def get_execution_order(self) -> List[str]:
in_degree = {tid: 0 for tid in self.task_graph}
for task in self.task_graph.values():
for dep in task.dependencies:
in_degree[task.task_id] = in_degree.get(task.task_id, 0) + 1
queue = [tid for tid, deg in in_degree.items() if deg == 0]
order = []
while queue:
tid = queue.pop(0)
order.append(tid)
for task in self.task_graph.values():
if tid in task.dependencies:
in_degree[task.task_id] -= 1
if in_degree[task.task_id] == 0:
queue.append(task.task_id)
return order
2. Consensus Mechanism
from typing import List, Dict
from collections import Counter
class ConsensusEngine:
def __init__(self, agents: List[str], quorum: int = None):
self.agents = agents
self.quorum = quorum or (len(agents) // 2 + 1)
self.votes: Dict[str, List[Dict]] = {}
def collect_votes(self, proposal_id: str, votes: List[Dict]):
self.votes[proposal_id] = votes
def check_consensus(self, proposal_id: str) -> Dict:
votes = self.votes.get(proposal_id, [])
if len(votes) < self.quorum:
return {"consensus": False, "reason": "insufficient_votes"}
positions = [v.get("position", "abstain") for v in votes]
counter = Counter(positions)
majority_pos, majority_count = counter.most_common(1)[0]
if majority_count >= self.quorum:
return {"consensus": True, "position": majority_pos, "votes": majority_count}
return {"consensus": False, "reason": "no_majority"}
def weighted_vote(self, proposal_id: str, weights: Dict[str, float]) -> Dict:
votes = self.votes.get(proposal_id, [])
weighted_scores: Dict[str, float] = {}
for vote in votes:
agent = vote.get("agent", "")
position = vote.get("position", "abstain")
weight = weights.get(agent, 1.0)
weighted_scores[position] = weighted_scores.get(position, 0) + weight
winner = max(weighted_scores, key=weighted_scores.get)
return {"decision": winner, "scores": weighted_scores}
Key Formulas
Task Distribution Cost
Here,
- =Compute cost for agent i
- =Communication cost for message j
- =Number of agents
- =Number of inter-agent messages
Orchestration Patterns Comparison
| Pattern | Latency | Fault Tolerance | Complexity | Best For |
|---|---|---|---|---|
| Sequential | High | Low | Low | Simple chains |
| Parallel | Low | Medium | Medium | Independent tasks |
| Orchestrator | Medium | High | High | Complex workflows |
| Peer-to-Peer | Low | High | Very High | Collaborative tasks |
Best Practices
- Design clear agent boundaries with well-defined responsibilities
- Use async messaging for inter-agent communication
- Implement circuit breakers between agents to prevent cascade failures
- Set cost budgets for multi-agent conversations
- Log all inter-agent messages for debugging