Essential Agent Patterns

Master the fundamental design patterns behind production AI agent systems. These patterns address recurring challenges in building scalable, maintainable, and reliable AI agents.

Single Responsibility

Each agent focuses on one specific task or domain

Chain of Thought

Break complex reasoning into sequential steps

Hierarchical Planning

Decompose goals into manageable sub-tasks

🔗 Chain of Responsibility Pattern

Build agent pipelines where each agent handles specific aspects of a request:

1. Input Validation Agent: Validate and sanitize user inputs before processing.
2. Intent Classification Agent: Determine user intent and route to appropriate handlers.
3. Task Execution Agent: Process the request based on classified intent.
4. Response Generation Agent: Format and personalize the response.
5. Quality Assurance Agent: Review output for accuracy and compliance.

# Chain of Responsibility Pattern Implementation
from abc import ABC, abstractmethod
from typing import Optional, Any, Dict
import asyncio

class AgentHandler(ABC):
    """Abstract base class for agent handlers in the chain"""
    
    def __init__(self):
        self.next_handler: Optional[AgentHandler] = None
    
    def set_next(self, handler: 'AgentHandler') -> 'AgentHandler':
        self.next_handler = handler
        return handler
    
    @abstractmethod
    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Process the request or pass to next handler"""
        pass
    
    async def handle_next(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Pass request to next handler if available"""
        if self.next_handler:
            return await self.next_handler.handle(request)
        return request

class ValidationAgent(AgentHandler):
    """Validates and sanitizes input data"""
    
    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        print("🔍 ValidationAgent: Validating input...")
        
        # Validate required fields
        required_fields = ['user_id', 'message', 'context']
        for field in required_fields:
            if field not in request:
                raise ValueError(f"Missing required field: {field}")
        
        # Sanitize input
        request['message'] = self.sanitize_input(request['message'])
        request['validated'] = True
        
        return await self.handle_next(request)
    
    def sanitize_input(self, text: str) -> str:
        # Trim whitespace and cap the length; real sanitization is application-specific
        return text.strip()[:1000]  # Limit length

class IntentClassificationAgent(AgentHandler):
    """Classifies user intent with keyword matching (stand-in for an NLP model)"""
    
    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        print("🎯 IntentClassificationAgent: Classifying intent...")
        
        message = request['message'].lower()
        
        # Simple keyword-based classification (replace with ML model).
        # The first matching branch wins, so a message containing both
        # 'help' and 'track' is classified as 'support'.
        if 'help' in message or 'support' in message:
            request['intent'] = 'support'
        elif 'buy' in message or 'purchase' in message:
            request['intent'] = 'purchase'
        elif 'status' in message or 'track' in message:
            request['intent'] = 'tracking'
        else:
            request['intent'] = 'general'
        
        request['confidence'] = 0.95  # Placeholder; a real classifier returns its own score
        
        return await self.handle_next(request)

class TaskExecutionAgent(AgentHandler):
    """Executes task based on classified intent"""
    
    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        print(f"⚡ TaskExecutionAgent: Executing {request.get('intent')} task...")
        
        intent = request.get('intent', 'general')
        
        # Route to appropriate task handler
        if intent == 'support':
            request['result'] = await self.handle_support(request)
        elif intent == 'purchase':
            request['result'] = await self.handle_purchase(request)
        elif intent == 'tracking':
            request['result'] = await self.handle_tracking(request)
        else:
            request['result'] = await self.handle_general(request)
        
        return await self.handle_next(request)
    
    async def handle_support(self, request: Dict) -> str:
        return "I'll connect you with our support team right away."
    
    async def handle_purchase(self, request: Dict) -> str:
        return "Let me help you with your purchase."
    
    async def handle_tracking(self, request: Dict) -> str:
        return "I'll check your order status for you."
    
    async def handle_general(self, request: Dict) -> str:
        return "How can I assist you today?"

class ResponseGenerationAgent(AgentHandler):
    """Generates formatted response"""
    
    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        print("✨ ResponseGenerationAgent: Generating response...")
        
        # Format response with context
        response = {
            'user_id': request['user_id'],
            'message': request.get('result', 'Processing your request...'),
            'intent': request.get('intent'),
            'confidence': request.get('confidence'),
            # Monotonic event-loop clock, not wall-clock time
            'timestamp': asyncio.get_running_loop().time()
        }
        
        request['response'] = response
        
        return await self.handle_next(request)

class QualityAssuranceAgent(AgentHandler):
    """Reviews output for quality and compliance"""
    
    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        print("✅ QualityAssuranceAgent: Reviewing output...")
        
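        # setdefault stores the dict on the request, so the edits below are kept
        # even if no earlier handler created a response
        response = request.setdefault('response', {})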
        
        # Check for prohibited content
        if self.contains_prohibited_content(response.get('message', '')):
            response['message'] = "I apologize, but I cannot process this request."
            response['flagged'] = True
        
        # Add quality metrics
        response['quality_score'] = self.calculate_quality_score(response)
        
        # Returns the request unchanged when this is the last handler
        return await self.handle_next(request)
    
    def contains_prohibited_content(self, text: str) -> bool:
        # Check for prohibited patterns
        prohibited = ['password', 'credit card', 'ssn']
        return any(word in text.lower() for word in prohibited)
    
    def calculate_quality_score(self, response: Dict) -> float:
        # Simple quality scoring
        score = 1.0
        if response.get('flagged'):
            score *= 0.5
        if response.get('confidence', 0) < 0.8:
            score *= 0.8
        return score

# Usage Example
async def process_request():
    # Create the chain
    validation = ValidationAgent()
    classification = IntentClassificationAgent()
    execution = TaskExecutionAgent()
    generation = ResponseGenerationAgent()
    quality = QualityAssuranceAgent()
    
    # Link the chain
    validation.set_next(classification).set_next(execution).set_next(generation).set_next(quality)
    
    # Process request
    request = {
        'user_id': 'user123',
        'message': 'I need help tracking my order',
        'context': {'session_id': 'abc123'}
    }
    
    result = await validation.handle(request)
    print(f"\n📤 Final Response: {result['response']}")
    
    return result

# Run the chain
if __name__ == "__main__":
    asyncio.run(process_request())
                
Important: Each agent in the chain should have a single, well-defined responsibility. This makes the system easier to test, maintain, and scale.
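Because each handler has a single responsibility, it can be exercised on its own, without the rest of the chain. The following is a minimal sketch under stated assumptions: `Handler` is a stripped-down stand-in for `AgentHandler`, and `UppercaseHandler` and `RecordingHandler` are hypothetical names introduced only for this illustration.

```python
import asyncio
from typing import Any, Dict, List, Optional


class Handler:
    """Stripped-down stand-in for AgentHandler (illustrative only)."""

    def __init__(self) -> None:
        self.next_handler: Optional["Handler"] = None

    def set_next(self, handler: "Handler") -> "Handler":
        self.next_handler = handler
        return handler

    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError

    async def handle_next(self, request: Dict[str, Any]) -> Dict[str, Any]:
        if self.next_handler:
            return await self.next_handler.handle(request)
        return request


class UppercaseHandler(Handler):
    """Hypothetical handler under test: upper-cases the message."""

    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        request["message"] = request["message"].upper()
        return await self.handle_next(request)


class RecordingHandler(Handler):
    """Test double that records what the previous handler passed on."""

    def __init__(self) -> None:
        super().__init__()
        self.seen: List[Dict[str, Any]] = []

    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        self.seen.append(dict(request))  # Copy, so later mutation is not recorded
        return request


# Wire only the handler under test to the recorder, then inspect both ends
handler = UppercaseHandler()
recorder = RecordingHandler()
handler.set_next(recorder)
result = asyncio.run(handler.handle({"message": "hello"}))
print(result, recorder.seen)
```

The recorder replaces every downstream agent, so a failure here points at the one handler being tested rather than at the chain as a whole.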
🌐 Multi-Agent Orchestration Pattern

Coordinate multiple specialized agents working together on complex tasks:

1. Define Agent Roles: Assign specific capabilities to each agent.
2. Create Orchestrator: Build central coordinator to manage agents.
3. Implement Communication: Set up message passing between agents.
4. Handle Dependencies: Manage task dependencies and sequencing.
5. Aggregate Results: Combine outputs from multiple agents.

# Multi-Agent Orchestration Pattern
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio

class AgentRole(Enum):
    RESEARCHER = "researcher"
    ANALYZER = "analyzer"
    WRITER = "writer"
    REVIEWER = "reviewer"
    COORDINATOR = "coordinator"

@dataclass
class AgentMessage:
    sender: str
    recipient: str
    content: Any
    message_type: str
    priority: int = 0

class Agent:
    """Base agent class with communication capabilities"""
    
    def __init__(self, name: str, role: AgentRole):
        self.name = name
        self.role = role
        self.inbox: asyncio.Queue = asyncio.Queue()
        self.knowledge_base: Dict[str, Any] = {}
        self.running = False
    
    async def send_message(self, recipient: 'Agent', content: Any, 
                          message_type: str = "task") -> None:
        """Send message to another agent"""
        message = AgentMessage(
            sender=self.name,
            recipient=recipient.name,
            content=content,
            message_type=message_type
        )
        await recipient.inbox.put(message)
    
    async def receive_message(self) -> Optional[AgentMessage]:
        """Receive message from inbox"""
        try:
            return await asyncio.wait_for(self.inbox.get(), timeout=1.0)
        except asyncio.TimeoutError:
            return None
    
    async def process(self) -> Any:
        """Process agent tasks (override in subclasses)"""
        raise NotImplementedError
    
    async def run(self) -> None:
        """Main agent loop"""
        self.running = True
        while self.running:
            message = await self.receive_message()
            if message:
                await self.handle_message(message)
            await asyncio.sleep(0.1)
    
    async def handle_message(self, message: AgentMessage) -> None:
        """Handle incoming messages"""
        print(f"📨 {self.name} received: {message.message_type} from {message.sender}")
        if message.message_type == "task":
            result = await self.process_task(message.content)
            await self.send_result(message.sender, result)
        elif message.message_type == "stop":
            self.running = False
    
    async def process_task(self, task: Dict[str, Any]) -> Any:
        """Process specific task (override in subclasses)"""
        return await self.process()
    
    async def send_result(self, recipient_name: str, result: Any) -> None:
        """Send result back to sender"""
        # This would normally look up the agent by name
        print(f"📤 {self.name} sending result to {recipient_name}")

class ResearchAgent(Agent):
    """Agent specialized in research tasks"""
    
    def __init__(self, name: str):
        super().__init__(name, AgentRole.RESEARCHER)
    
    async def process(self) -> Dict[str, Any]:
        """Perform research on given topic"""
        await asyncio.sleep(1)  # Simulate research time
        return {
            'findings': [
                'Key insight 1: Market trends indicate growth',
                'Key insight 2: Customer satisfaction is high',
                'Key insight 3: Competition is increasing'
            ],
            'sources': ['report1.pdf', 'survey2.xlsx', 'analysis3.doc']
        }

class AnalyzerAgent(Agent):
    """Agent specialized in data analysis"""
    
    def __init__(self, name: str):
        super().__init__(name, AgentRole.ANALYZER)
    
    async def process(self) -> Dict[str, Any]:
        """Analyze provided data"""
        await asyncio.sleep(1.5)  # Simulate analysis time
        return {
            'metrics': {
                'growth_rate': 0.15,
                'market_share': 0.23,
                'customer_retention': 0.87
            },
            'trends': ['upward', 'stable', 'improving'],
            'recommendations': [
                'Invest in product development',
                'Expand marketing efforts',
                'Improve customer support'
            ]
        }

class WriterAgent(Agent):
    """Agent specialized in content generation"""
    
    def __init__(self, name: str):
        super().__init__(name, AgentRole.WRITER)
    
    async def process(self) -> str:
        """Generate written content"""
        await asyncio.sleep(2)  # Simulate writing time
        return """
        Executive Summary Report
        
        Based on comprehensive research and analysis, our findings indicate:
        
        1. Market Position: Strong growth trajectory with 15% YoY increase
        2. Customer Metrics: 87% retention rate exceeds industry average
        3. Strategic Recommendations: Focus on product innovation and market expansion
        
        Detailed analysis supports immediate action on identified opportunities.
        """

class Orchestrator:
    """Coordinates multiple agents to complete complex tasks"""
    
    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.results: Dict[str, Any] = {}
    
    def register_agent(self, agent: Agent) -> None:
        """Register an agent with the orchestrator"""
        self.agents[agent.name] = agent
        print(f"✅ Registered agent: {agent.name} ({agent.role.value})")
    
    async def execute_workflow(self, workflow: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute a workflow across multiple agents"""
        print("\n🚀 Starting workflow execution...")
        
        for step in workflow:
            agent_name = step['agent']
            task = step['task']
            dependencies = step.get('dependencies', [])
            
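            # Steps run in list order, so a dependency without a result at this
            # point will never get one; fail fast instead of polling forever
            missing = [dep for dep in dependencies if dep not in self.results]
            if missing:
                raise ValueError(f"Unmet dependencies for {task['name']}: {missing}")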
            
            # Execute task
            agent = self.agents.get(agent_name)
            if agent:
                print(f"\n⚡ Executing: {task['name']} with {agent_name}")
                
                # Add dependency results to task
                task['inputs'] = {dep: self.results[dep] for dep in dependencies}
                
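                # Process task; process_task() receives the task (including its
                # 'inputs') and by default delegates to process()
                result = await agent.process_task(task)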
                self.results[task['name']] = result
                
                print(f"✅ Completed: {task['name']}")
        
        return self.results
    
    async def parallel_execution(self, tasks: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute multiple tasks in parallel"""
        print("\n🔄 Starting parallel execution...")
        
        async def execute_task(task: Dict[str, Any]) -> tuple:
            agent = self.agents.get(task['agent'])
            if agent:
                result = await agent.process()
                return (task['name'], result)
            return (task['name'], None)
        
        # Execute all tasks in parallel
        results = await asyncio.gather(*[execute_task(task) for task in tasks])
        
        # Convert to dictionary
        return dict(results)

# Usage Example
async def main():
    # Create orchestrator
    orchestrator = Orchestrator()
    
    # Create and register agents
    research_agent = ResearchAgent("research_bot")
    analyzer_agent = AnalyzerAgent("analyzer_bot")
    writer_agent = WriterAgent("writer_bot")
    
    orchestrator.register_agent(research_agent)
    orchestrator.register_agent(analyzer_agent)
    orchestrator.register_agent(writer_agent)
    
    # Define workflow
    workflow = [
        {
            'agent': 'research_bot',
            'task': {'name': 'market_research', 'type': 'research'},
            'dependencies': []
        },
        {
            'agent': 'analyzer_bot',
            'task': {'name': 'data_analysis', 'type': 'analysis'},
            'dependencies': ['market_research']
        },
        {
            'agent': 'writer_bot',
            'task': {'name': 'report_generation', 'type': 'writing'},
            'dependencies': ['market_research', 'data_analysis']
        }
    ]
    
    # Execute workflow
    results = await orchestrator.execute_workflow(workflow)
    
    print("\n📊 Workflow Results:")
    for task_name, result in results.items():
        print(f"\n{task_name}:")
        if isinstance(result, dict):
            for key, value in result.items():
                print(f"  {key}: {value}")
        else:
            print(f"  {result[:100]}...")  # First 100 chars
    
    # Example of parallel execution
    parallel_tasks = [
        {'agent': 'research_bot', 'name': 'research_task_1'},
        {'agent': 'analyzer_bot', 'name': 'analysis_task_1'},
        {'agent': 'writer_bot', 'name': 'writing_task_1'}
    ]
    
    parallel_results = await orchestrator.parallel_execution(parallel_tasks)
    print("\n⚡ Parallel Execution Results:")
    for task_name in parallel_results:
        print(f"  ✅ {task_name} completed")

if __name__ == "__main__":
    asyncio.run(main())
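The workflow above has to list its steps in an order that respects their dependencies. One way to derive such an order is a topological sort. The sketch below uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+). `order_workflow` is a hypothetical helper, not part of the `Orchestrator` above, and it assumes the same step layout (`task['name']` plus a `dependencies` list).

```python
from graphlib import CycleError, TopologicalSorter
from typing import Any, Dict, List


def order_workflow(workflow: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return the steps reordered so every step follows its dependencies."""
    steps_by_name = {step["task"]["name"]: step for step in workflow}

    # Map each task name to the set of task names it depends on
    graph = {
        name: set(step.get("dependencies", []))
        for name, step in steps_by_name.items()
    }

    # Reject dependencies that no step in the workflow produces
    unknown = {dep for deps in graph.values() for dep in deps} - set(graph)
    if unknown:
        raise ValueError(f"Dependencies with no matching step: {sorted(unknown)}")

    try:
        ordered_names = list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # The detected cycle is the second element of the exception's args
        raise ValueError(f"Workflow has a dependency cycle: {exc.args[1]}") from exc

    return [steps_by_name[name] for name in ordered_names]


# Steps deliberately listed in the wrong order
shuffled = [
    {"agent": "writer_bot", "task": {"name": "report_generation"},
     "dependencies": ["market_research", "data_analysis"]},
    {"agent": "analyzer_bot", "task": {"name": "data_analysis"},
     "dependencies": ["market_research"]},
    {"agent": "research_bot", "task": {"name": "market_research"},
     "dependencies": []},
]
ordered = [step["task"]["name"] for step in order_workflow(shuffled)]
print(ordered)
```

Sorting up front also surfaces cycles and missing steps before any agent runs, which is cheaper than discovering them mid-workflow.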
                
🧠 ReAct (Reasoning + Acting) Pattern

Combine reasoning and action in an iterative loop for complex problem-solving:

1. Observe Environment: Gather current state and available information.
2. Think & Reason: Analyze situation and plan next action.
3. Act on Decision: Execute chosen action in environment.
4. Observe Results: Monitor outcome of action.
5. Iterate or Complete: Continue loop or finish if goal achieved.
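The loop above can be sketched as follows. This is a minimal illustration rather than a reference implementation: `think` is a hard-coded stand-in for a call to a reasoning model, and `Step`, `calculator_tool`, `TOOLS`, and `react_loop` are hypothetical names introduced here.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple


@dataclass
class Step:
    thought: str
    action: Optional[str]  # Tool name, or None when the agent is finished
    action_input: str
    observation: str = ""


def calculator_tool(expression: str) -> str:
    """Toy tool: adds integers written as 'a + b + c'."""
    return str(sum(int(part) for part in expression.split("+")))


TOOLS: Dict[str, Callable[[str], str]] = {"calculator": calculator_tool}


def think(goal: str, history: List[Step]) -> Tuple[str, Optional[str], str]:
    """Stand-in for the reasoning model: returns (thought, action, input)."""
    if not history:
        return ("I need to compute the sum first.", "calculator", goal)
    last = history[-1].observation
    return (f"The tool returned {last}; I can answer.", None, last)


def react_loop(goal: str, max_steps: int = 5) -> Tuple[str, List[Step]]:
    history: List[Step] = []
    for _ in range(max_steps):
        # Steps 1-2: observe the current state (goal + history) and reason
        thought, action, action_input = think(goal, history)
        step = Step(thought, action, action_input)
        history.append(step)
        # Step 5: finish when the reasoning step requests no further action
        if action is None:
            return action_input, history
        # Step 3: act by running the chosen tool
        tool = TOOLS.get(action)
        # Step 4: record the result (or the error) for the next iteration
        step.observation = tool(action_input) if tool else f"Unknown tool: {action}"
    raise RuntimeError(f"No answer after {max_steps} steps")


answer, trace = react_loop("2 + 3 + 4")
print(answer)
for step in trace:
    print(step.thought, "->", step.action)
```

The `max_steps` cap is a design choice worth keeping in any real version: without it, a model that never decides it is done would loop indefinitely.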

📋 Common Agent Patterns

Essential patterns for production AI systems:

Pattern Comparison

Pattern                   | Use Case                       | Complexity | Scalability
Chain of Responsibility   | Sequential processing pipeline | Low        | High
Multi-Agent Orchestration | Complex collaborative tasks    | High       | Very High
ReAct Pattern             | Interactive problem solving    | Medium     | Medium
Hierarchical Planning     | Goal decomposition             | Medium     | High
Blackboard Pattern        | Shared knowledge systems       | High       | Medium

Best Practices:
  • Keep agents focused on single responsibilities
  • Implement robust error handling and recovery
  • Use async/await for concurrent operations
  • Monitor agent performance and resource usage
  • Implement circuit breakers for failing agents
  • Version your agent interfaces for compatibility
  • Test agents individually and in combination
  • Document agent capabilities and dependencies
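
As one illustration of the circuit-breaker practice listed above, here is a minimal sketch, assuming a simple consecutive-failure threshold and a fixed reset timeout. `CircuitBreaker`, `CircuitOpenError`, and `flaky_agent` are hypothetical names. Production implementations usually add per-agent state, metrics, and a stricter half-open policy.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, List, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    async def call(self, func: Callable[[], Awaitable[Any]]) -> Any:
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; call rejected")
            # Timeout elapsed: let one trial call through (half-open state)
        try:
            result = await func()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # Open (or re-open) the circuit
            raise
        # A success closes the circuit and clears the failure count
        self.failures = 0
        self.opened_at = None
        return result


async def flaky_agent() -> str:
    """Hypothetical agent call that always fails."""
    raise ConnectionError("agent unavailable")


async def demo() -> List[str]:
    breaker = CircuitBreaker(failure_threshold=2, reset_timeout=60.0)
    outcomes: List[str] = []
    for _ in range(3):
        try:
            await breaker.call(flaky_agent)
        except CircuitOpenError:
            outcomes.append("rejected")
        except ConnectionError:
            outcomes.append("failed")
    return outcomes


outcomes = asyncio.run(demo())
print(outcomes)
```

After two consecutive failures the third call is rejected without reaching the agent, which gives a failing dependency time to recover instead of being hit with more traffic.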