Skip to main content
AI & LLMs — Documented

multi-agent-orchestration

Multi-agent AI system design. Orchestrator-worker patterns, LangGraph state machines, parallel execution, task decomposition, agent communication, shared memory, error recovery, and A2A protocol patterns.

Share:

Installation

npx clawhub@latest install multi-agent-orchestration

View the full skill documentation and source below.

Documentation

Multi-Agent Orchestration

When to Use Multi-Agent Systems

Use multiple agents when:

  • Tasks are too long for a single context window

  • Parallel specialized work increases quality or speed

  • Different tasks need different models/tools

  • Independent verification improves reliability


Single agent is usually better for simple, linear tasks.


Core Patterns

1. Orchestrator-Worker Pattern

Human → [Orchestrator Agent]
              ↓ decomposes task
    ┌──────────┴──────────┐
[Worker 1]  [Worker 2]  [Worker 3]
(Research)  (Analysis)  (Writing)
    └──────────┬──────────┘
              ↓ synthesize
[Orchestrator] → response to human

2. Pipeline (Sequential)

Input → [Agent 1] → [Agent 2] → [Agent 3] → Output
         (Extract)  (Transform)  (Format)

3. Debate/Multi-perspective

Question → [Agent A] → Opinion A ─┐
         → [Agent B] → Opinion B ─┤→ [Judge] → Decision
         → [Agent C] → Opinion C ─┘

4. Supervisor with Subagents

[Supervisor] ←→ planning loop
     ↓ routes to specialist
[Code Agent] [Research Agent] [QA Agent]
     ↑ reports results back
[Supervisor] → final synthesis

LangGraph Implementation

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, Annotated
import operator

# Define shared state — the single dict schema every graph node reads and writes.
class ResearchState(TypedDict):
    """Shared state passed between every node in the LangGraph workflow.

    Each agent node returns a partial dict; LangGraph merges it into this
    state. ``messages`` is annotated with ``operator.add`` so lists returned
    by nodes are concatenated onto the existing value instead of replacing it.
    """
    task: str            # the user's original request (set at invoke time)
    research: str        # bullet-point findings written by research_agent
    outline: str         # section structure written by outline_agent
    draft: str           # current draft; written by writer_agent, overwritten by revise_agent
    review_feedback: str # reviewer notes; contains the token "APPROVED" when accepted
    final_output: str    # accepted draft, set by finalize()
    iteration: int       # number of review rounds completed (bounded in should_revise)
    messages: Annotated[list, operator.add]  # accumulated raw LLM messages

# Define agents as nodes — one shared LLM client for every node;
# temperature=0 keeps the pipeline as deterministic as the model allows.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def research_agent(state: ResearchState) -> ResearchState:
    """Worker node: gather background information for the task.

    Reads ``state['task']``; returns a partial state with ``research`` set
    and the raw LLM reply appended to ``messages``.
    """
    prompt = f"""Research this topic thoroughly: {state['task']}
        
Provide key facts, statistics, and relevant context.
Format as bullet points."""
    reply = llm.invoke([HumanMessage(content=prompt)])
    return {"research": reply.content, "messages": [reply]}

def outline_agent(state: ResearchState) -> ResearchState:
    """Worker node: turn the research notes into a structured outline."""
    prompt = f"""Task: {state['task']}
Research: {state['research']}

Create a detailed outline with sections and key points."""
    reply = llm.invoke([HumanMessage(content=prompt)])
    return {"outline": reply.content}

def writer_agent(state: ResearchState) -> ResearchState:
    """Worker node: produce the full draft from the outline and research."""
    prompt = f"""Task: {state['task']}
Outline: {state['outline']}
Research: {state['research']}

Write a comprehensive, well-structured response."""
    reply = llm.invoke([HumanMessage(content=prompt)])
    return {"draft": reply.content}

