🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans from $10

Multi-Agent Systems: Orchestration and Communication

Advanced LLMOps › Multi-Agent Systems · 🟢 Free Lesson

Advertisement

Multi-Agent Systems: Orchestration and Communication

Multi-agent systems distribute complex tasks across specialized LLM agents. Production deployment requires robust orchestration, message passing, and consensus mechanisms to ensure reliability.

Multi-Agent Architecture

Orchestration Patterns

1. Orchestrator-Worker Pattern

import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from enum import Enum

class AgentRole(Enum):
    """Role an agent plays in the orchestrator-worker pattern."""

    # Agent that coordinates other agents.
    ORCHESTRATOR = "orchestrator"
    # Agent that carries out the tasks it is given.
    WORKER = "worker"

class TaskStatus(Enum):
    """Lifecycle state of a Task; new tasks default to PENDING."""

    # Created but not yet started.
    PENDING = "pending"
    # Currently being worked on.
    IN_PROGRESS = "in_progress"
    # Finished successfully.
    COMPLETED = "completed"
    # Finished unsuccessfully.
    FAILED = "failed"

@dataclass
class Task:
    """A unit of work owned by one agent, with optional prerequisite tasks."""

    # Unique identifier of this task (LLMAgent.create_task uses a UUID4 string).
    task_id: str
    # ID of the agent the task belongs to.
    agent_id: str
    # Human-readable statement of what has to be done.
    description: str
    # Arbitrary payload handed to the agent for this task.
    input_data: Dict[str, Any]
    # Lifecycle state; every new task starts as PENDING.
    status: TaskStatus = TaskStatus.PENDING
    # Output of the task; None by default.
    # NOTE(review): nothing in the visible code ever assigns this — confirm who sets it.
    result: Optional[Any] = None
    # task_ids that must come before this task in the execution order.
    # default_factory gives each instance its own list.
    dependencies: List[str] = field(default_factory=list)

@dataclass
class AgentMessage:
    """Envelope for a single message passed from one agent to another."""

    # Identifier of the sending agent.
    sender: str
    # Identifier of the receiving agent.
    receiver: str
    # Message payload; any type is accepted.
    content: Any
    # Free-form label describing what kind of message this is.
    message_type: str
    # Time the message was created.
    # NOTE(review): presumably seconds since the epoch (time.time()) — confirm with callers.
    timestamp: float
    # Identifier for correlating related messages; defaults to a fresh UUID4 string.
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

class LLMAgent:
    """An agent with an identity, a role, a message inbox and a task list.

    Attributes:
        agent_id: Identifier used to address this agent.
        role: The role this agent plays.
        capabilities: Labels describing what the agent can do.
        inbox: Messages received so far, oldest first.
        tasks: Tasks created through ``create_task``, oldest first.
    """

    def __init__(self, agent_id: str, role: AgentRole, capabilities: List[str]):
        """Store the agent's identity and start with an empty inbox and task list."""
        self.agent_id, self.role = agent_id, role
        self.capabilities = capabilities
        self.inbox: List[AgentMessage] = []
        self.tasks: List[Task] = []

    def receive_message(self, message: AgentMessage):
        """Append ``message`` to the end of this agent's inbox."""
        self.inbox.append(message)

    def create_task(self, description: str, input_data: Dict,
                    dependencies: Optional[List[str]] = None) -> Task:
        """Create a task owned by this agent, record it, and return it.

        Args:
            description: What the task should accomplish.
            input_data: Payload for the task.
            dependencies: IDs of tasks that must run first; ``None`` or an
                empty list means the task has no prerequisites.

        Returns:
            The new ``Task``, which carries a fresh UUID4 string as its ID.
        """
        prerequisites = dependencies if dependencies else []
        new_task = Task(
            task_id=str(uuid.uuid4()),
            agent_id=self.agent_id,
            description=description,
            input_data=input_data,
            dependencies=prerequisites,
        )
        self.tasks.append(new_task)
        return new_task

