AI Engineering Hub Agentic RAG 소스코드 완전 분석: 에이전트 기반 검색 증강 생성 시스템
개요
AI Engineering Hub는 10.7k 스타를 기록한 최고의 AI 엔지니어링 학습 리포지토리입니다. 이 중에서도 Agentic RAG 프로젝트는 에이전트 기반 검색 증강 생성 시스템의 실전 구현을 보여주는 핵심 프로젝트입니다.
본 포스트에서는 AI Engineering Hub의 Agentic RAG 프로젝트를 소스코드 레벨에서 상세히 분석하고, AgentOps 관점에서 실전 활용 방법을 제시합니다.
Agentic RAG란?
전통적 RAG의 한계:
- 단일 검색-생성 파이프라인의 경직성
- 복잡한 멀티홉 추론 부족
- 동적 적응성 부재
- 컨텍스트 유지의 어려움
Agentic RAG의 해결책:
- 다중 에이전트 협업: 전문화된 에이전트들의 역할 분담
- 동적 워크플로우: 쿼리에 따른 적응적 프로세스
- 도구 사용: 외부 API 및 데이터베이스 활용
- 메모리 관리: 컨텍스트 유지 및 학습
AI Engineering Hub Agentic RAG 프로젝트 구조
1. 프로젝트 아키텍처 분석
# AI Engineering Hub Agentic RAG 핵심 아키텍처
class AgenticRAGSystem:
def __init__(self):
# 에이전트 초기화
self.router_agent = RouterAgent()
self.retriever_agent = RetrieverAgent()
self.synthesizer_agent = SynthesizerAgent()
self.evaluator_agent = EvaluatorAgent()
# 도구 초기화
self.vector_store = VectorStore()
self.web_search = WebSearchTool()
self.memory = ConversationMemory()
async def process_query(self, query: str) -> str:
"""Agentic RAG 메인 프로세스"""
# 1. 라우팅 단계
route_decision = await self.router_agent.route(query)
# 2. 검색 단계
if route_decision == "vector_search":
context = await self.retriever_agent.vector_search(query)
elif route_decision == "web_search":
context = await self.retriever_agent.web_search(query)
else:
context = await self.retriever_agent.hybrid_search(query)
# 3. 합성 단계
response = await self.synthesizer_agent.synthesize(query, context)
# 4. 평가 단계
evaluation = await self.evaluator_agent.evaluate(query, response, context)
# 5. 메모리 업데이트
await self.memory.update(query, response, evaluation)
return response
2. 라우터 에이전트 구현
# 라우터 에이전트 - 쿼리 분류 및 라우팅
class RouterAgent:
def __init__(self, llm_config):
self.llm = ChatOpenAI(**llm_config)
self.classifier = QueryClassifier()
async def route(self, query: str) -> str:
"""쿼리를 적절한 검색 전략으로 라우팅"""
# 쿼리 분석
query_analysis = await self._analyze_query(query)
# 라우팅 결정
routing_prompt = f"""
다음 쿼리를 분석하여 최적의 검색 전략을 결정하세요:
쿼리: {query}
분석: {query_analysis}
옵션:
1. vector_search: 문서 기반 검색이 필요한 경우
2. web_search: 최신 정보가 필요한 경우
3. hybrid_search: 복합적 정보가 필요한 경우
결정: (vector_search/web_search/hybrid_search 중 하나만 응답)
"""
response = await self.llm.ainvoke(routing_prompt)
return response.content.strip().lower()
async def _analyze_query(self, query: str) -> dict:
"""쿼리 특성 분석"""
analysis_prompt = f"""
다음 쿼리의 특성을 분석하세요:
쿼리: {query}
분석 항목:
1. 시간성 (최신 정보 필요 여부)
2. 복잡성 (멀티홉 추론 필요 여부)
3. 도메인 (특정 전문 분야 여부)
4. 의도 (정보 검색, 분석, 요약 등)
JSON 형태로 응답하세요.
"""
response = await self.llm.ainvoke(analysis_prompt)
try:
return json.loads(response.content)
except:
return {"error": "분석 실패"}
3. 검색 에이전트 구현
# 검색 에이전트 - 다양한 검색 전략 구현
class RetrieverAgent:
def __init__(self, vector_store, web_search_tool):
self.vector_store = vector_store
self.web_search = web_search_tool
self.llm = ChatOpenAI(temperature=0)
async def vector_search(self, query: str, k: int = 5) -> List[Document]:
"""벡터 검색 수행"""
# 쿼리 확장
expanded_query = await self._expand_query(query)
# 벡터 검색
docs = await self.vector_store.asimilarity_search(
expanded_query, k=k
)
# 검색 결과 후처리
processed_docs = await self._post_process_docs(docs, query)
return processed_docs
async def web_search(self, query: str, max_results: int = 3) -> List[Document]:
"""웹 검색 수행"""
# 검색 쿼리 최적화
optimized_query = await self._optimize_web_query(query)
# 웹 검색 실행
search_results = await self.web_search.arun(optimized_query)
# 결과 문서화
documents = []
for result in search_results[:max_results]:
doc = Document(
page_content=result.get('content', ''),
metadata={
'source': result.get('url', ''),
'title': result.get('title', ''),
'timestamp': datetime.now().isoformat()
}
)
documents.append(doc)
return documents
async def hybrid_search(self, query: str) -> List[Document]:
"""하이브리드 검색 - 벡터 + 웹 검색 결합"""
# 병렬 검색 실행
vector_task = asyncio.create_task(self.vector_search(query, k=3))
web_task = asyncio.create_task(self.web_search(query, max_results=2))
vector_docs, web_docs = await asyncio.gather(vector_task, web_task)
# 결과 융합 및 중복 제거
combined_docs = await self._merge_results(vector_docs, web_docs, query)
return combined_docs
async def _expand_query(self, query: str) -> str:
"""쿼리 확장을 통한 검색 성능 향상"""
expansion_prompt = f"""
다음 쿼리를 의미적으로 확장하여 더 나은 검색 결과를 얻도록 하세요:
원본 쿼리: {query}
확장된 쿼리 (핵심 키워드 포함):
"""
response = await self.llm.ainvoke(expansion_prompt)
return response.content.strip()
async def _post_process_docs(self, docs: List[Document], query: str) -> List[Document]:
"""검색 결과 후처리 및 관련성 필터링"""
processed_docs = []
for doc in docs:
# 관련성 점수 계산
relevance_score = await self._calculate_relevance(doc.page_content, query)
if relevance_score > 0.7: # 임계값 설정
doc.metadata['relevance_score'] = relevance_score
processed_docs.append(doc)
# 관련성 순으로 정렬
processed_docs.sort(
key=lambda x: x.metadata.get('relevance_score', 0),
reverse=True
)
return processed_docs
async def _calculate_relevance(self, content: str, query: str) -> float:
"""문서와 쿼리 간 관련성 점수 계산"""
relevance_prompt = f"""
다음 문서가 쿼리와 얼마나 관련이 있는지 0.0-1.0 사이의 점수로 평가하세요:
쿼리: {query}
문서: {content[:500]}...
점수만 응답하세요 (예: 0.85):
"""
response = await self.llm.ainvoke(relevance_prompt)
try:
return float(response.content.strip())
except:
return 0.5 # 기본값
4. 합성 에이전트 구현
# 합성 에이전트 - 검색 결과를 기반으로 응답 생성
class SynthesizerAgent:
def __init__(self, llm_config):
self.llm = ChatOpenAI(**llm_config)
self.response_templates = ResponseTemplates()
async def synthesize(self, query: str, context: List[Document]) -> str:
"""검색된 컨텍스트를 기반으로 응답 합성"""
# 컨텍스트 준비
formatted_context = await self._format_context(context)
# 응답 생성 전략 결정
synthesis_strategy = await self._determine_strategy(query, context)
# 전략에 따른 응답 생성
if synthesis_strategy == "direct_answer":
response = await self._generate_direct_answer(query, formatted_context)
elif synthesis_strategy == "comparative_analysis":
response = await self._generate_comparative_analysis(query, formatted_context)
elif synthesis_strategy == "step_by_step":
response = await self._generate_step_by_step(query, formatted_context)
else:
response = await self._generate_comprehensive_response(query, formatted_context)
return response
async def _format_context(self, documents: List[Document]) -> str:
"""문서들을 읽기 쉬운 형태로 포맷팅"""
formatted_sections = []
for i, doc in enumerate(documents, 1):
source = doc.metadata.get('source', '알 수 없는 출처')
relevance = doc.metadata.get('relevance_score', 'N/A')
section = f"""
[참조 {i}] (출처: {source}, 관련성: {relevance})
{doc.page_content}
"""
formatted_sections.append(section)
return "\n".join(formatted_sections)
async def _determine_strategy(self, query: str, context: List[Document]) -> str:
"""응답 생성 전략 결정"""
strategy_prompt = f"""
다음 쿼리와 컨텍스트를 분석하여 최적의 응답 전략을 결정하세요:
쿼리: {query}
컨텍스트 수: {len(context)}
전략 옵션:
1. direct_answer: 직접적인 답변
2. comparative_analysis: 비교 분석
3. step_by_step: 단계별 설명
4. comprehensive_response: 종합적 응답
선택된 전략:
"""
response = await self.llm.ainvoke(strategy_prompt)
return response.content.strip().lower()
async def _generate_direct_answer(self, query: str, context: str) -> str:
"""직접적인 답변 생성"""
prompt = f"""
다음 컨텍스트를 바탕으로 질문에 대한 명확하고 직접적인 답변을 제공하세요:
질문: {query}
컨텍스트:
{context}
답변 가이드라인:
1. 핵심 내용만 포함
2. 명확하고 간결하게
3. 출처 정보 포함
4. 확실하지 않은 내용은 명시
답변:
"""
response = await self.llm.ainvoke(prompt)
return response.content
async def _generate_comparative_analysis(self, query: str, context: str) -> str:
"""비교 분석 응답 생성"""
prompt = f"""
다음 컨텍스트의 정보들을 비교 분석하여 질문에 답변하세요:
질문: {query}
컨텍스트:
{context}
분석 구조:
1. 주요 관점들 식별
2. 유사점과 차이점 분석
3. 종합 결론 제시
4. 각 출처별 신뢰도 평가
비교 분석:
"""
response = await self.llm.ainvoke(prompt)
return response.content
async def _generate_step_by_step(self, query: str, context: str) -> str:
"""단계별 설명 생성"""
prompt = f"""
다음 컨텍스트를 바탕으로 질문에 대한 단계별 설명을 제공하세요:
질문: {query}
컨텍스트:
{context}
설명 구조:
1. 개요
2. 단계별 세부 설명
3. 각 단계의 중요성
4. 실제 적용 방법
5. 주의사항
단계별 설명:
"""
response = await self.llm.ainvoke(prompt)
return response.content
5. 평가 에이전트 구현
# 평가 에이전트 - 응답 품질 평가 및 개선
class EvaluatorAgent:
def __init__(self, llm_config):
self.llm = ChatOpenAI(**llm_config)
self.metrics = ResponseMetrics()
async def evaluate(self, query: str, response: str, context: List[Document]) -> dict:
"""응답 품질 종합 평가"""
# 다중 평가 메트릭 실행
evaluation_tasks = [
self._evaluate_relevance(query, response),
self._evaluate_accuracy(response, context),
self._evaluate_completeness(query, response),
self._evaluate_clarity(response),
self._evaluate_factuality(response, context)
]
results = await asyncio.gather(*evaluation_tasks)
evaluation = {
'relevance': results[0],
'accuracy': results[1],
'completeness': results[2],
'clarity': results[3],
'factuality': results[4],
'overall_score': sum(results) / len(results),
'timestamp': datetime.now().isoformat()
}
# 개선 제안 생성
if evaluation['overall_score'] < 0.8:
evaluation['improvement_suggestions'] = await self._generate_improvements(
query, response, context, evaluation
)
return evaluation
async def _evaluate_relevance(self, query: str, response: str) -> float:
"""응답의 질문 관련성 평가"""
prompt = f"""
질문과 응답의 관련성을 0.0-1.0 사이의 점수로 평가하세요:
질문: {query}
응답: {response}
평가 기준:
- 질문의 핵심 의도 파악 정도
- 응답의 직접적 연관성
- 불필요한 정보 포함 여부
점수 (0.0-1.0):
"""
result = await self.llm.ainvoke(prompt)
try:
return float(result.content.strip())
except:
return 0.5
async def _evaluate_accuracy(self, response: str, context: List[Document]) -> float:
"""응답의 정확성 평가"""
context_text = "\n".join([doc.page_content for doc in context])
prompt = f"""
응답이 제공된 컨텍스트와 얼마나 일치하는지 평가하세요:
컨텍스트:
{context_text[:2000]}...
응답:
{response}
평가 기준:
- 사실 정보의 정확성
- 컨텍스트와의 일치성
- 잘못된 추론 여부
점수 (0.0-1.0):
"""
result = await self.llm.ainvoke(prompt)
try:
return float(result.content.strip())
except:
return 0.5
async def _evaluate_completeness(self, query: str, response: str) -> float:
"""응답의 완성도 평가"""
prompt = f"""
응답이 질문을 얼마나 완전히 다루고 있는지 평가하세요:
질문: {query}
응답: {response}
평가 기준:
- 질문의 모든 측면 다룸
- 충분한 세부 정보 제공
- 논리적 구조와 흐름
점수 (0.0-1.0):
"""
result = await self.llm.ainvoke(prompt)
try:
return float(result.content.strip())
except:
return 0.5
async def _generate_improvements(
self,
query: str,
response: str,
context: List[Document],
evaluation: dict
) -> List[str]:
"""개선 제안 생성"""
weak_areas = [k for k, v in evaluation.items() if isinstance(v, float) and v < 0.7]
prompt = f"""
다음 응답의 약한 부분을 개선하기 위한 구체적인 제안을 생성하세요:
질문: {query}
응답: {response}
약한 영역: {weak_areas}
개선 제안 (각 항목별로):
"""
result = await self.llm.ainvoke(prompt)
suggestions = result.content.split('\n')
return [s.strip() for s in suggestions if s.strip()]
6. 메모리 관리 시스템
# 대화 메모리 - 컨텍스트 유지 및 학습
class ConversationMemory:
def __init__(self, vector_store):
self.vector_store = vector_store
self.session_memory = {}
self.long_term_memory = []
async def update(self, query: str, response: str, evaluation: dict):
"""메모리 업데이트"""
# 세션 메모리 업데이트
session_id = self._get_session_id()
if session_id not in self.session_memory:
self.session_memory[session_id] = []
interaction = {
'query': query,
'response': response,
'evaluation': evaluation,
'timestamp': datetime.now().isoformat()
}
self.session_memory[session_id].append(interaction)
# 고품질 상호작용을 장기 메모리에 저장
if evaluation.get('overall_score', 0) > 0.8:
await self._store_in_long_term_memory(interaction)
async def get_relevant_context(self, query: str, k: int = 3) -> List[dict]:
"""관련 과거 상호작용 검색"""
# 벡터 검색으로 유사한 과거 쿼리 찾기
similar_interactions = await self.vector_store.asimilarity_search(
query, k=k, filter={'type': 'interaction'}
)
return [json.loads(doc.page_content) for doc in similar_interactions]
async def _store_in_long_term_memory(self, interaction: dict):
"""장기 메모리 저장"""
# 상호작용을 문서로 변환
doc = Document(
page_content=json.dumps(interaction),
metadata={
'type': 'interaction',
'timestamp': interaction['timestamp'],
'quality_score': interaction['evaluation']['overall_score']
}
)
# 벡터 스토어에 저장
await self.vector_store.aadd_documents([doc])
# 로컬 리스트에도 추가
self.long_term_memory.append(interaction)
def _get_session_id(self) -> str:
"""세션 ID 생성 (실제 구현에서는 사용자별로 관리)"""
return "default_session"
AgentOps 통합 및 모니터링
1. AgentOps 통합
import agentops
from typing import Dict, Any
class AgenticRAGWithAgentOps:
def __init__(self):
# AgentOps 초기화
agentops.init(api_key=os.getenv('AGENTOPS_API_KEY'))
# 시스템 컴포넌트 초기화
self.router_agent = RouterAgent()
self.retriever_agent = RetrieverAgent()
self.synthesizer_agent = SynthesizerAgent()
self.evaluator_agent = EvaluatorAgent()
self.memory = ConversationMemory()
@agentops.record_function('agentic_rag_pipeline')
async def process_query_with_monitoring(self, query: str) -> Dict[str, Any]:
"""AgentOps 모니터링이 포함된 Agentic RAG 파이프라인"""
start_time = time.time()
pipeline_metrics = {
'query_length': len(query),
'start_time': start_time
}
try:
# 1. 라우팅 단계 모니터링
route_decision = await self._route_with_monitoring(query)
pipeline_metrics['route_decision'] = route_decision
# 2. 검색 단계 모니터링
context = await self._retrieve_with_monitoring(query, route_decision)
pipeline_metrics['context_docs_count'] = len(context)
# 3. 합성 단계 모니터링
response = await self._synthesize_with_monitoring(query, context)
pipeline_metrics['response_length'] = len(response)
# 4. 평가 단계 모니터링
evaluation = await self._evaluate_with_monitoring(query, response, context)
pipeline_metrics.update(evaluation)
# 5. 메모리 업데이트
await self.memory.update(query, response, evaluation)
# 성공 메트릭 기록
pipeline_metrics.update({
'success': True,
'total_time': time.time() - start_time
})
agentops.record_action({
'action_type': 'agentic_rag_success',
'metrics': pipeline_metrics
})
return {
'response': response,
'evaluation': evaluation,
'metrics': pipeline_metrics
}
except Exception as e:
# 에러 메트릭 기록
pipeline_metrics.update({
'success': False,
'error': str(e),
'total_time': time.time() - start_time
})
agentops.record_action({
'action_type': 'agentic_rag_error',
'metrics': pipeline_metrics
})
raise
@agentops.record_function('routing_decision')
async def _route_with_monitoring(self, query: str) -> str:
"""라우팅 결정 모니터링"""
start_time = time.time()
route_decision = await self.router_agent.route(query)
agentops.record_action({
'action_type': 'routing_decision',
'metrics': {
'query': query,
'decision': route_decision,
'routing_time': time.time() - start_time
}
})
return route_decision
@agentops.record_function('context_retrieval')
async def _retrieve_with_monitoring(self, query: str, route_decision: str) -> List[Document]:
"""검색 과정 모니터링"""
start_time = time.time()
if route_decision == "vector_search":
context = await self.retriever_agent.vector_search(query)
search_type = "vector"
elif route_decision == "web_search":
context = await self.retriever_agent.web_search(query)
search_type = "web"
else:
context = await self.retriever_agent.hybrid_search(query)
search_type = "hybrid"
agentops.record_action({
'action_type': 'context_retrieval',
'metrics': {
'search_type': search_type,
'documents_retrieved': len(context),
'retrieval_time': time.time() - start_time,
'avg_relevance': sum(doc.metadata.get('relevance_score', 0) for doc in context) / len(context) if context else 0
}
})
return context
@agentops.record_function('response_synthesis')
async def _synthesize_with_monitoring(self, query: str, context: List[Document]) -> str:
"""응답 합성 모니터링"""
start_time = time.time()
response = await self.synthesizer_agent.synthesize(query, context)
agentops.record_action({
'action_type': 'response_synthesis',
'metrics': {
'synthesis_time': time.time() - start_time,
'response_length': len(response),
'context_utilization': len(context)
}
})
return response
@agentops.record_function('response_evaluation')
async def _evaluate_with_monitoring(self, query: str, response: str, context: List[Document]) -> dict:
"""응답 평가 모니터링"""
start_time = time.time()
evaluation = await self.evaluator_agent.evaluate(query, response, context)
agentops.record_action({
'action_type': 'response_evaluation',
'metrics': {
'evaluation_time': time.time() - start_time,
**evaluation
}
})
return evaluation
2. 성능 최적화 및 A/B 테스트
class AgenticRAGOptimizer:
def __init__(self):
self.performance_tracker = PerformanceTracker()
self.ab_tester = ABTester()
@agentops.record_function('agentic_rag_ab_test')
async def run_ab_test(self, test_queries: List[str], configurations: Dict[str, Dict]) -> Dict:
"""Agentic RAG 시스템의 A/B 테스트 실행"""
results = {}
for config_name, config in configurations.items():
print(f"테스트 구성 실행: {config_name}")
config_results = []
for query in test_queries:
# 구성별 시스템 생성
rag_system = self._create_system_with_config(config)
start_time = time.time()
try:
result = await rag_system.process_query_with_monitoring(query)
metrics = {
'query': query,
'response': result['response'],
'response_time': time.time() - start_time,
'success': True,
'config': config_name,
**result['evaluation']
}
config_results.append(metrics)
except Exception as e:
config_results.append({
'query': query,
'error': str(e),
'response_time': time.time() - start_time,
'success': False,
'config': config_name
})
results[config_name] = config_results
# A/B 테스트 결과 분석
analysis = self._analyze_ab_results(results)
agentops.record_action({
'action_type': 'ab_test_completed',
'metrics': {
'test_configurations': list(configurations.keys()),
'total_queries': len(test_queries),
'analysis': analysis
}
})
return analysis
def _create_system_with_config(self, config: Dict) -> AgenticRAGWithAgentOps:
"""구성에 따른 시스템 생성"""
system = AgenticRAGWithAgentOps()
# 라우터 설정
if 'router_config' in config:
system.router_agent.configure(config['router_config'])
# 검색 설정
if 'retrieval_config' in config:
system.retriever_agent.configure(config['retrieval_config'])
# 합성 설정
if 'synthesis_config' in config:
system.synthesizer_agent.configure(config['synthesis_config'])
return system
def _analyze_ab_results(self, results: Dict) -> Dict:
"""A/B 테스트 결과 분석"""
analysis = {}
for config_name, config_results in results.items():
successful_results = [r for r in config_results if r.get('success', False)]
if successful_results:
analysis[config_name] = {
'success_rate': len(successful_results) / len(config_results),
'avg_response_time': sum(r['response_time'] for r in successful_results) / len(successful_results),
'avg_overall_score': sum(r.get('overall_score', 0) for r in successful_results) / len(successful_results),
'avg_relevance': sum(r.get('relevance', 0) for r in successful_results) / len(successful_results),
'avg_accuracy': sum(r.get('accuracy', 0) for r in successful_results) / len(successful_results)
}
else:
analysis[config_name] = {
'success_rate': 0,
'error': 'No successful results'
}
# 최고 성능 구성 식별
best_config = max(
analysis.keys(),
key=lambda k: analysis[k].get('avg_overall_score', 0)
)
analysis['best_configuration'] = best_config
return analysis
실전 적용 사례
1. 고객 지원 시스템
class CustomerSupportAgenticRAG:
def __init__(self):
self.agentic_rag = AgenticRAGWithAgentOps()
self.knowledge_base = CustomerKnowledgeBase()
self.ticket_system = TicketSystem()
@agentops.record_function('customer_support_query')
async def handle_customer_inquiry(self, inquiry: str, customer_context: Dict) -> Dict:
"""고객 문의 처리"""
# 고객 컨텍스트를 포함한 쿼리 확장
enriched_query = self._enrich_query_with_context(inquiry, customer_context)
# Agentic RAG로 답변 생성
result = await self.agentic_rag.process_query_with_monitoring(enriched_query)
# 고객 맞춤화
personalized_response = await self._personalize_response(
result['response'], customer_context
)
# 만족도 예측
satisfaction_prediction = await self._predict_satisfaction(
inquiry, personalized_response
)
agentops.record_action({
'action_type': 'customer_support_completed',
'metrics': {
'customer_id': customer_context.get('customer_id'),
'inquiry_category': self._classify_inquiry(inquiry),
'predicted_satisfaction': satisfaction_prediction,
'response_quality': result['evaluation']['overall_score']
}
})
return {
'response': personalized_response,
'satisfaction_prediction': satisfaction_prediction,
'evaluation': result['evaluation']
}
2. 연구 보조 시스템
class ResearchAssistantAgenticRAG:
def __init__(self):
self.agentic_rag = AgenticRAGWithAgentOps()
self.paper_database = AcademicPaperDatabase()
self.citation_manager = CitationManager()
@agentops.record_function('research_query')
async def research_query(self, question: str, domain: str) -> Dict:
"""연구 질문 처리"""
# 도메인별 특화 검색
domain_context = await self.paper_database.search_by_domain(question, domain)
# Agentic RAG로 종합 분석
result = await self.agentic_rag.process_query_with_monitoring(question)
# 관련 논문 추천
related_papers = await self._recommend_papers(question, domain)
# 연구 방향 제안
research_directions = await self._suggest_research_directions(
question, result['response']
)
# 인용 정보 생성
citations = await self.citation_manager.generate_citations(
result['response'], domain_context
)
return {
'analysis': result['response'],
'related_papers': related_papers,
'research_directions': research_directions,
'citations': citations,
'confidence_score': result['evaluation']['overall_score']
}
성능 벤치마크 및 최적화
1. 성능 메트릭
class AgenticRAGBenchmark:
def __init__(self):
self.metrics = {
'response_time': [],
'accuracy_scores': [],
'relevance_scores': [],
'user_satisfaction': [],
'cost_per_query': []
}
async def run_benchmark(self, test_dataset: List[Dict]) -> Dict:
"""벤치마크 실행"""
system = AgenticRAGWithAgentOps()
results = []
for test_case in test_dataset:
query = test_case['query']
expected_answer = test_case.get('expected_answer')
start_time = time.time()
try:
result = await system.process_query_with_monitoring(query)
response_time = time.time() - start_time
# 성능 메트릭 계산
metrics = {
'query': query,
'response': result['response'],
'response_time': response_time,
'accuracy': self._calculate_accuracy(result['response'], expected_answer),
'relevance': result['evaluation']['relevance'],
'overall_score': result['evaluation']['overall_score'],
'cost': self._estimate_cost(result['metrics'])
}
results.append(metrics)
except Exception as e:
results.append({
'query': query,
'error': str(e),
'response_time': time.time() - start_time,
'success': False
})
# 종합 성능 분석
performance_summary = self._analyze_performance(results)
return {
'individual_results': results,
'performance_summary': performance_summary,
'recommendations': self._generate_optimization_recommendations(performance_summary)
}
def _analyze_performance(self, results: List[Dict]) -> Dict:
"""성능 분석"""
successful_results = [r for r in results if r.get('success', True)]
if not successful_results:
return {'error': 'No successful results'}
return {
'total_queries': len(results),
'success_rate': len(successful_results) / len(results),
'avg_response_time': sum(r['response_time'] for r in successful_results) / len(successful_results),
'avg_accuracy': sum(r.get('accuracy', 0) for r in successful_results) / len(successful_results),
'avg_relevance': sum(r.get('relevance', 0) for r in successful_results) / len(successful_results),
'avg_overall_score': sum(r.get('overall_score', 0) for r in successful_results) / len(successful_results),
'total_cost': sum(r.get('cost', 0) for r in successful_results)
}
결론
AI Engineering Hub의 Agentic RAG 프로젝트는 다중 에이전트 협업을 통한 차세대 RAG 시스템의 실전 구현을 보여줍니다.
핵심 장점
- 모듈화된 아키텍처: 각 에이전트의 전문화된 역할
- 동적 적응성: 쿼리 유형에 따른 전략 조정
- 품질 보장: 다단계 평가 및 개선 메커니즘
- 확장성: 새로운 에이전트 및 도구 추가 용이
AgentOps 통합 효과
- 실시간 모니터링: 각 에이전트별 성능 추적
- A/B 테스팅: 다양한 구성의 효과 비교
- 성능 최적화: 데이터 기반 시스템 개선
- 운영 안정성: 오류 추적 및 복구
실무 적용 가이드
- 단계적 도입: 기존 RAG에서 점진적 업그레이드
- 도메인 특화: 업무 영역에 맞는 에이전트 커스터마이징
- 지속적 개선: AgentOps 데이터 기반 최적화
- 사용자 피드백: 실제 사용 패턴 반영
Agentic RAG는 단순한 검색-생성을 넘어 지능적 추론과 협업을 통한 차세대 AI 시스템의 가능성을 보여줍니다. AI Engineering Hub의 소스코드를 기반으로 여러분만의 Agentic RAG 시스템을 구축해보세요.