def reviewer_agent(state: ResearchState) -> ResearchState:
    """Worker node: critique the current draft and bump the iteration count.

    The literal token APPROVED in the reply is what should_revise() checks
    for to stop the revision loop.
    """
    prompt = f"""Review this draft for the task: {state['task']}
        
Draft: {state['draft']}

Provide specific feedback on:
1. Accuracy and completeness
2. Clarity and structure
3. Any missing key points

If the draft is satisfactory, respond with: APPROVED
Otherwise, list specific improvements needed."""
    reply = llm.invoke([HumanMessage(content=prompt)])
    return {
        "review_feedback": reply.content,
        "iteration": state.get("iteration", 0) + 1,
    }

def revise_agent(state: ResearchState) -> ResearchState:
    """Worker node: rewrite the draft according to the reviewer's feedback."""
    prompt = f"""Revise this draft based on feedback:

Original draft: {state['draft']}
Feedback: {state['review_feedback']}

Provide the improved version."""
    reply = llm.invoke([HumanMessage(content=prompt)])
    return {"draft": reply.content}

def should_revise(state: ResearchState) -> str:
    """Conditional edge: route the graph to 'revise' or 'finalize'.

    Finalizes once the reviewer's feedback contains APPROVED, or after
    3 review rounds — the hard cap prevents an endless review/revise loop.
    """
    rounds = state.get("iteration", 0)
    feedback = state.get("review_feedback", "")
    if rounds >= 3 or "APPROVED" in feedback:
        return "finalize"
    return "revise"

def finalize(state: ResearchState) -> ResearchState:
    """Terminal node: promote the accepted draft to ``final_output``."""
    accepted = state["draft"]
    return {"final_output": accepted}

# Build the graph over the shared ResearchState schema.
workflow = StateGraph(ResearchState)

# Add nodes — each node wraps one agent function; the string is the node id
# that the edges below refer to.
workflow.add_node("research", research_agent)
workflow.add_node("outline", outline_agent)
workflow.add_node("write", writer_agent)
workflow.add_node("review", reviewer_agent)
workflow.add_node("revise", revise_agent)
workflow.add_node("finalize", finalize)

# Add edges — linear pipeline: research -> outline -> write -> review.
workflow.set_entry_point("research")
workflow.add_edge("research", "outline")
workflow.add_edge("outline", "write")
workflow.add_edge("write", "review")

# Conditional edge — should_revise() returns the label "revise" or
# "finalize"; the mapping routes each label to the matching node.
workflow.add_conditional_edges(
    "review",
    should_revise,
    {
        "revise": "revise",
        "finalize": "finalize",
    }
)
# Revisions loop back for another review (bounded by should_revise's cap).
workflow.add_edge("revise", "review")
workflow.add_edge("finalize", END)

# Compile the graph into a runnable app.
app = workflow.compile()

# Run — invoke with the initial state; remaining keys start unset and are
# filled in by the nodes as the graph executes.
result = app.invoke({
    "task": "Explain the CAP theorem and its practical implications",
    "iteration": 0
})
print(result["final_output"])

Parallel Execution Pattern

import asyncio
from openai import AsyncOpenAI

# Module-level async client shared by all agent coroutines below.
client = AsyncOpenAI()

async def run_agent(role: str, task: str, context: str) -> str:
    """Run a single agent asynchronously.

    Issues one chat completion with a role-specific system prompt and
    returns the assistant's text.
    """
    conversation = [
        {"role": "system", "content": f"You are a {role} expert."},
        {"role": "user", "content": f"Context: {context}\n\nTask: {task}"},
    ]
    completion = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=conversation,
        temperature=0,  # keep specialist output as reproducible as possible
    )
    return completion.choices[0].message.content