class OrchestratorAgent:
    """Coordinates workers: decomposes work, assigns it, and orders execution.

    Attributes:
        workers: Registered workers keyed by their ``agent_id``.
        task_graph: Every assigned task keyed by ``task_id``; each task's
            ``dependencies`` list forms the edges of the graph.
        message_bus: List for inter-agent messages; no method of this class
            writes to it.
    """

    def __init__(self):
        self.workers: Dict[str, LLMAgent] = {}
        self.task_graph: Dict[str, Task] = {}
        self.message_bus: List[AgentMessage] = []

    def register_worker(self, worker: LLMAgent):
        """Make ``worker`` available for assignment under its ``agent_id``."""
        self.workers[worker.agent_id] = worker

    def decompose_task(self, task: str) -> List[Dict]:
        """Split ``task`` into a fixed analyze / implement / review pipeline.

        Returns:
            Three dicts, each with a ``"description"`` and the ``"agent"`` ID
            ("researcher", "coder", "reviewer") expected to handle that step.
        """
        return [
            {"description": f"Step 1: Analyze {task}", "agent": "researcher"},
            {"description": f"Step 2: Implement for {task}", "agent": "coder"},
            {"description": f"Step 3: Review {task}", "agent": "reviewer"},
        ]

    def assign_tasks(self, subtasks: List[Dict]) -> List[Task]:
        """Create one task per subtask on its worker, chained sequentially.

        Each created task depends on the previously created one. A subtask
        whose ``"agent"`` is not a registered worker is skipped, and the chain
        then links the tasks on either side of the gap.

        Args:
            subtasks: Dicts with a ``"description"`` and an ``"agent"`` ID.

        Returns:
            The tasks that were created, in assignment order.
        """
        assigned = []
        prev_task_id = None
        for subtask in subtasks:
            worker = self.workers.get(subtask.get("agent", ""))
            if not worker:
                continue
            deps = [prev_task_id] if prev_task_id else []
            task = worker.create_task(subtask["description"], {}, deps)
            self.task_graph[task.task_id] = task
            assigned.append(task)
            prev_task_id = task.task_id
        return assigned

    def get_execution_order(self) -> List[str]:
        """Return task IDs in dependency order (Kahn's topological sort).

        Tasks with no prerequisites come first, in graph insertion order. A
        task is emitted once all of its distinct dependencies have been
        emitted. Tasks that depend on an ID missing from the graph, or that
        belong to a dependency cycle, can never become ready and are left out.

        Returns:
            The schedulable task IDs, each exactly once.
        """
        from collections import deque  # local import keeps the class self-contained

        in_degree = {tid: 0 for tid in self.task_graph}
        dependents: Dict[str, List[str]] = {}
        for task in self.task_graph.values():
            # De-duplicate while keeping order: a dependency listed twice is
            # still satisfied by a single completion of that task.
            unique_deps = dict.fromkeys(task.dependencies)
            in_degree[task.task_id] = len(unique_deps)
            for dep in unique_deps:
                dependents.setdefault(dep, []).append(task.task_id)

        queue = deque(tid for tid, deg in in_degree.items() if deg == 0)
        order: List[str] = []
        while queue:
            tid = queue.popleft()
            order.append(tid)
            # Only the tasks waiting on `tid` need updating, not the whole graph.
            for child in dependents.get(tid, ()):
                in_degree[child] -= 1
                if in_degree[child] == 0:
                    queue.append(child)
        return order

2. Consensus Mechanism

from collections import Counter
from typing import Dict, List, Optional

