Getting multiple agents to work together without stepping on each other's toes is harder than it looks. I've spent more late nights than I'd like to admit watching agents deadlock, duplicate work, or confidently march in opposite directions. This article covers the coordination patterns that actually work in production.
Why Coordination Is Hard
When you have one agent, life is simple. Add a second, and you've introduced race conditions, conflicting decisions, and the question of who's in charge. By the time you have five agents, you're dealing with a distributed systems problem whether you wanted one or not.
The core challenge: agents need to make decisions with incomplete information about what other agents are doing. Network latency, failures, and the asynchronous nature of most agent frameworks mean you can't assume perfect knowledge.
Coordination Patterns That Work
The Blackboard Pattern
Think of it as a shared workspace where agents post their findings and read what others have discovered. It's old-school (dating back to 1970s AI research), but it's survived because it works.
How it works:
- Central knowledge store (the "blackboard")
- Agents read from and write to the blackboard
- No direct agent-to-agent communication
- Changes trigger interested agents
I've used this pattern when building research agents that needed to collaboratively analyze documents. Each agent would post facts, hypotheses, and evidence to the blackboard. Others would pick up those findings and build on them.
```python
import asyncio
import fnmatch
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set

@dataclass
class BlackboardEntry:
    key: str
    value: Any
    author: str
    timestamp: datetime = field(default_factory=datetime.now)
    confidence: float = 1.0

Callback = Callable[[BlackboardEntry], Awaitable[None]]

class Blackboard:
    def __init__(self):
        self.entries: Dict[str, BlackboardEntry] = {}
        self.subscribers: Dict[str, List[Callback]] = {}
        self._lock = asyncio.Lock()
        # Keep references to notification tasks so they aren't garbage-collected mid-flight
        self._tasks: Set[asyncio.Task] = set()

    async def write(self, key: str, value: Any, author: str, confidence: float = 1.0):
        """Write an entry to the blackboard and notify matching subscribers"""
        async with self._lock:
            entry = BlackboardEntry(key, value, author, confidence=confidence)
            self.entries[key] = entry
            # Glob-style matching, so a "finding:*" subscriber hears about "finding:AI"
            callbacks = [
                cb
                for pattern, cbs in self.subscribers.items()
                if fnmatch.fnmatchcase(key, pattern)
                for cb in cbs
            ]
        # Each callback runs as its own task, so a slow subscriber can't block writers
        for callback in callbacks:
            task = asyncio.create_task(callback(entry))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def read(self, key: str) -> Optional[BlackboardEntry]:
        """Read an entry from the blackboard"""
        async with self._lock:
            return self.entries.get(key)

    async def subscribe(self, pattern: str, callback: Callback):
        """Subscribe to changes whose key matches a glob pattern"""
        async with self._lock:
            self.subscribers.setdefault(pattern, []).append(callback)

    async def drain(self):
        """Wait for in-flight notifications (handy in demos and tests)"""
        while self._tasks:
            await asyncio.gather(*list(self._tasks))

# Example usage
class ResearchAgent:
    def __init__(self, name: str, blackboard: Blackboard):
        self.name = name
        self.blackboard = blackboard

    async def contribute_finding(self, topic: str, finding: str):
        await self.blackboard.write(
            f"finding:{topic}",
            finding,
            author=self.name,
            confidence=0.85,
        )

    async def on_new_finding(self, entry: BlackboardEntry):
        print(f"{self.name} saw: {entry.value} (by {entry.author})")
        # Agent can now build on this finding

async def main():
    blackboard = Blackboard()
    agent1 = ResearchAgent("Researcher-1", blackboard)
    agent2 = ResearchAgent("Researcher-2", blackboard)
    # Agent 2 subscribes to all findings
    await blackboard.subscribe("finding:*", agent2.on_new_finding)
    # Agent 1 contributes
    await agent1.contribute_finding("AI", "LLMs show emergent reasoning capabilities")
    await blackboard.drain()  # let the notification run before the program exits

if __name__ == "__main__":
    asyncio.run(main())
```
When to use it: Research tasks, collaborative problem-solving, scenarios where agents need to build on each other's work incrementally.
When to avoid it: Real-time systems with tight latency requirements, or when the blackboard becomes a bottleneck.
Contract Net Protocol
This is auction-based coordination. One agent (the "manager") announces a task, agents bid on it, and the manager awards it to the best bidder.
I first used this pattern when building a distributed data processing system. Different agents had different capabilities and current loads. Contract net let them self-organize based on who was best positioned to handle each job.
```typescript
interface Task {
  id: string;
  requirements: string[];
  priority: number; // assumed normalized to 0..1
  deadline: Date;
}

interface Bid {
  agentId: string;
  cost: number;
  estimatedTime: number; // seconds
  confidence: number; // 0..1
}

class ContractNetManager {
  private agents: Map<string, Agent> = new Map();

  registerAgent(agent: Agent): void {
    this.agents.set(agent.id, agent);
  }

  async announceTask(task: Task): Promise<string> {
    // 1. Announce the task to all capable agents and collect their bids in parallel
    const capable = Array.from(this.agents.values()).filter(agent =>
      this.canHandle(agent, task)
    );
    const bids = (await Promise.all(capable.map(agent => agent.generateBid(task))))
      .filter((bid): bid is Bid => bid !== null);

    // 2. Evaluate bids
    const winner = this.selectWinner(bids, task);

    // 3. Award contract
    if (winner) {
      const agent = this.agents.get(winner.agentId);
      await agent?.executeTask(task);
      return winner.agentId;
    }
    throw new Error(`No agent could handle task ${task.id}`);
  }

  private selectWinner(bids: Bid[], task: Task): Bid | null {
    if (bids.length === 0) return null;
    // Keep the highest-scoring bid
    return bids.reduce((best, current) =>
      this.scoreBid(current, task) > this.scoreBid(best, task) ? current : best
    );
  }

  private scoreBid(bid: Bid, task: Task): number {
    // High-priority tasks favor speed; everything else favors low cost
    const timeWeight = task.priority > 0.8 ? 0.5 : 0.2;
    const costWeight = task.priority > 0.8 ? 0.2 : 0.5;
    const confidenceWeight = 0.3;
    // 1/time and 1/cost are not on the same scale as confidence (0..1);
    // normalize them across bids if one factor ends up dominating.
    // Math.max guards against division by zero.
    return (
      (1 / Math.max(bid.estimatedTime, 1)) * timeWeight +
      (1 / Math.max(bid.cost, 1)) * costWeight +
      bid.confidence * confidenceWeight
    );
  }

  private canHandle(agent: Agent, task: Task): boolean {
    return task.requirements.every(req => agent.capabilities.includes(req));
  }
}

class Agent {
  constructor(
    public id: string,
    public capabilities: string[],
    private currentLoad: number // utilization, 0..1
  ) {}

  async generateBid(task: Task): Promise<Bid | null> {
    // Don't bid if overloaded
    if (this.currentLoad > 0.9) return null;
    // Busier agents bid higher, which steers work toward idle agents
    const baseCost = 100;
    const loadMultiplier = 1 + this.currentLoad;
    return {
      agentId: this.id,
      cost: baseCost * loadMultiplier,
      estimatedTime: this.estimateTime(task),
      confidence: this.calculateConfidence(task),
    };
  }

  async executeTask(task: Task): Promise<void> {
    console.log(`${this.id} executing task ${task.id}`);
    // Implementation here
  }

  private estimateTime(task: Task): number {
    // Simple estimate: 60 seconds per requirement
    return task.requirements.length * 60;
  }

  private calculateConfidence(task: Task): number {
    // Fraction of the requirements this agent covers (1.0 when there are none)
    if (task.requirements.length === 0) return 1;
    const matched = task.requirements.filter(r => this.capabilities.includes(r)).length;
    return matched / task.requirements.length;
  }
}
```
When to use it: Resource allocation, load balancing, when agents have heterogeneous capabilities.
When to avoid it: Latency-critical paths (the bidding round adds overhead), or when you need deterministic task assignment.
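One way to cap that overhead is to give the bidding round a hard deadline and ask every agent concurrently, so a single slow bidder can't stall the auction. Below is a minimal sketch of that idea, written in Python to match the rest of the examples. `collect_bids`, `deadline_seconds`, and the Python `Bid` dataclass are hypothetical names. Bidders are modeled as async callables that return a bid or `None`, loosely mirroring `generateBid` above.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional


@dataclass
class Bid:
    agent_id: str
    cost: float
    estimated_time: float  # seconds
    confidence: float      # 0..1


# A bidder is any async callable that returns a Bid, or None to decline
Bidder = Callable[[], Awaitable[Optional[Bid]]]


async def collect_bids(bidders: List[Bidder], deadline_seconds: float) -> List[Bid]:
    """Ask all bidders at once and keep only bids that arrive before the deadline."""
    if not bidders:
        return []  # asyncio.wait() rejects an empty collection
    tasks = [asyncio.create_task(bidder()) for bidder in bidders]
    done, pending = await asyncio.wait(tasks, timeout=deadline_seconds)
    for task in pending:
        task.cancel()  # late bidders simply miss this round
    bids = [
        task.result()
        for task in done
        if task.exception() is None and task.result() is not None
    ]
    # asyncio.wait returns unordered sets, so sort for a deterministic result
    return sorted(bids, key=lambda bid: bid.agent_id)
```

Late bids are dropped rather than waited for. A manager that truly needs every agent's input has to trade latency for completeness.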
Consensus Without Blockchain
You don't need a blockchain to get agents to agree. Here are lighter-weight consensus mechanisms I've found useful:
Majority Voting
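Simple but effective: each agent votes and the majority wins. The trick is handling ties and agents that don't respond. In the coordinator below, an agent that times out or raises an error counts as an abstention, and a tie rejects the proposal.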
```python
import asyncio
import random
from enum import Enum
from typing import Dict, Tuple

class Vote(Enum):
    YES = "yes"
    NO = "no"
    ABSTAIN = "abstain"

class VotingCoordinator:
    def __init__(self, timeout_seconds: float = 10.0):
        self.timeout = timeout_seconds

    async def reach_consensus(
        self,
        agents: Dict[str, "VotingAgent"],
        proposal: str,
    ) -> Tuple[bool, Dict[str, Vote]]:
        """
        Returns (decision, votes).
        Decision is True only if YES votes strictly outnumber NO votes.
        """
        if not agents:
            return False, {}  # asyncio.wait() rejects an empty set of tasks
        vote_tasks = {
            agent_id: asyncio.create_task(agent.vote(proposal))
            for agent_id, agent in agents.items()
        }
        # Wait for all votes, but no longer than the timeout
        done, pending = await asyncio.wait(
            list(vote_tasks.values()),
            timeout=self.timeout,
        )
        # Cancel non-responders
        for task in pending:
            task.cancel()
        # Collect votes; errors and timeouts count as abstentions
        votes: Dict[str, Vote] = {}
        for agent_id, task in vote_tasks.items():
            if task in done:
                try:
                    votes[agent_id] = task.result()
                except Exception:
                    votes[agent_id] = Vote.ABSTAIN
            else:
                votes[agent_id] = Vote.ABSTAIN
        # Count votes
        yes_votes = sum(1 for v in votes.values() if v == Vote.YES)
        no_votes = sum(1 for v in votes.values() if v == Vote.NO)
        # Abstentions are ignored; a tie keeps the status quo (reject)
        decision = yes_votes > no_votes
        return decision, votes

class VotingAgent:
    def __init__(self, name: str, bias: float = 0.5):
        self.name = name
        self.bias = bias  # 0.0 = always no, 1.0 = always yes

    async def vote(self, proposal: str) -> Vote:
        # Simulate thinking
        await asyncio.sleep(0.1)
        # Toy logic: bias plus random noise (the proposal text is not inspected)
        score = self.bias + random.uniform(-0.2, 0.2)
        if score > 0.6:
            return Vote.YES
        elif score < 0.4:
            return Vote.NO
        else:
            return Vote.ABSTAIN
```
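The coordinator above counts a timeout as an abstention and resolves a tie as NO. If most agents go silent, though, the "majority" can be a single voter. One variation worth considering is to require a quorum of decided votes and return `None` (escalate) on a tie or a missing quorum. This is an assumption of mine, not the coordinator's behavior. `tally_with_quorum` and the `quorum` fraction are hypothetical names, and `Vote` is redeclared so the snippet runs on its own.

```python
from enum import Enum
from typing import Dict, Optional


class Vote(Enum):
    YES = "yes"
    NO = "no"
    ABSTAIN = "abstain"


def tally_with_quorum(votes: Dict[str, Vote], quorum: float = 0.5) -> Optional[bool]:
    """Return True (accept), False (reject), or None (no decision: escalate).

    quorum is the fraction of ALL agents that must vote YES or NO,
    rather than abstain or time out, for the result to count.
    """
    if not votes:
        return None
    decided = [v for v in votes.values() if v != Vote.ABSTAIN]
    if len(decided) / len(votes) < quorum:
        return None  # too many silent agents to trust the result
    yes = sum(1 for v in decided if v == Vote.YES)
    no = len(decided) - yes
    if yes == no:
        return None  # tie: hand it to a human or a tie-breaking agent
    return yes > no
```

Returning `None` forces the caller to decide explicitly what "no decision" means, instead of silently treating it as a rejection.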
Weighted Consensus
Sometimes agents aren't equal. An expert agent's opinion should carry more weight than a novice's.
```python
from typing import Dict, Tuple

# Vote is the enum defined in the majority-voting example above

class WeightedConsensus:
    def __init__(self, agent_weights: Dict[str, float]):
        """agent_weights maps agent_id to its voting weight (0.0-1.0)"""
        self.weights = agent_weights

    async def decide(
        self,
        votes: Dict[str, "Vote"],
    ) -> Tuple[bool, float]:
        """Returns (decision, confidence)"""
        weighted_yes = 0.0
        total_weight = 0.0
        for agent_id, vote in votes.items():
            # Agents without a configured weight get a neutral 0.5
            weight = self.weights.get(agent_id, 0.5)
            total_weight += weight
            if vote == Vote.YES:
                weighted_yes += weight
        if total_weight == 0:
            return False, 0.0
        # Abstentions count toward total_weight, so the proposal needs more
        # than half of ALL voting weight, not just of the YES/NO weight
        yes_ratio = weighted_yes / total_weight
        decision = yes_ratio > 0.5
        confidence = max(yes_ratio, 1 - yes_ratio)
        return decision, confidence
```
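To see what weighting buys you, take a hypothetical panel. An expert with weight 0.9 votes YES, and two novices with weight 0.3 each vote NO. Plugging those numbers into `decide`:

```latex
\begin{aligned}
W_{\text{yes}} &= 0.9, \qquad W_{\text{no}} = 0.3 + 0.3 = 0.6, \qquad W_{\text{total}} = 0.9 + 0.6 = 1.5 \\
r_{\text{yes}} &= \frac{W_{\text{yes}}}{W_{\text{total}}} = \frac{0.9}{1.5} = 0.6 > 0.5 \;\Rightarrow\; \text{accept} \\
\text{confidence} &= \max\left(r_{\text{yes}},\, 1 - r_{\text{yes}}\right) = \max(0.6,\, 0.4) = 0.6
\end{aligned}
```

A plain majority vote would reject the same proposal 2 to 1. Abstentions add to the total weight without adding to the YES side, so under this scheme an abstaining agent effectively counts against the proposal.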
Task Decomposition and Delegation
Breaking down complex tasks into subtasks that agents can handle independently is an art. I've found hierarchical task networks (HTN) work well here.
The key insight: tasks should be decomposed until they match agent capabilities, not decomposed to arbitrary depth.
```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

@dataclass
class Task:
    name: str
    complexity: int
    required_capability: str

# A decomposition rule maps one task to the subtasks it breaks into
DecompositionRule = Callable[[Task], List[Task]]

class TaskDecomposer:
    def __init__(self, max_complexity: int = 5):
        self.max_complexity = max_complexity
        self.decomposition_rules: Dict[str, DecompositionRule] = {}

    def register_rule(self, task_name: str, decomposer: DecompositionRule):
        """Register how to break down tasks with this name"""
        self.decomposition_rules[task_name] = decomposer

    def decompose(self, task: Task) -> List[Task]:
        """Recursively decompose until tasks are simple enough"""
        if task.complexity <= self.max_complexity:
            return [task]
        rule = self.decomposition_rules.get(task.name)
        if rule is None:
            # No rule found, can't decompose further
            return [task]
        result: List[Task] = []
        for subtask in rule(task):
            if subtask.complexity >= task.complexity:
                # Rule made no progress; stop here instead of recursing forever
                result.append(subtask)
            else:
                result.extend(self.decompose(subtask))
        return result

class DelegatingCoordinator:
    def __init__(self, agents: List["CapableAgent"], decomposer: Optional[TaskDecomposer] = None):
        self.agents = agents
        self.decomposer = decomposer or TaskDecomposer()

    async def execute(self, task: Task) -> List[Any]:
        """Decompose and delegate task to capable agents"""
        subtasks = self.decomposer.decompose(task)
        # Match subtasks to agents. current_load only rises once execute() starts,
        # so count the slots handed out in this batch to respect max_load.
        planned: Dict[str, int] = {}  # agent name (assumed unique) -> subtasks assigned
        assignments = []
        for subtask in subtasks:
            agent = self.find_capable_agent(subtask, planned)
            if agent is None:
                raise ValueError(f"No agent can handle: {subtask.name}")
            planned[agent.name] = planned.get(agent.name, 0) + 1
            assignments.append((agent, subtask))
        # Execute in parallel
        return await asyncio.gather(
            *[agent.execute(subtask) for agent, subtask in assignments]
        )

    def find_capable_agent(
        self, task: Task, planned: Optional[Dict[str, int]] = None
    ) -> Optional["CapableAgent"]:
        """Find agent with matching capability and available capacity"""
        planned = planned if planned is not None else {}
        for agent in self.agents:
            load = agent.current_load + planned.get(agent.name, 0)
            if task.required_capability in agent.capabilities and load < agent.max_load:
                return agent
        return None

class CapableAgent:
    def __init__(self, name: str, capabilities: List[str], max_load: int = 5):
        self.name = name
        self.capabilities = capabilities
        self.max_load = max_load
        self.current_load = 0

    async def execute(self, task: Task) -> str:
        self.current_load += 1
        try:
            # Simulate work
            await asyncio.sleep(0.5)
            return f"{self.name} completed {task.name}"
        finally:
            self.current_load -= 1
```
Leader Election in Agent Swarms
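When you need one agent to coordinate the others, you need leader election. I've used the Bully algorithm, in which the live agent with the highest ID wins. It's simple and works well in practice, provided IDs are unique and failed agents can be detected reliably (for example, with timeouts).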
```python
import asyncio
from typing import Dict, Optional, Set

# Strong references to fire-and-forget election tasks, so they can't be
# garbage-collected before they finish
_background_tasks: Set[asyncio.Task] = set()

class Agent:
    def __init__(self, agent_id: int, all_agents: Dict[int, "Agent"]):
        self.id = agent_id
        self.all_agents = all_agents
        self.is_leader = False
        self.leader_id: Optional[int] = None
        self.alive = True

    async def start_election(self):
        """Bully algorithm: the live agent with the highest ID wins"""
        print(f"Agent {self.id} starting election")
        # Send election message to all agents with higher IDs
        higher_agents = [aid for aid in self.all_agents if aid > self.id]
        if not higher_agents:
            # No higher ID agents, I'm the leader
            await self.become_leader()
            return
        # Ask higher agents to participate
        responses = await asyncio.gather(
            *[self.send_election_msg(aid) for aid in higher_agents],
            return_exceptions=True,
        )
        # If any higher agent responded, it takes over the election
        if any(r is True for r in responses):
            print(f"Agent {self.id} deferring to higher agent")
        else:
            # No response from higher agents, I'm the leader
            await self.become_leader()

    async def send_election_msg(self, agent_id: int) -> bool:
        """Send election message to another agent; True means it answered"""
        agent = self.all_agents.get(agent_id)
        if agent and agent.alive:
            # Higher agent answers and starts its own election
            task = asyncio.create_task(agent.start_election())
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
            return True
        return False

    async def become_leader(self):
        """Declare victory"""
        self.is_leader = True
        self.leader_id = self.id
        print(f"Agent {self.id} is now the leader")
        # Notify all agents
        for agent_id, agent in self.all_agents.items():
            if agent_id != self.id and agent.alive:
                agent.leader_id = self.id
                agent.is_leader = False

# Example usage
async def demo_leader_election():
    agents: Dict[int, Agent] = {}
    for i in range(1, 6):
        agents[i] = Agent(i, agents)  # every agent shares the same registry
    # Simulate agent 5 (highest ID) failing
    agents[5].alive = False
    # Agent 2 notices leader is gone, starts election
    await agents[2].start_election()
    # Let the elections started by higher agents run to completion
    while _background_tasks:
        await asyncio.gather(*list(_background_tasks))
    # Agent 4 should win
    for aid, agent in agents.items():
        print(f"Agent {aid}: leader={agent.leader_id}, is_leader={agent.is_leader}")

if __name__ == "__main__":
    asyncio.run(demo_leader_election())
```
Conflict Resolution
Agents will disagree. Here's how I handle it:
Failure Handling
Agents fail. Networks partition. Timeouts happen. Here's what I've learned:
Heartbeats are non-negotiable. Every agent should periodically signal it's alive:
```python
import asyncio
import time
from typing import Dict

class HeartbeatMonitor:
    def __init__(self, timeout: float = 30.0):
        self.timeout = timeout
        self.last_heartbeat: Dict[str, float] = {}

    async def monitor(self, agent_id: str):
        """Watch one agent; report it once if it misses the timeout, then stop"""
        # Start the clock now so a newly registered agent gets a full grace period
        self.last_heartbeat.setdefault(agent_id, time.monotonic())
        while True:
            await asyncio.sleep(self.timeout / 2)
            last_seen = self.last_heartbeat[agent_id]
            if time.monotonic() - last_seen > self.timeout:
                await self.on_agent_failure(agent_id)
                return

    def record_heartbeat(self, agent_id: str):
        # monotonic() is unaffected by system clock changes, unlike time.time()
        self.last_heartbeat[agent_id] = time.monotonic()

    async def on_agent_failure(self, agent_id: str):
        print(f"Agent {agent_id} has failed!")
        # Trigger failover, reassign tasks, etc.
```
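The monitor covers only the receiving side. Each agent also needs a loop that reports in well within the monitor's timeout. Below is a minimal sketch of that sending loop, with two assumptions that are not part of the original design: `record` is something like `HeartbeatMonitor.record_heartbeat`, and an `asyncio.Event` stops the loop. `send_heartbeats` is a hypothetical name.

```python
import asyncio
from typing import Callable


async def send_heartbeats(
    agent_id: str,
    record: Callable[[str], None],
    interval: float,
    stop: asyncio.Event,
) -> None:
    """Call record(agent_id) every `interval` seconds until `stop` is set."""
    while not stop.is_set():
        record(agent_id)
        try:
            # Sleep for one interval, but wake immediately if asked to stop
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # normal case: the interval elapsed, send the next beat
```

An interval of, say, a third of the monitor's timeout leaves room for a beat or two to go missing before the agent is declared failed.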
Circuit breakers prevent cascade failures. If an agent keeps failing, stop calling it:
```python
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the breaker is open"""

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: float = 60.0,  # seconds to stay OPEN before allowing a trial call
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError("Circuit breaker is OPEN")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.on_failure()
            raise  # re-raise with the original traceback
        self.on_success()
        return result

    def on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        # In HALF_OPEN the count is still >= threshold, so one failure reopens the circuit
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
```
Scalability: The Coordination Tax
Every agent you add increases coordination overhead. I've found these inflection points:
- 2-3 agents: Minimal overhead, direct communication works
- 4-10 agents: Need structured coordination (blackboard, contract net)
- 10-50 agents: Hierarchical structures become necessary
- 50+ agents: Need distributed coordination, eventual consistency
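One way to see why overhead climbs so fast is to count communication channels. If any agent may need to talk to any other, n agents have n(n-1)/2 potential channels. In a hierarchy (a tree), each agent talks only to its coordinator, which leaves n - 1. A quick sketch of the arithmetic follows; the function names are illustrative.

```python
def all_to_all_channels(n: int) -> int:
    # Every unordered pair of agents is a potential channel: n choose 2
    return n * (n - 1) // 2


def tree_channels(n: int) -> int:
    # In a tree, each agent except the root links only to its coordinator,
    # so there are n - 1 links no matter how wide each team is
    return max(n - 1, 0)


for n in (3, 10, 50):
    print(f"{n:>3} agents: {all_to_all_channels(n):>5} all-to-all, {tree_channels(n):>3} in a tree")
```

For 3, 10, and 50 agents that works out to 3, 45, and 1,225 channels, versus 2, 9, and 49 in a tree. The cost is that each coordinator becomes a hotspot. Team size still matters for that reason, even though it doesn't change the link count.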
Real-World Patterns
CrewAI Style
Sequential and hierarchical. One agent manages, others execute. Simple but effective:
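```python
from typing import List

# Illustrative sketch of the style, not CrewAI's actual API. Agents are assumed
# to expose .capabilities and an async execute(task), like CapableAgent above.
class Crew:
    def __init__(self, agents: List["CapableAgent"], tasks: List["Task"]):
        self.agents = agents
        self.tasks = tasks

    async def run(self):
        results = []
        for task in self.tasks:  # tasks run one after another, in order
            # Find best agent for this task
            agent = self.assign_task(task)
            result = await agent.execute(task)
            results.append(result)
        return results

    def assign_task(self, task):
        # Simplest policy: the first agent with the required capability
        for agent in self.agents:
            if task.required_capability in agent.capabilities:
                return agent
        raise ValueError(f"No agent can handle: {task.name}")
```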
AutoGen Style
Conversational. Agents talk until they reach consensus or max rounds:
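```python
from typing import List

# Illustrative sketch of the conversational style, not AutoGen's actual API.
# Agents are assumed to expose an async respond(messages) -> str.
class AutoGenOrchestrator:
    async def run_conversation(
        self,
        agents: List["Agent"],
        initial_message: str,
        max_rounds: int = 10,
    ) -> List[str]:
        messages = [initial_message]
        for _ in range(max_rounds):  # "_" avoids shadowing the built-in round()
            for agent in agents:
                response = await agent.respond(messages)
                messages.append(response)
                # Check after every turn so the conversation stops promptly
                if self.is_task_complete(messages):
                    return messages
        return messages

    def is_task_complete(self, messages: List[str]) -> bool:
        # Hypothetical stop condition: an agent replies with a "TERMINATE" marker
        return "TERMINATE" in messages[-1]
```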
LangGraph Style
Graph-based routing. Agents are nodes, decisions are edges:
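```python
from typing import Any, Callable, Dict, List, Optional

# Illustrative sketch of graph-based routing, not LangGraph's actual API.
class AgentGraph:
    def __init__(self):
        self.nodes: Dict[str, Any] = {}  # name -> agent with async execute(data)
        self.edges: Dict[str, List[str]] = {}  # name -> candidate next nodes
        # Optional per-node router: (output, candidates) -> next node, or None to stop
        self.routers: Dict[str, Callable[[Any, List[str]], Optional[str]]] = {}

    async def execute(self, start_node: str, input_data: Any, max_steps: int = 50):
        current = start_node
        data = input_data
        for _ in range(max_steps):  # step cap guards against routing cycles
            data = await self.nodes[current].execute(data)
            # Route to next node based on output
            next_node = self.route(current, data)
            if next_node is None:
                return data
            current = next_node
        raise RuntimeError(f"Graph did not finish within {max_steps} steps")

    def route(self, node: str, data: Any) -> Optional[str]:
        candidates = self.edges.get(node, [])
        if node in self.routers:
            return self.routers[node](data, candidates)
        # Default: follow the first outgoing edge, or stop at a leaf node
        return candidates[0] if candidates else None
```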
Key Takeaways
- Choose coordination patterns based on your communication needs, not what's trendy
- Consensus doesn't require a blockchain: voting and weighted consensus work for most cases
- Task decomposition should match agent capabilities, not arbitrary depth
- Leader election prevents chaos in swarms that need coordination
- Heartbeats and circuit breakers are mandatory for production systems
- With all-to-all communication, coordination overhead grows quadratically with the number of agents; use hierarchies to break it
- Real-world frameworks (CrewAI, AutoGen, LangGraph) each optimize for different coordination styles
FAQ
Q: When should I use blackboard vs contract net?
A: Blackboard for collaborative knowledge building where agents build on each other's work. Contract net for resource allocation where you need to distribute tasks to heterogeneous agents. I use blackboard for research tasks, contract net for job scheduling.
Q: How do I prevent deadlocks in multi-agent systems?
A: Use timeouts everywhere, implement deadlock detection (wait-for graphs), and have a global coordinator that can break deadlocks. The simplest approach: set maximum wait times and fail fast rather than wait forever.
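Detecting a deadlock amounts to finding a cycle in the wait-for graph, where an edge from A to B means agent A is blocked waiting on agent B. Below is a minimal depth-first-search sketch. It assumes the coordinator keeps that graph as a dict of sets; `find_deadlock` is a hypothetical helper, not a library function.

```python
from typing import Dict, List, Optional, Set


def find_deadlock(wait_for: Dict[str, Set[str]]) -> Optional[List[str]]:
    """Return one cycle in the wait-for graph, or None if there is no deadlock.

    wait_for[a] is the set of agents that agent `a` is currently waiting on.
    """
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, finished
    color: Dict[str, int] = {}
    path: List[str] = []

    def dfs(node: str) -> Optional[List[str]]:
        color[node] = GRAY
        path.append(node)
        for nxt in sorted(wait_for.get(node, ())):
            state = color.get(nxt, WHITE)
            if state == GRAY:
                # Back edge: everything on the path from nxt onward is stuck
                return path[path.index(nxt):] + [nxt]
            if state == WHITE:
                cycle = dfs(nxt)
                if cycle:
                    return cycle
        path.pop()
        color[node] = BLACK
        return None

    for start in sorted(wait_for):
        if color.get(start, WHITE) == WHITE:
            cycle = dfs(start)
            if cycle:
                return cycle
    return None
```

Once a cycle is found, the coordinator can break it by failing or restarting one agent in the cycle, for example the one whose request started most recently.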
Q: What's the right agent count for a coordination task?
A: Start with 3-5. More agents don't always help; they add coordination overhead. I've seen systems where going from 10 agents to 5 actually improved throughput, because the coordination cost saved outweighed the parallel capacity lost.
Q: How do I handle an agent that goes rogue or produces bad results?
A: Implement result validation (other agents verify work), use confidence scores in consensus voting, and have circuit breakers that isolate misbehaving agents. In production, I always have a manual override to remove an agent from the pool.
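Below is a minimal sketch of peer validation. It assumes each validator is an async callable that inspects a result and returns True or False; `validate_result` and `required_approvals` are hypothetical names. A validator that crashes or misses the timeout counts as a rejection, so a misbehaving checker fails safe.

```python
import asyncio
from typing import Awaitable, Callable, List

# A validator inspects a result and returns True to approve it
Validator = Callable[[str], Awaitable[bool]]


async def validate_result(
    result: str,
    validators: List[Validator],
    required_approvals: int,
    timeout: float = 5.0,
) -> bool:
    """Accept `result` only if enough peer validators approve it in time."""

    async def safe_check(check: Validator) -> bool:
        try:
            return await asyncio.wait_for(check(result), timeout=timeout)
        except Exception:
            return False  # a crashed or slow validator counts as a rejection

    approvals = await asyncio.gather(*(safe_check(v) for v in validators))
    return sum(approvals) >= required_approvals
```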
Q: Should agents communicate directly or through a message bus?
A: Message bus (pub/sub) for loose coupling and scaling beyond 10 agents. Direct communication for tight coordination with small agent counts. I default to pub/sub because it's easier to debug and monitor.
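For small deployments, an in-process bus is enough to get the decoupling. Below is a minimal sketch built on `asyncio.Queue`, with a hypothetical `MessageBus` class; beyond a single process you would swap it for an external broker. The `history` list stands in for the monitoring hook that makes a bus easier to debug.

```python
import asyncio
from collections import defaultdict
from typing import Any, DefaultDict, List, Tuple


class MessageBus:
    """In-process pub/sub: publishers never hold references to subscribers."""

    def __init__(self) -> None:
        self._queues: DefaultDict[str, List[asyncio.Queue]] = defaultdict(list)
        # Every message passes through here, which is what makes a bus easy to monitor
        self.history: List[Tuple[str, Any]] = []

    def subscribe(self, topic: str) -> asyncio.Queue:
        # One unbounded queue per subscriber, so a slow reader only delays itself
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[topic].append(queue)
        return queue

    async def publish(self, topic: str, message: Any) -> int:
        """Deliver message to every subscriber of topic; return how many received it."""
        self.history.append((topic, message))
        subscribers = self._queues.get(topic, [])
        for queue in subscribers:
            await queue.put(message)
        return len(subscribers)
```

In production you'd send that history to your logging or tracing system rather than keep it in memory.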