async def parallel_research(topic: str) -> dict:
    """Fan one question out to several specialist agents, then synthesize.

    Returns {"perspectives": {name: text}, "synthesis": text}; a specialist
    that exceeds the 30s timeout contributes a placeholder string instead.
    """
    specialists = {
        "technical": "technical architect",
        "business": "business analyst",
        "security": "security engineer",
        "user_experience": "UX designer",
    }

    prompt = f"Analyze this topic from your perspective: {topic}"

    # Fan out — every specialist runs concurrently.
    inflight = {
        name: asyncio.create_task(run_agent(role, prompt, ""))
        for name, role in specialists.items()
    }

    # Fan in, bounding how long we wait on any single agent.
    results = {}
    for name, pending in inflight.items():
        try:
            results[name] = await asyncio.wait_for(pending, timeout=30)
        except asyncio.TimeoutError:
            results[name] = "Agent timed out"

    # Synthesize the four perspectives into one recommendation.
    synthesis_prompt = f"""Synthesize these expert perspectives on: {topic}

Technical: {results['technical']}
Business: {results['business']}
Security: {results['security']}
UX: {results['user_experience']}

Provide a balanced, comprehensive recommendation."""

    synthesis = await run_agent("chief architect", synthesis_prompt, "")
    return {"perspectives": results, "synthesis": synthesis}

Agent Communication Protocols

Message Format Standard

from dataclasses import dataclass
from datetime import datetime
from typing import Any
import uuid

@dataclass
class AgentMessage:
    """Standard message format for agent communication.

    Every field may be omitted at construction; ``__post_init__`` backfills
    a uuid4 id, a UTC timestamp, and a fresh metadata dict when the given
    value is missing or falsy.
    """
    id: str = None          # unique message id; auto-generated uuid4 string
    from_agent: str = None  # sender name
    to_agent: str = None    # recipient name
    task_id: str = None     # correlates messages belonging to one task
    type: str = "request"   # request, response, error, status
    content: Any = None     # free-form payload
    metadata: dict = None   # extra routing/tracing info
    timestamp: str = None   # ISO-8601; auto-set when omitted

    def __post_init__(self):
        # Backfill per-instance defaults (a fresh dict each time avoids the
        # shared-mutable-default pitfall). Falsy values are replaced, matching
        # `x = x or default` semantics.
        if not self.id:
            self.id = str(uuid.uuid4())
        if not self.timestamp:
            self.timestamp = datetime.utcnow().isoformat()
        if not self.metadata:
            self.metadata = {}

# Message types
@dataclass
class AgentRequest(AgentMessage):
    """Request from orchestrator to worker.

    @dataclass is required on the subclass: without it, the re-declared
    ``type`` annotation is a plain class attribute rather than a dataclass
    field, and is ignored by the generated ``__init__``.
    """
    type: str = "request"  # fixed message type for requests
    
@dataclass
class AgentResult(AgentMessage):
    """Result from worker to orchestrator.

    @dataclass is required here: without it, ``success`` and ``error`` are
    plain class attributes, and ``AgentResult(success=False)`` raises
    TypeError because the inherited ``__init__`` knows nothing about them.
    """
    type: str = "response"  # fixed message type for results
    success: bool = True    # False when the worker failed
    error: str = None       # error description when success is False

@dataclass
class AgentStatus(AgentMessage):
    """Progress update during a long-running task.

    @dataclass is required here: without it, ``progress`` and
    ``current_step`` would not be constructor parameters.
    """
    type: str = "status"    # fixed message type for progress updates
    progress: float = 0.0   # fraction complete, 0.0 - 1.0
    current_step: str = ""  # human-readable description of current work

Shared Memory and Context

from typing import Optional
import json
import redis

class AgentMemory:
    """Shared memory for multi-agent collaboration.

    Thin wrapper over Redis that namespaces every key under
    ``task:{task_id}:memory`` so concurrent tasks never collide.
    Values are JSON-serialized, so they must be JSON-encodable.
    """

    def __init__(self, redis_client: redis.Redis, task_id: str):
        self.redis = redis_client
        self.task_id = task_id
        self.prefix = f"task:{task_id}:memory"

    def store(self, key: str, value: Any, ttl: int = 3600):
        """Store data accessible to all agents in this task; expires after ttl seconds."""
        namespaced = f"{self.prefix}:{key}"
        self.redis.setex(namespaced, ttl, json.dumps(value))

    def retrieve(self, key: str) -> Optional[Any]:
        """Return the shared value for ``key``, or None when absent/expired."""
        raw = self.redis.get(f"{self.prefix}:{key}")
        return json.loads(raw) if raw else None

    def append_to_log(self, agent_name: str, entry: str):
        """Append one timestamped entry to the shared task log."""
        record = {
            "agent": agent_name,
            "timestamp": datetime.utcnow().isoformat(),
            "entry": entry,
        }
        self.redis.rpush(f"{self.prefix}:log", json.dumps(record))

    def get_log(self) -> list:
        """Return the complete task execution log, oldest entry first."""
        raw_entries = self.redis.lrange(f"{self.prefix}:log", 0, -1)
        return [json.loads(item) for item in raw_entries]

