AWS Agent Squad: Complete Tutorial for Multi-Agent Orchestration Framework
⏱️ Estimated Reading Time: 15 minutes
Introduction to Agent Squad
AWS Labs’ Agent Squad (formerly Multi-Agent Orchestrator) is a flexible, lightweight open-source framework designed for orchestrating multiple AI agents to handle complex conversations. With over 6.6k GitHub stars and growing community support, it represents a significant advancement in multi-agent AI systems.
What Makes Agent Squad Special?
Agent Squad addresses the growing need for intelligent conversation routing in AI applications. Instead of having a single AI agent handle all queries, it intelligently distributes conversations to specialized agents based on context and intent.
Key Features and Capabilities
🧠 Intelligent Intent Classification
The framework dynamically routes queries to the most suitable agent based on:
- Context analysis: Understanding conversation flow and history
- Content evaluation: Analyzing query semantics and intent
- Agent specialization: Matching queries to agent expertise
🔤 Dual Language Support
Full implementation in both Python and TypeScript:
- Identical functionality across languages
- Language-specific optimizations
- Seamless integration with existing codebases
🌊 Flexible Response Handling
Support for both streaming and non-streaming responses:
- Real-time streaming: For interactive conversations
- Batch processing: For analytical tasks
- Mixed mode support: Different agents can use different response types
📚 Context Management
Sophisticated conversation context handling:
- Cross-agent memory: Maintain context when switching between agents
- Session persistence: Remember conversation history
- Context inheritance: Pass relevant information between agents
Architecture Overview
graph TD
A[User Query] --> B[Agent Squad Orchestrator]
B --> C[Intent Classifier]
C --> D{Route Decision}
D --> E[Tech Agent]
D --> F[Health Agent]
D --> G[Travel Agent]
D --> H[Custom Agent]
E --> I[Bedrock LLM]
F --> J[OpenAI GPT]
G --> K[Lex Bot]
H --> L[Lambda Function]
I --> M[Response Handler]
J --> M
K --> M
L --> M
M --> N[Context Manager]
N --> O[Final Response]
The architecture consists of:
- Orchestrator: Central routing and management
- Classifiers: Intent detection and agent selection
- Agents: Specialized AI components
- Context Manager: Memory and state management
- Response Handler: Output processing and formatting
Installation and Setup
Python Installation
Agent Squad offers modular installation options based on your integration needs:
# Basic AWS integration (most common)
pip install "agent-squad[aws]"
# OpenAI integration
pip install "agent-squad[openai]"
# Anthropic integration
pip install "agent-squad[anthropic]"
# Full installation with all integrations
pip install "agent-squad[all]"
Environment Setup
Create a virtual environment for isolation:
# Create virtual environment
python -m venv agent-squad-env
source agent-squad-env/bin/activate # On Windows: agent-squad-env\Scripts\activate
# Install with AWS support
pip install "agent-squad[aws]"
TypeScript/Node.js Installation
# Initialize new project
npm init -y
# Install Agent Squad
npm install @awslabs/agent-squad
# Install AWS SDK (if using AWS integrations)
npm install @aws-sdk/client-bedrock-runtime
Basic Implementation Tutorial
Python Implementation
Let’s create a basic multi-agent system with specialized agents:
import sys
import asyncio
from agent_squad.orchestrator import AgentSquad
from agent_squad.agents import BedrockLLMAgent, BedrockLLMAgentOptions, AgentStreamResponse
class AgentSquadTutorial:
def __init__(self):
# Initialize the orchestrator
self.orchestrator = AgentSquad()
# Configure agents
self._setup_agents()
def _setup_agents(self):
"""Configure specialized agents for different domains"""
# Technology specialist agent
tech_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
name="Technology Expert",
streaming=True,
description="""Expert in software development, cloud computing, AI/ML,
cybersecurity, blockchain, and emerging technologies.
Provides technical guidance, architecture advice, and
cost analysis for technology solutions.""",
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
))
# Health and wellness agent
health_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
name="Health & Wellness Expert",
streaming=True,
description="""Specializes in health, wellness, nutrition, fitness,
mental health, and medical information. Provides
evidence-based health guidance and wellness tips.""",
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
))
# Business and finance agent
business_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
name="Business & Finance Expert",
streaming=True,
description="""Expert in business strategy, financial planning,
market analysis, entrepreneurship, and business
operations. Provides strategic business insights.""",
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
))
# Add agents to orchestrator
self.orchestrator.add_agent(tech_agent)
self.orchestrator.add_agent(health_agent)
self.orchestrator.add_agent(business_agent)
async def process_query(self, user_input, user_id="user123", session_id="session456"):
"""Process a user query through the agent squad"""
try:
# Route the request to appropriate agent
response = await self.orchestrator.route_request(
user_input=user_input,
user_id=user_id,
session_id=session_id,
additional_params={},
streaming=True
)
# Handle the response
await self._handle_response(response)
except Exception as e:
print(f"Error processing query: {e}")
async def _handle_response(self, response):
"""Handle both streaming and non-streaming responses"""
if response.streaming:
print("\n🤖 **STREAMING RESPONSE**\n")
# Display metadata
self._print_metadata(response.metadata)
print("\n📝 **Response:**")
# Stream the content
async for chunk in response.output:
if isinstance(chunk, AgentStreamResponse):
print(chunk.text, end='', flush=True)
else:
print(f"Unexpected chunk type: {type(chunk)}", file=sys.stderr)
print("\n") # New line after streaming
else:
# Handle non-streaming response
print("\n🤖 **RESPONSE**\n")
self._print_metadata(response.metadata)
print(f"\n📝 **Response:** {response.output.content}")
def _print_metadata(self, metadata):
"""Print response metadata in a formatted way"""
print(f"🎯 **Agent:** {metadata.agent_name} (ID: {metadata.agent_id})")
print(f"👤 **User:** {metadata.user_id}")
print(f"🔗 **Session:** {metadata.session_id}")
print(f"❓ **Query:** {metadata.user_input}")
if metadata.additional_params:
print(f"⚙️ **Parameters:** {metadata.additional_params}")
# Example usage and testing
async def main():
"""Main function to demonstrate Agent Squad capabilities"""
# Initialize the tutorial system
agent_system = AgentSquadTutorial()
# Test queries for different domains
test_queries = [
"What are the best practices for implementing microservices architecture?",
"How can I improve my cardiovascular health through diet and exercise?",
"What should I consider when creating a business plan for a tech startup?",
"Explain the differences between Docker containers and virtual machines",
"What are some effective stress management techniques for busy professionals?"
]
print("🚀 **Agent Squad Tutorial Demo**\n")
print("=" * 50)
for i, query in enumerate(test_queries, 1):
print(f"\n**Test Query {i}:**")
print("-" * 30)
await agent_system.process_query(query)
print("=" * 50)
if __name__ == "__main__":
asyncio.run(main())
TypeScript Implementation
Here’s the equivalent TypeScript implementation:
import { AgentSquad } from '@awslabs/agent-squad';
import { BedrockLLMAgent, BedrockLLMAgentOptions } from '@awslabs/agent-squad';
class AgentSquadTutorial {
private orchestrator: AgentSquad;
constructor() {
this.orchestrator = new AgentSquad();
this.setupAgents();
}
private setupAgents(): void {
// Technology expert agent
const techAgent = new BedrockLLMAgent({
name: 'Technology Expert',
streaming: true,
description: `Expert in software development, cloud computing, AI/ML,
cybersecurity, blockchain, and emerging technologies.`,
modelId: 'anthropic.claude-3-sonnet-20240229-v1:0',
} as BedrockLLMAgentOptions);
// Health and wellness agent
const healthAgent = new BedrockLLMAgent({
name: 'Health & Wellness Expert',
streaming: true,
description: `Specializes in health, wellness, nutrition, fitness,
mental health, and medical information.`,
modelId: 'anthropic.claude-3-sonnet-20240229-v1:0',
} as BedrockLLMAgentOptions);
// Add agents to orchestrator
this.orchestrator.addAgent(techAgent);
this.orchestrator.addAgent(healthAgent);
}
async processQuery(
userInput: string,
userId: string = 'user123',
sessionId: string = 'session456'
): Promise<void> {
try {
const response = await this.orchestrator.routeRequest(
userInput,
userId,
sessionId,
{},
true
);
await this.handleResponse(response);
} catch (error) {
console.error('Error processing query:', error);
}
}
private async handleResponse(response: any): Promise<void> {
if (response.streaming) {
console.log('\n🤖 **STREAMING RESPONSE**\n');
// Display metadata
this.printMetadata(response.metadata);
console.log('\n📝 **Response:**');
// Handle streaming response
for await (const chunk of response.output) {
if (chunk.text) {
process.stdout.write(chunk.text);
}
}
console.log('\n');
} else {
console.log('\n🤖 **RESPONSE**\n');
this.printMetadata(response.metadata);
console.log(`\n📝 **Response:** ${response.output.content}`);
}
}
private printMetadata(metadata: any): void {
console.log(`🎯 **Agent:** ${metadata.agentName} (ID: ${metadata.agentId})`);
console.log(`👤 **User:** ${metadata.userId}`);
console.log(`🔗 **Session:** ${metadata.sessionId}`);
console.log(`❓ **Query:** ${metadata.userInput}`);
}
}
// Example usage
async function main() {
const agentSystem = new AgentSquadTutorial();
const testQueries = [
"What are the latest trends in cloud computing?",
"How can I maintain good mental health while working remotely?"
];
console.log('🚀 **Agent Squad Tutorial Demo (TypeScript)**\n');
for (const query of testQueries) {
await agentSystem.processQuery(query);
console.log('='.repeat(50));
}
}
main().catch(console.error);
Advanced Configuration
Custom Agent Creation
You can create custom agents by extending the base agent class:
from agent_squad.agents import Agent, AgentOptions
from typing import Optional, Dict, Any
class CustomDatabaseAgent(Agent):
def __init__(self, options: AgentOptions):
super().__init__(options)
# Initialize database connections, tools, etc.
async def process_request(
self,
input_text: str,
user_id: str,
session_id: str,
chat_history: list,
additional_params: Optional[Dict[str, Any]] = None
):
# Custom processing logic
# Query databases, perform calculations, etc.
# Return structured response
return {
"content": "Database query results...",
"metadata": {
"query_time": "0.5s",
"records_found": 42
}
}
Advanced Orchestrator Configuration
from agent_squad.orchestrator import AgentSquad
from agent_squad.classifiers import BedrockClassifier, BedrockClassifierOptions
# Create orchestrator with custom classifier
classifier = BedrockClassifier(BedrockClassifierOptions(
model_id="anthropic.claude-3-haiku-20240307-v1:0",
inference_config={
"maxTokens": 1000,
"temperature": 0.1
}
))
orchestrator = AgentSquad(
classifier=classifier,
logger=custom_logger,
config={
"LOG_AGENT_CHAT": True,
"LOG_CLASSIFIER_CHAT": True,
"LOG_CLASSIFIER_RAW_OUTPUT": True,
"LOG_CLASSIFIER_OUTPUT": True,
"LOG_EXECUTION_TIMES": True,
"MAX_RETRIES": 3,
"USE_DEFAULT_AGENT_IF_NONE_IDENTIFIED": True,
"MAX_TOKENS": 1000,
"TEMPERATURE": 0.1
}
)
Real-World Use Cases and Examples
Customer Service Automation
async def setup_customer_service_agents():
"""Setup specialized customer service agents"""
orchestrator = AgentSquad()
# Technical support agent
tech_support = BedrockLLMAgent(BedrockLLMAgentOptions(
name="Technical Support",
description="Handles technical issues, troubleshooting, and product support",
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
))
# Billing and account agent
billing_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
name="Billing Support",
description="Handles billing inquiries, account management, and payment issues",
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
))
# General information agent
info_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
name="Information Agent",
description="Provides general company information, policies, and basic inquiries",
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
))
orchestrator.add_agent(tech_support)
orchestrator.add_agent(billing_agent)
orchestrator.add_agent(info_agent)
return orchestrator
Educational Platform
async def setup_educational_agents():
"""Setup agents for different academic subjects"""
orchestrator = AgentSquad()
subjects = [
("Mathematics", "Expert in mathematics, calculus, statistics, and problem-solving"),
("Science", "Specializes in physics, chemistry, biology, and scientific concepts"),
("Literature", "Expert in literature analysis, writing, and language arts"),
("History", "Specializes in world history, historical analysis, and social studies")
]
for name, description in subjects:
agent = BedrockLLMAgent(BedrockLLMAgentOptions(
name=f"{name} Tutor",
description=description,
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
streaming=True
))
orchestrator.add_agent(agent)
return orchestrator
Performance Optimization
Connection Pooling and Caching
from agent_squad.orchestrator import AgentSquad
import asyncio
from functools import lru_cache
class OptimizedAgentSquad:
def __init__(self):
self.orchestrator = AgentSquad()
self._connection_pool = self._setup_connection_pool()
self._setup_caching()
def _setup_connection_pool(self):
"""Setup connection pooling for better performance"""
# Configure connection pools for different services
return {
'bedrock': self._create_bedrock_pool(),
'openai': self._create_openai_pool(),
}
@lru_cache(maxsize=1000)
def _cached_classification(self, query_hash: str):
"""Cache classification results for similar queries"""
# Implementation for caching classification results
pass
async def batch_process_queries(self, queries: list):
"""Process multiple queries concurrently"""
tasks = [
self.orchestrator.route_request(query, f"user_{i}", f"session_{i}")
for i, query in enumerate(queries)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
Monitoring and Logging
import logging
import time
from functools import wraps
class AgentSquadMonitor:
def __init__(self, orchestrator):
self.orchestrator = orchestrator
self.logger = logging.getLogger('agent_squad_monitor')
self._setup_monitoring()
def _setup_monitoring(self):
"""Setup comprehensive monitoring"""
self.metrics = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'average_response_time': 0,
'agent_usage': {}
}
def monitor_request(self, func):
"""Decorator to monitor request performance"""
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
self.metrics['total_requests'] += 1
try:
result = await func(*args, **kwargs)
self.metrics['successful_requests'] += 1
# Track agent usage
agent_name = result.metadata.agent_name
self.metrics['agent_usage'][agent_name] = \
self.metrics['agent_usage'].get(agent_name, 0) + 1
return result
except Exception as e:
self.metrics['failed_requests'] += 1
self.logger.error(f"Request failed: {e}")
raise
finally:
# Update average response time
response_time = time.time() - start_time
self._update_average_response_time(response_time)
return wrapper
def _update_average_response_time(self, response_time):
"""Update running average of response times"""
current_avg = self.metrics['average_response_time']
total_requests = self.metrics['total_requests']
self.metrics['average_response_time'] = \
(current_avg * (total_requests - 1) + response_time) / total_requests
def get_performance_report(self):
"""Generate performance report"""
return {
'summary': self.metrics,
'success_rate': self.metrics['successful_requests'] / self.metrics['total_requests'] * 100,
'most_used_agent': max(self.metrics['agent_usage'],
key=self.metrics['agent_usage'].get) if self.metrics['agent_usage'] else None
}
Deployment Strategies
AWS Lambda Deployment
import json
import asyncio
from agent_squad.orchestrator import AgentSquad
from agent_squad.agents import BedrockLLMAgent, BedrockLLMAgentOptions
# Global orchestrator instance for Lambda container reuse
orchestrator = None
def lambda_handler(event, context):
"""AWS Lambda handler for Agent Squad"""
global orchestrator
# Initialize orchestrator on cold start
if orchestrator is None:
orchestrator = setup_orchestrator()
# Extract request data
body = json.loads(event['body'])
user_input = body['message']
user_id = body.get('user_id', 'anonymous')
session_id = body.get('session_id', 'default')
# Process request
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
response = loop.run_until_complete(
orchestrator.route_request(user_input, user_id, session_id)
)
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps({
'response': response.output.content,
'agent': response.metadata.agent_name,
'success': True
})
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({
'error': str(e),
'success': False
})
}
finally:
loop.close()
def setup_orchestrator():
"""Setup orchestrator with production configuration"""
squad = AgentSquad()
# Add production agents
tech_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
name="Production Tech Agent",
description="Production-ready technical support agent",
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
))
squad.add_agent(tech_agent)
return squad
Docker Deployment
# Dockerfile for Agent Squad application
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose port
EXPOSE 8000
# Set environment variables
ENV PYTHONPATH=/app
ENV AWS_DEFAULT_REGION=us-east-1
# Run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Best Practices and Tips
1. Agent Design Principles
- Single Responsibility: Each agent should have a clearly defined domain
- Clear Descriptions: Write detailed agent descriptions for better routing
- Performance Optimization: Use appropriate model sizes for different tasks
- Error Handling: Implement robust error handling and fallback mechanisms
2. Context Management
# Effective context management
async def manage_conversation_context(orchestrator, user_id, session_id):
"""Best practices for context management"""
# Store important context information
context = {
'user_preferences': get_user_preferences(user_id),
'conversation_history': get_conversation_history(session_id),
'current_task': 'information_gathering'
}
# Pass context through additional_params
response = await orchestrator.route_request(
user_input="Continue our previous discussion",
user_id=user_id,
session_id=session_id,
additional_params=context
)
return response
3. Security Considerations
# Input validation and sanitization
def validate_input(user_input: str) -> bool:
"""Validate user input for security"""
# Check for malicious content
forbidden_patterns = [
r'<script.*?</script>',
r'javascript:',
r'on\w+\s*='
]
import re
for pattern in forbidden_patterns:
if re.search(pattern, user_input, re.IGNORECASE):
return False
# Check input length
if len(user_input) > 10000:
return False
return True
# Rate limiting implementation
from collections import defaultdict
import time
class RateLimiter:
def __init__(self, max_requests=100, time_window=3600):
self.max_requests = max_requests
self.time_window = time_window
self.requests = defaultdict(list)
def is_allowed(self, user_id: str) -> bool:
now = time.time()
user_requests = self.requests[user_id]
# Remove old requests
self.requests[user_id] = [
req_time for req_time in user_requests
if now - req_time < self.time_window
]
# Check if under limit
if len(self.requests[user_id]) < self.max_requests:
self.requests[user_id].append(now)
return True
return False
Troubleshooting Guide
Common Issues and Solutions
- Agent Selection Problems
# Debug agent selection orchestrator.config['LOG_CLASSIFIER_OUTPUT'] = True orchestrator.config['LOG_CLASSIFIER_RAW_OUTPUT'] = True
- Memory Issues with Large Contexts
# Implement context truncation def truncate_context(context, max_length=8000): if len(context) > max_length: return context[-max_length:] return context
- Performance Bottlenecks
# Implement async processing import asyncio async def process_multiple_requests(requests): tasks = [process_single_request(req) for req in requests] return await asyncio.gather(*tasks)
Testing Your Implementation
Create a comprehensive test suite:
import pytest
import asyncio
from agent_squad.orchestrator import AgentSquad
class TestAgentSquad:
@pytest.fixture
async def orchestrator(self):
"""Setup test orchestrator"""
squad = AgentSquad()
# Add test agents
return squad
@pytest.mark.asyncio
async def test_tech_query_routing(self, orchestrator):
"""Test that tech queries route to tech agent"""
response = await orchestrator.route_request(
"How do I deploy a Docker container?",
"test_user",
"test_session"
)
assert "tech" in response.metadata.agent_name.lower()
@pytest.mark.asyncio
async def test_streaming_response(self, orchestrator):
"""Test streaming functionality"""
response = await orchestrator.route_request(
"Explain machine learning",
"test_user",
"test_session",
streaming=True
)
assert response.streaming is True
# Collect streamed content
content = ""
async for chunk in response.output:
content += chunk.text
assert len(content) > 0
Conclusion
Agent Squad represents a powerful evolution in multi-agent AI systems, offering:
- Intelligent routing for better user experiences
- Flexible architecture supporting various AI providers
- Production-ready features for enterprise deployments
- Strong community support and active development
The framework’s dual-language support (Python/TypeScript) and modular design make it an excellent choice for both prototyping and production deployments. Whether you’re building customer service systems, educational platforms, or complex conversational AI applications, Agent Squad provides the foundation for sophisticated multi-agent orchestration.
Next Steps
- Experiment with the basic implementation
- Customize agents for your specific use case
- Implement monitoring and performance optimization
- Deploy to your preferred cloud platform
- Contribute to the open-source community
For more advanced features and enterprise support, explore the official documentation and join the growing community of Agent Squad developers.
This tutorial provides a comprehensive foundation for working with AWS Agent Squad. As the framework continues to evolve, stay updated with the latest features and best practices through the official repository and documentation.