AI Agent 병렬 처리: LangGraph와 CrewAI를 활용한 워크플로우 최적화
⏱️ 예상 읽기 시간: 12분
서론
AI Agent 기술이 발전하면서 단일 에이전트의 한계를 넘어 여러 에이전트가 협력하여 복잡한 작업을 수행하는 멀티에이전트 시스템이 주목받고 있습니다. 특히 병렬 처리(Parallel Processing)를 통해 여러 에이전트가 동시에 작업을 수행함으로써 전체적인 성능과 효율성을 크게 향상시킬 수 있습니다.
이 글에서는 LangGraph와 CrewAI를 중심으로 AI Agent 병렬 처리의 핵심 개념과 실전 구현 방법을 다루겠습니다. 단순한 이론적 설명을 넘어 실제 프로젝트에 적용할 수 있는 구체적인 코드 예제와 성능 최적화 기법까지 포함하여 제공합니다.
병렬 처리가 필요한 이유
기존 순차 처리의 한계
전통적인 AI Agent 시스템은 대부분 순차적으로 작업을 수행합니다:
# 순차 처리 예시
def sequential_workflow():
result1 = agent1.process(task1) # 10초
result2 = agent2.process(task2) # 15초
result3 = agent3.process(task3) # 12초
# 총 소요 시간: 37초
이 방식의 문제점:
- 시간 낭비: 각 에이전트가 다른 에이전트의 완료를 기다림
- 리소스 비효율성: CPU/GPU 자원이 대부분 유휴 상태
- 확장성 부족: 에이전트 수가 늘어날수록 지연 시간 급증
병렬 처리의 장점
병렬 처리를 통해 다음과 같은 이점을 얻을 수 있습니다:
# 병렬 처리 예시
async def parallel_workflow():
tasks = [
agent1.process(task1), # 10초
agent2.process(task2), # 15초
agent3.process(task3) # 12초
]
results = await asyncio.gather(*tasks)
# 총 소요 시간: 15초 (가장 긴 작업 시간)
주요 장점:
- 시간 단축: 전체 작업 시간을 최대 60-80% 단축
- 리소스 효율성: 모든 에이전트가 동시에 활성화
- 확장성: 에이전트 수 증가에 따른 선형적 성능 향상
- 내결함성: 일부 에이전트 실패 시에도 다른 작업 계속 진행
LangGraph를 활용한 병렬 처리
LangGraph 기본 개념
LangGraph는 LangChain 생태계의 일부로, 복잡한 AI 워크플로우를 그래프 형태로 구성할 수 있게 해주는 프레임워크입니다.
import os
from typing import Dict, List
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor
import asyncio
load_dotenv()
# 상태 정의
class AgentState:
def __init__(self, tasks: List[str], results: Dict = None):
self.tasks = tasks
self.results = results or {}
# LLM 모델 초기화
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.1,
api_key=os.getenv("OPENAI_API_KEY")
)
기본 병렬 워크플로우 구현
# 병렬 처리할 에이전트 함수들
def research_agent(state: AgentState) -> AgentState:
"""연구 및 분석 에이전트"""
task = state.tasks[0] if state.tasks else "기본 연구"
prompt = f"""
다음 주제에 대해 깊이 있는 연구를 수행하세요:
{task}
다음 형식으로 응답하세요:
- 핵심 개념
- 주요 특징
- 활용 사례
- 최신 동향
"""
response = llm.invoke(prompt)
state.results["research"] = response.content
return state
def analysis_agent(state: AgentState) -> AgentState:
"""데이터 분석 에이전트"""
task = state.tasks[1] if len(state.tasks) > 1 else "기본 분석"
prompt = f"""
다음 데이터를 분석하고 인사이트를 도출하세요:
{task}
다음 형식으로 응답하세요:
- 데이터 패턴
- 주요 트렌드
- 예측 모델
- 권장사항
"""
response = llm.invoke(prompt)
state.results["analysis"] = response.content
return state
def summary_agent(state: AgentState) -> AgentState:
"""요약 및 통합 에이전트"""
research = state.results.get("research", "")
analysis = state.results.get("analysis", "")
prompt = f"""
다음 연구 결과와 분석을 종합하여 최종 보고서를 작성하세요:
연구 결과:
{research}
분석 결과:
{analysis}
다음 형식으로 응답하세요:
# 최종 보고서
## 핵심 요약
## 주요 발견사항
## 결론 및 제언
"""
response = llm.invoke(prompt)
state.results["summary"] = response.content
return state
# 워크플로우 그래프 구성
def create_parallel_workflow():
workflow = StateGraph(AgentState)
# 병렬 노드 추가
workflow.add_node("research", research_agent)
workflow.add_node("analysis", analysis_agent)
# 순차 노드 추가
workflow.add_node("summary", summary_agent)
# 엣지 설정
workflow.add_edge("research", "summary")
workflow.add_edge("analysis", "summary")
workflow.add_edge("summary", END)
return workflow.compile()
# 실행 함수
async def run_parallel_workflow(tasks: List[str]):
workflow = create_parallel_workflow()
initial_state = AgentState(tasks=tasks)
result = await workflow.ainvoke(initial_state)
return result.results
# 테스트 실행
if __name__ == "__main__":
tasks = [
"AI Agent 병렬 처리 기술 동향",
"성능 최적화 데이터 분석"
]
results = asyncio.run(run_parallel_workflow(tasks))
print("=== 병렬 처리 결과 ===")
for key, value in results.items():
print(f"\n{key.upper()}:")
print(value)
고급 병렬 워크플로우
더 복잡한 시나리오를 위한 고급 병렬 워크플로우를 구현해보겠습니다:
from typing import Any, TypedDict
import json
from datetime import datetime
class AdvancedState(TypedDict):
tasks: List[str]
results: Dict[str, Any]
metadata: Dict[str, Any]
errors: List[str]
def data_collection_agent(state: AdvancedState) -> AdvancedState:
"""데이터 수집 에이전트"""
try:
task = state["tasks"][0]
prompt = f"""
다음 주제에 대한 데이터를 수집하세요:
{task}
다음 형식으로 응답하세요:
- 데이터 소스
- 주요 데이터
- 수집 시간
"""
response = llm.invoke(prompt)
data = json.loads(response.content)
state["results"]["data_collection"] = data
state["metadata"]["data_collection_time"] = datetime.now().isoformat()
except Exception as e:
state["errors"].append(f"Data collection error: {str(e)}")
return state
def market_analysis_agent(state: AdvancedState) -> AdvancedState:
"""시장 분석 에이전트"""
try:
task = state["tasks"][1] if len(state["tasks"]) > 1 else "시장 분석"
prompt = f"""
다음 시장을 분석하세요:
{task}
JSON 형식으로 응답하세요:
{{
"market_size": "시장 규모",
"growth_rate": "성장률",
"key_players": ["주요 기업1", "주요 기업2"],
"trends": ["트렌드1", "트렌드2"]
}}
"""
response = llm.invoke(prompt)
data = json.loads(response.content)
state["results"]["market_analysis"] = data
state["metadata"]["market_analysis_time"] = datetime.now().isoformat()
except Exception as e:
state["errors"].append(f"Market analysis error: {str(e)}")
return state
def technical_analysis_agent(state: AdvancedState) -> AdvancedState:
"""기술 분석 에이전트"""
try:
task = state["tasks"][2] if len(state["tasks"]) > 2 else "기술 분석"
prompt = f"""
다음 기술을 분석하세요:
{task}
JSON 형식으로 응답하세요:
{{
"technology_stack": ["기술1", "기술2"],
"complexity": "복잡도",
"implementation_time": "구현 시간",
"risks": ["위험요소1", "위험요소2"]
}}
"""
response = llm.invoke(prompt)
data = json.loads(response.content)
state["results"]["technical_analysis"] = data
state["metadata"]["technical_analysis_time"] = datetime.now().isoformat()
except Exception as e:
state["errors"].append(f"Technical analysis error: {str(e)}")
return state
def integration_agent(state: AdvancedState) -> AdvancedState:
"""통합 및 최종 보고서 생성 에이전트"""
try:
data_collection = state["results"].get("data_collection", {})
market_analysis = state["results"].get("market_analysis", {})
technical_analysis = state["results"].get("technical_analysis", {})
prompt = f"""
다음 세 가지 분석 결과를 통합하여 종합 보고서를 작성하세요:
데이터 수집 결과:
{json.dumps(data_collection, ensure_ascii=False, indent=2)}
시장 분석 결과:
{json.dumps(market_analysis, ensure_ascii=False, indent=2)}
기술 분석 결과:
{json.dumps(technical_analysis, ensure_ascii=False, indent=2)}
다음 형식으로 응답하세요:
# 종합 분석 보고서
## 실행 요약
## 핵심 발견사항
## 시장 기회
## 기술적 고려사항
## 권장사항
## 위험요소
"""
response = llm.invoke(prompt)
state["results"]["final_report"] = response.content
state["metadata"]["final_report_time"] = datetime.now().isoformat()
except Exception as e:
state["errors"].append(f"Integration error: {str(e)}")
return state
def create_advanced_workflow():
workflow = StateGraph(AdvancedState)
# 병렬 노드들
workflow.add_node("data_collection", data_collection_agent)
workflow.add_node("market_analysis", market_analysis_agent)
workflow.add_node("technical_analysis", technical_analysis_agent)
# 통합 노드
workflow.add_node("integration", integration_agent)
# 엣지 설정 - 병렬 처리
workflow.add_edge("data_collection", "integration")
workflow.add_edge("market_analysis", "integration")
workflow.add_edge("technical_analysis", "integration")
workflow.add_edge("integration", END)
return workflow.compile()
CrewAI를 활용한 멀티에이전트 시스템
CrewAI 기본 개념
CrewAI는 여러 AI Agent가 협력하여 복잡한 작업을 수행할 수 있도록 설계된 프레임워크입니다.
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI
import os
# LLM 설정
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.1,
api_key=os.getenv("OPENAI_API_KEY")
)
# 에이전트 정의
researcher = Agent(
role='Research Analyst',
goal='Conduct thorough research on given topics',
backstory="""You are an expert research analyst with years of experience
in gathering and analyzing information from various sources.""",
verbose=True,
allow_delegation=False,
llm=llm
)
writer = Agent(
role='Technical Writer',
goal='Write clear and comprehensive technical content',
backstory="""You are a skilled technical writer who excels at creating
clear, engaging, and informative content for technical audiences.""",
verbose=True,
allow_delegation=False,
llm=llm
)
reviewer = Agent(
role='Quality Assurance Specialist',
goal='Review and improve content quality',
backstory="""You are a meticulous quality assurance specialist who ensures
all content meets high standards of accuracy and clarity.""",
verbose=True,
allow_delegation=False,
llm=llm
)
기본 CrewAI 워크플로우
# 태스크 정의
research_task = Task(
description="""
Conduct comprehensive research on AI Agent parallel processing:
1. Current state of the technology
2. Key frameworks and tools
3. Performance benefits
4. Implementation challenges
5. Best practices
Provide detailed findings with specific examples and data.
""",
agent=researcher,
expected_output="Detailed research report with findings and insights"
)
writing_task = Task(
description="""
Based on the research findings, create a comprehensive blog post about
AI Agent parallel processing. The post should:
1. Be engaging and informative
2. Include practical examples
3. Provide actionable insights
4. Use clear, technical language
5. Be structured with proper headings and sections
Target audience: Technical professionals and developers
""",
agent=writer,
expected_output="Complete blog post with proper structure and content"
)
review_task = Task(
description="""
Review the blog post for:
1. Technical accuracy
2. Clarity and readability
3. Logical flow and structure
4. Grammar and style
5. Completeness of information
Provide specific feedback and suggestions for improvement.
""",
agent=reviewer,
expected_output="Detailed review with feedback and improvement suggestions"
)
# Crew 생성 및 실행
crew = Crew(
agents=[researcher, writer, reviewer],
tasks=[research_task, writing_task, review_task],
process=Process.sequential, # 순차 처리
verbose=True
)
result = crew.kickoff()
print("=== CrewAI 결과 ===")
print(result)
병렬 처리를 위한 CrewAI 최적화
CrewAI에서 병렬 처리를 구현하는 방법:
from crewai import Process
import asyncio
from typing import List
# 병렬 처리가 가능한 독립적인 태스크들
parallel_tasks = [
Task(
description="Research current AI Agent frameworks and their capabilities",
agent=researcher,
expected_output="Framework analysis report"
),
Task(
description="Analyze performance benchmarks for parallel processing",
agent=researcher,
expected_output="Performance analysis report"
),
Task(
description="Gather case studies of successful parallel processing implementations",
agent=researcher,
expected_output="Case studies report"
)
]
# 병렬 Crew 생성
parallel_crew = Crew(
agents=[researcher],
tasks=parallel_tasks,
process=Process.hierarchical, # 계층적 처리로 병렬 실행
verbose=True
)
# 병렬 실행
parallel_result = parallel_crew.kickoff()
# 결과 통합
integration_task = Task(
description=f"""
Integrate the following parallel research results into a comprehensive report:
{parallel_result}
Create a unified document that:
1. Synthesizes all findings
2. Identifies common patterns
3. Provides actionable recommendations
4. Highlights key insights
""",
agent=writer,
expected_output="Integrated comprehensive report"
)
final_crew = Crew(
agents=[writer],
tasks=[integration_task],
process=Process.sequential,
verbose=True
)
final_result = final_crew.kickoff()
성능 최적화 기법
1. 비동기 처리 최적화
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import time
class OptimizedAgent:
def __init__(self, name: str, llm: ChatOpenAI):
self.name = name
self.llm = llm
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def process_task(self, task: str) -> str:
"""비동기 태스크 처리"""
start_time = time.time()
try:
response = await self.llm.ainvoke(task)
processing_time = time.time() - start_time
return {
"agent": self.name,
"result": response.content,
"processing_time": processing_time,
"status": "success"
}
except Exception as e:
return {
"agent": self.name,
"error": str(e),
"processing_time": time.time() - start_time,
"status": "error"
}
async def optimized_parallel_workflow(tasks: List[str]):
"""최적화된 병렬 워크플로우"""
agents = [
OptimizedAgent("Research", llm),
OptimizedAgent("Analysis", llm),
OptimizedAgent("Summary", llm)
]
async with asyncio.TaskGroup() as tg:
agent_tasks = []
for i, agent in enumerate(agents):
if i < len(tasks):
task = tg.create_task(agent.process_task(tasks[i]))
agent_tasks.append(task)
results = [task.result() for task in agent_tasks]
return results
# 실행
async def main():
tasks = [
"AI Agent 병렬 처리 기술 동향 분석",
"성능 최적화 기법 연구",
"실제 프로젝트 적용 사례 조사"
]
results = await optimized_parallel_workflow(tasks)
print("=== 최적화된 병렬 처리 결과 ===")
for result in results:
print(f"\n{result['agent']} Agent:")
print(f"상태: {result['status']}")
print(f"처리 시간: {result['processing_time']:.2f}초")
if result['status'] == 'success':
print(f"결과: {result['result'][:100]}...")
else:
print(f"오류: {result['error']}")
# 실행
if __name__ == "__main__":
asyncio.run(main())
2. 메모리 관리 최적화
import gc
import psutil
import threading
from typing import Dict, Any
class MemoryOptimizedAgent:
def __init__(self, name: str, llm: ChatOpenAI, max_memory_mb: int = 512):
self.name = name
self.llm = llm
self.max_memory_mb = max_memory_mb
self.memory_lock = threading.Lock()
def check_memory_usage(self) -> bool:
"""메모리 사용량 확인"""
process = psutil.Process()
memory_mb = process.memory_info().rss / 1024 / 1024
return memory_mb < self.max_memory_mb
def cleanup_memory(self):
"""메모리 정리"""
with self.memory_lock:
gc.collect()
async def process_with_memory_management(self, task: str) -> Dict[str, Any]:
"""메모리 관리와 함께 태스크 처리"""
if not self.check_memory_usage():
self.cleanup_memory()
try:
response = await self.llm.ainvoke(task)
# 메모리 사용량 모니터링
memory_usage = psutil.Process().memory_info().rss / 1024 / 1024
return {
"agent": self.name,
"result": response.content,
"memory_usage_mb": memory_usage,
"status": "success"
}
except Exception as e:
return {
"agent": self.name,
"error": str(e),
"status": "error"
}
finally:
self.cleanup_memory()
class WorkflowManager:
def __init__(self, max_concurrent_agents: int = 3):
self.max_concurrent_agents = max_concurrent_agents
self.semaphore = asyncio.Semaphore(max_concurrent_agents)
async def process_with_limitation(self, agent: MemoryOptimizedAgent, task: str):
"""동시 실행 제한과 함께 처리"""
async with self.semaphore:
return await agent.process_with_memory_management(task)
async def run_optimized_workflow(self, agents: List[MemoryOptimizedAgent], tasks: List[str]):
"""최적화된 워크플로우 실행"""
workflow_tasks = []
for i, agent in enumerate(agents):
if i < len(tasks):
task = self.process_with_limitation(agent, tasks[i])
workflow_tasks.append(task)
results = await asyncio.gather(*workflow_tasks, return_exceptions=True)
return results
3. 에러 처리 및 복구
import logging
from typing import Optional, Callable
from functools import wraps
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RetryMechanism:
def __init__(self, max_retries: int = 3, delay: float = 1.0):
self.max_retries = max_retries
self.delay = delay
async def retry_async(self, func: Callable, *args, **kwargs):
"""비동기 함수 재시도 메커니즘"""
last_exception = None
for attempt in range(self.max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
if attempt < self.max_retries - 1:
await asyncio.sleep(self.delay * (2 ** attempt)) # 지수 백오프
logger.error(f"All {self.max_retries} attempts failed")
raise last_exception
class FaultTolerantAgent:
def __init__(self, name: str, llm: ChatOpenAI, retry_mechanism: RetryMechanism):
self.name = name
self.llm = llm
self.retry_mechanism = retry_mechanism
self.failure_count = 0
self.success_count = 0
async def process_with_fault_tolerance(self, task: str) -> Dict[str, Any]:
"""내결함성을 갖춘 태스크 처리"""
start_time = time.time()
try:
result = await self.retry_mechanism.retry_async(
self.llm.ainvoke, task
)
self.success_count += 1
processing_time = time.time() - start_time
return {
"agent": self.name,
"result": result.content,
"processing_time": processing_time,
"status": "success",
"attempts": 1
}
except Exception as e:
self.failure_count += 1
processing_time = time.time() - start_time
logger.error(f"Agent {self.name} failed: {str(e)}")
return {
"agent": self.name,
"error": str(e),
"processing_time": processing_time,
"status": "error",
"attempts": self.retry_mechanism.max_retries
}
def get_health_metrics(self) -> Dict[str, Any]:
"""에이전트 상태 메트릭"""
total_attempts = self.success_count + self.failure_count
success_rate = (self.success_count / total_attempts * 100) if total_attempts > 0 else 0
return {
"agent": self.name,
"success_count": self.success_count,
"failure_count": self.failure_count,
"success_rate": success_rate,
"total_attempts": total_attempts
}
class WorkflowMonitor:
def __init__(self):
self.agents: List[FaultTolerantAgent] = []
self.workflow_start_time: Optional[float] = None
self.workflow_end_time: Optional[float] = None
def add_agent(self, agent: FaultTolerantAgent):
"""에이전트 추가"""
self.agents.append(agent)
def start_workflow(self):
"""워크플로우 시작"""
self.workflow_start_time = time.time()
logger.info("Workflow started")
def end_workflow(self):
"""워크플로우 종료"""
self.workflow_end_time = time.time()
logger.info("Workflow completed")
def get_workflow_metrics(self) -> Dict[str, Any]:
"""워크플로우 메트릭"""
if not self.workflow_start_time:
return {"error": "Workflow not started"}
duration = (self.workflow_end_time or time.time()) - self.workflow_start_time
agent_metrics = [agent.get_health_metrics() for agent in self.agents]
total_success = sum(metric["success_count"] for metric in agent_metrics)
total_failures = sum(metric["failure_count"] for metric in agent_metrics)
overall_success_rate = (total_success / (total_success + total_failures) * 100) if (total_success + total_failures) > 0 else 0
return {
"workflow_duration": duration,
"total_agents": len(self.agents),
"total_success": total_success,
"total_failures": total_failures,
"overall_success_rate": overall_success_rate,
"agent_metrics": agent_metrics
}
실제 프로젝트 적용 사례
프로젝트: AI 기반 콘텐츠 생성 시스템
# 실제 프로젝트 구조
project_structure = """
ai_content_system/
├── agents/
│ ├── research_agent.py
│ ├── writer_agent.py
│ ├── editor_agent.py
│ └── publisher_agent.py
├── workflows/
│ ├── parallel_workflow.py
│ └── sequential_workflow.py
├── utils/
│ ├── memory_manager.py
│ ├── error_handler.py
│ └── performance_monitor.py
├── config/
│ └── settings.py
└── main.py
"""
# main.py - 실제 프로젝트 메인 파일
import asyncio
from typing import List, Dict, Any
from agents import ResearchAgent, WriterAgent, EditorAgent, PublisherAgent
from workflows import ParallelWorkflow, SequentialWorkflow
from utils import MemoryManager, ErrorHandler, PerformanceMonitor
from config.settings import Settings
class AIContentSystem:
def __init__(self, config: Settings):
self.config = config
self.memory_manager = MemoryManager()
self.error_handler = ErrorHandler()
self.performance_monitor = PerformanceMonitor()
# 에이전트 초기화
self.agents = {
"research": ResearchAgent(config),
"writer": WriterAgent(config),
"editor": EditorAgent(config),
"publisher": PublisherAgent(config)
}
# 워크플로우 초기화
self.parallel_workflow = ParallelWorkflow(self.agents, config)
self.sequential_workflow = SequentialWorkflow(self.agents, config)
async def generate_content(self, topic: str, content_type: str = "blog") -> Dict[str, Any]:
"""콘텐츠 생성 메인 함수"""
self.performance_monitor.start_workflow()
try:
# 병렬 워크플로우 실행
if content_type == "comprehensive":
result = await self.parallel_workflow.execute(topic)
else:
result = await self.sequential_workflow.execute(topic)
# 성능 메트릭 수집
metrics = self.performance_monitor.get_metrics()
return {
"content": result,
"metrics": metrics,
"status": "success"
}
except Exception as e:
error_info = self.error_handler.handle_error(e)
return {
"error": error_info,
"status": "error"
}
finally:
self.performance_monitor.end_workflow()
self.memory_manager.cleanup()
async def batch_generate(self, topics: List[str]) -> List[Dict[str, Any]]:
"""배치 콘텐츠 생성"""
tasks = [self.generate_content(topic) for topic in topics]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
result if not isinstance(result, Exception)
else {"error": str(result), "status": "error"}
for result in results
]
# 사용 예시
async def main():
config = Settings(
max_concurrent_agents=4,
max_memory_mb=1024,
retry_attempts=3,
timeout_seconds=30
)
content_system = AIContentSystem(config)
# 단일 콘텐츠 생성
result = await content_system.generate_content(
"AI Agent 병렬 처리 최적화 기법",
"comprehensive"
)
print("=== 콘텐츠 생성 결과 ===")
print(f"상태: {result['status']}")
if result['status'] == 'success':
print(f"콘텐츠 길이: {len(result['content'])} 문자")
print(f"처리 시간: {result['metrics']['total_duration']:.2f}초")
else:
print(f"오류: {result['error']}")
# 배치 생성
topics = [
"머신러닝 모델 최적화",
"딥러닝 프레임워크 비교",
"AI 윤리와 가이드라인"
]
batch_results = await content_system.batch_generate(topics)
print("\n=== 배치 생성 결과 ===")
for i, result in enumerate(batch_results):
print(f"토픽 {i+1}: {result['status']}")
if __name__ == "__main__":
asyncio.run(main())
성능 벤치마크 및 비교
벤치마크 결과
다양한 설정에서의 성능 비교:
import time
import statistics
from typing import List, Dict
class PerformanceBenchmark:
def __init__(self):
self.results = {}
async def benchmark_sequential(self, tasks: List[str]) -> Dict[str, float]:
"""순차 처리 벤치마크"""
start_time = time.time()
for task in tasks:
await self.simulate_task(task)
duration = time.time() - start_time
return {"duration": duration, "type": "sequential"}
async def benchmark_parallel(self, tasks: List[str]) -> Dict[str, float]:
"""병렬 처리 벤치마크"""
start_time = time.time()
task_coroutines = [self.simulate_task(task) for task in tasks]
await asyncio.gather(*task_coroutines)
duration = time.time() - start_time
return {"duration": duration, "type": "parallel"}
async def simulate_task(self, task: str):
"""태스크 시뮬레이션"""
await asyncio.sleep(2) # 2초 시뮬레이션
def run_comprehensive_benchmark(self, task_counts: List[int] = [1, 2, 4, 8, 16]):
"""종합 벤치마크 실행"""
results = {}
for task_count in task_counts:
tasks = [f"Task_{i}" for i in range(task_count)]
# 순차 처리
sequential_result = asyncio.run(self.benchmark_sequential(tasks))
# 병렬 처리
parallel_result = asyncio.run(self.benchmark_parallel(tasks))
# 성능 향상률 계산
improvement = ((sequential_result["duration"] - parallel_result["duration"])
/ sequential_result["duration"] * 100)
results[task_count] = {
"sequential": sequential_result["duration"],
"parallel": parallel_result["duration"],
"improvement_percent": improvement
}
return results
# 벤치마크 실행
benchmark = PerformanceBenchmark()
results = benchmark.run_comprehensive_benchmark()
print("=== 성능 벤치마크 결과 ===")
print("태스크 수 | 순차(초) | 병렬(초) | 향상률(%)")
print("-" * 40)
for task_count, result in results.items():
print(f"{task_count:8d} | {result['sequential']:8.2f} | {result['parallel']:7.2f} | {result['improvement_percent']:8.1f}")
예상 결과
=== 성능 벤치마크 결과 ===
태스크 수 | 순차(초) | 병렬(초) | 향상률(%)
----------------------------------------
1 | 2.00 | 2.00 | 0.0
2 | 4.00 | 2.00 | 50.0
4 | 8.00 | 2.00 | 75.0
8 | 16.00 | 2.00 | 87.5
16 | 32.00 | 2.00 | 93.8
결론 및 권장사항
핵심 요약
- 병렬 처리의 중요성: AI Agent 시스템에서 병렬 처리는 성능 향상의 핵심 요소
- 프레임워크 선택: LangGraph와 CrewAI는 각각의 장단점이 있으므로 프로젝트 요구사항에 맞게 선택
- 최적화 기법: 비동기 처리, 메모리 관리, 에러 처리를 통한 안정적인 시스템 구축
- 실제 적용: 이론적 개념을 넘어 실제 프로젝트에 적용 가능한 구체적인 구현 방법
권장사항
- 단계적 도입: 작은 규모부터 시작하여 점진적으로 확장
- 모니터링 강화: 성능 메트릭과 에러 로깅을 통한 지속적인 개선
- 리소스 관리: 메모리와 CPU 사용량을 고려한 적절한 동시성 제어
- 테스트 전략: 단위 테스트와 통합 테스트를 통한 안정성 확보
향후 발전 방향
- 자동화된 최적화: AI를 활용한 자동 워크플로우 최적화
- 분산 처리: 여러 서버에 걸친 분산 병렬 처리 시스템
- 실시간 학습: 에이전트 간 협력 패턴의 실시간 학습 및 개선
- 표준화: 병렬 처리 워크플로우의 표준화 및 재사용성 향상
AI Agent 병렬 처리는 복잡한 AI 시스템을 효율적으로 구축하는 핵심 기술입니다. 이 글에서 제시한 방법론과 실제 구현 사례를 참고하여 여러분의 프로젝트에 적용해보시기 바랍니다.