Task Decomposition Patterns

# Prompt template used by decompose_task(); the doubled braces ({{ }})
# survive .format() as literal braces in the JSON example.
# NOTE(review): decompose_task() sends this with response_format
# {"type": "json_object"}, which forces the model to return a JSON
# *object* — yet this prompt asks for a bare JSON array. The caller must
# unwrap the enclosing object (or this prompt should request an object
# with e.g. a "subtasks" key).
DECOMPOSER_PROMPT = """Break this complex task into 3-7 independent subtasks.
Each subtask should:
- Be executable by a single specialized agent
- Be as independent as possible from other subtasks
- Have a clear input and output
- Be assigned to the most appropriate agent type

Available agent types: researcher, analyst, coder, writer, reviewer, qa_tester

Task: {task}

Return as JSON array:
[
  {{
    "id": "subtask_1",
    "title": "Brief title",
    "description": "What needs to be done",
    "agent_type": "researcher",
    "depends_on": [],  // IDs of subtasks that must complete first
    "estimated_complexity": "low|medium|high"
  }}
]"""

def _parse_subtasks(raw: str) -> list[dict]:
    """Parse the decomposer's JSON reply into a list of subtask dicts.

    OpenAI's json_object response format guarantees a JSON *object*, while
    DECOMPOSER_PROMPT asks for an array — in practice the model wraps the
    array in a single-key object (e.g. {"subtasks": [...]}). Accept both
    shapes so execute_decomposed() always receives a list.
    """
    parsed = json.loads(raw)
    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        # Take the first list-valued entry as the subtask array.
        for value in parsed.values():
            if isinstance(value, list):
                return value
    raise ValueError(f"Decomposer returned no subtask list: {raw[:200]}")

async def decompose_task(task: str) -> list[dict]:
    """Ask the model to split ``task`` into 3-7 subtasks (DECOMPOSER_PROMPT).

    Returns a list of subtask dicts with id/title/description/agent_type/
    depends_on/estimated_complexity keys.

    Raises:
        ValueError: if the model's reply contains no subtask list.
    """
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": DECOMPOSER_PROMPT.format(task=task)}],
        # json_object mode guarantees syntactically valid JSON but forces an
        # object at the top level — _parse_subtasks unwraps it when needed.
        response_format={"type": "json_object"},
        temperature=0,
    )
    return _parse_subtasks(response.choices[0].message.content)

async def execute_decomposed(task: str) -> dict:
    """Execute a decomposed task with dependency resolution.

    Repeatedly finds subtasks whose dependencies are all satisfied, runs
    that wave in parallel, and folds the results into ``completed`` (keyed
    by subtask id) until nothing is left.

    NOTE(review): relies on an ``execute_subtask(subtask, dep_results)``
    coroutine that is assumed to be defined elsewhere in this module.

    Raises:
        RuntimeError: if no subtask is runnable but some remain (a cycle).
    """
    subtasks = await decompose_task(task)

    completed = {}
    remaining = {st["id"]: st for st in subtasks}

    while remaining:
        # A subtask is runnable once every dependency has a result.
        runnable = [
            st for st in remaining.values()
            if all(dep in completed for dep in st["depends_on"])
        ]

        if not runnable:
            raise RuntimeError("Circular dependency detected")

        # Run the whole wave concurrently, handing each subtask the
        # results of just its own dependencies.
        outputs = await asyncio.gather(*(
            execute_subtask(st, {dep: completed[dep] for dep in st["depends_on"]})
            for st in runnable
        ))

        for st, out in zip(runnable, outputs):
            completed[st["id"]] = out
            remaining.pop(st["id"])

    return completed

Error Handling and Resilience

import asyncio
from functools import wraps

def retry_agent(max_attempts: int = 3, delay: float = 1.0):
    """Decorator for automatic agent retry with exponential backoff.

    Args:
        max_attempts: total number of attempts before giving up.
        delay: base sleep in seconds; attempt k waits ``delay * 2**k``.

    Raises:
        RuntimeError: when every attempt fails. The final RuntimeError is
            chained to the last underlying exception (``from last_error``)
            so the root-cause traceback stays visible.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    print(f"Agent {func.__name__} failed (attempt {attempt+1}): {e}")
                    # Back off exponentially; never sleep after the final attempt.
                    if attempt < max_attempts - 1:
                        await asyncio.sleep(delay * (2 ** attempt))
            raise RuntimeError(
                f"Agent failed after {max_attempts} attempts: {last_error}"
            ) from last_error
        return wrapper
    return decorator

class AgentOrchestrator:
    """Fault-tolerant orchestrator with fallbacks.

    ``agents`` maps an agent name to an async callable. The helpers either
    retry on a secondary agent after any failure, or re-prompt the same
    agent with feedback until a validator accepts the output.
    """

    def __init__(self, agents: dict):
        self.agents = agents
        self.fallback_model = "gpt-4o-mini"  # Cheaper fallback

    async def run_with_fallback(
        self,
        primary_agent: str,
        fallback_agent: str,
        task: str,
        context: dict = None
    ) -> str:
        """Try the primary agent; on any exception run the fallback instead."""
        try:
            # Lookup stays inside the try: an unknown primary name also
            # triggers the fallback path.
            return await self.agents[primary_agent](task, context)
        except Exception as e:
            print(f"Primary agent {primary_agent} failed: {e}. Using fallback.")
            return await self.agents[fallback_agent](task, context)

    async def run_with_validation(
        self,
        agent: str,
        task: str,
        validator: callable,
        max_retries: int = 3
    ) -> str:
        """Run agent and validate output, retry if invalid.

        Each retry appends the failed output to the prompt so the agent can
        self-correct.
        """
        attempt = 0
        while attempt < max_retries:
            candidate = await self.agents[agent](task)
            if validator(candidate):
                return candidate
            print(f"Validation failed (attempt {attempt+1}), retrying with feedback...")
            task = f"{task}\n\nPrevious attempt failed validation: {candidate}\n\nPlease fix and try again."
            attempt += 1
        raise ValueError(f"Agent failed validation after {max_retries} attempts")

A2A Protocol (Agent-to-Agent)

# A2A-style message format (emerging industry standard)
# Loosely modeled on Google's A2A spec (JSON-RPC 2.0 envelope) —
# verify field names against the current published spec before relying on interop

class A2ARequest:
    """Builder for agent-to-agent requests using a JSON-RPC 2.0 envelope."""

    @staticmethod
    def create(
        task_id: str,
        from_agent: str,
        to_agent: str,
        task_type: str,
        inputs: dict,
        context: dict = None
    ) -> dict:
        """Build an ``agent.execute`` JSON-RPC request dict.

        A fresh uuid4 becomes the JSON-RPC id; routing carries sender,
        recipient, and a UTC timestamp. A missing context becomes {}.
        """
        task_block = {
            "id": task_id,
            "type": task_type,
            "inputs": inputs,
            "context": context or {},
        }
        routing_block = {
            "from": from_agent,
            "to": to_agent,
            "timestamp": datetime.utcnow().isoformat(),
        }
        return {
            "jsonrpc": "2.0",
            "method": "agent.execute",
            "id": str(uuid.uuid4()),
            "params": {
                "task": task_block,
                "routing": routing_block,
            }
        }

# MoltbotDen A2A via MCP
# Agents on MoltbotDen can send tasks to each other via the messaging system
# POST /conversations/{id}/messages with structured task payload