class ConsensusEngine:
    """Collects agent votes per proposal and decides by majority or by weight.

    Attributes:
        agents: IDs of the agents taking part.
        quorum: Minimum number of votes needed, and the minimum number of
            matching votes for a position to win.
        votes: Recorded votes keyed by proposal ID. Each vote is a dict that
            may carry an ``"agent"`` and a ``"position"`` key.
    """

    def __init__(self, agents: List[str], quorum: Optional[int] = None):
        """Set up the engine.

        Args:
            agents: IDs of the participating agents.
            quorum: Required vote count. ``None`` (or 0) selects a strict
                majority of ``agents``: ``len(agents) // 2 + 1``.
        """
        self.agents = agents
        self.quorum = quorum or (len(agents) // 2 + 1)
        self.votes: Dict[str, List[Dict]] = {}

    def collect_votes(self, proposal_id: str, votes: List[Dict]):
        """Record ``votes`` for ``proposal_id``, replacing any earlier votes."""
        self.votes[proposal_id] = votes

    def check_consensus(self, proposal_id: str) -> Dict:
        """Report whether one position has at least ``quorum`` votes.

        A vote without a ``"position"`` counts as ``"abstain"``, which is
        tallied like any other position.

        Returns:
            ``{"consensus": True, "position": ..., "votes": ...}`` on success,
            otherwise ``{"consensus": False, "reason": ...}`` where the reason
            is ``"insufficient_votes"`` or ``"no_majority"``.
        """
        votes = self.votes.get(proposal_id, [])
        # The explicit emptiness check keeps most_common(1)[0] safe even when
        # the quorum was configured as a non-positive number.
        if not votes or len(votes) < self.quorum:
            return {"consensus": False, "reason": "insufficient_votes"}
        tally = Counter(v.get("position", "abstain") for v in votes)
        majority_pos, majority_count = tally.most_common(1)[0]
        if majority_count >= self.quorum:
            return {"consensus": True, "position": majority_pos, "votes": majority_count}
        return {"consensus": False, "reason": "no_majority"}

    def weighted_vote(self, proposal_id: str, weights: Dict[str, float]) -> Dict:
        """Pick the position with the largest total weight.

        Args:
            proposal_id: Proposal whose recorded votes are tallied.
            weights: Weight per agent ID; agents not listed weigh 1.0.

        Returns:
            ``{"decision": <winning position>, "scores": <weight per position>}``.
            When no votes are recorded for the proposal, the decision is
            ``None`` and the scores are empty.
        """
        votes = self.votes.get(proposal_id, [])
        weighted_scores: Dict[str, float] = {}
        for vote in votes:
            position = vote.get("position", "abstain")
            weight = weights.get(vote.get("agent", ""), 1.0)
            weighted_scores[position] = weighted_scores.get(position, 0) + weight
        if not weighted_scores:
            # max() raises ValueError on an empty mapping; report "no decision".
            return {"decision": None, "scores": weighted_scores}
        winner = max(weighted_scores, key=weighted_scores.get)
        return {"decision": winner, "scores": weighted_scores}

Key Formulas

Task Distribution Cost

$$C_{total} = \sum_{i=1}^{N} C_{agent_i} + \sum_{j=1}^{M} C_{comm_j}$$

Here,

  • $C_{agent_i}$ = Compute cost for agent $i$
  • $C_{comm_j}$ = Communication cost for message $j$
  • $N$ = Number of agents
  • $M$ = Number of inter-agent messages

Orchestration Patterns Comparison

| Pattern | Latency | Fault Tolerance | Complexity | Best For |
|---|---|---|---|---|
| Sequential | High | Low | Low | Simple chains |
| Parallel | Low | Medium | Medium | Independent tasks |
| Orchestrator | Medium | High | High | Complex workflows |
| Peer-to-Peer | Low | High | Very High | Collaborative tasks |

Best Practices

  1. Design clear agent boundaries with well-defined responsibilities
  2. Use async messaging for inter-agent communication
  3. Implement circuit breakers between agents to prevent cascade failures
  4. Set cost budgets for multi-agent conversations
  5. Log all inter-agent messages for debugging
⭐

Premium Content

Multi-Agent Systems: Orchestration and Communication

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯 End-to-end Projects
💼 Interview Prep
📜 Certificates
🤝 Community Access

Already a member? Log in

Need Expert AI Ops & LLM Ops Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement