تحليل شامل لكود مصدر Agentic RAG في AI Engineering Hub: نظام التوليد المعزز بالاسترجاع القائم على الوكلاء
نظرة عامة
AI Engineering Hub هو أفضل مستودع تعليمي لهندسة الذكاء الاصطناعي بأكثر من 10.7k نجمة. من بين مشاريعه، يُعدّ Agentic RAG المشروع المحوري الذي يُظهر التطبيق العملي لأنظمة التوليد المعزز بالاسترجاع القائمة على الوكلاء.
يتناول هذا المقال تحليلاً تفصيلياً لمشروع Agentic RAG في AI Engineering Hub على مستوى الكود المصدري، مع عرض منهجية الاستخدام العملي من منظور AgentOps.
ما هو Agentic RAG؟
قيود RAG التقليدي:
- جمود خط الأنابيب الأحادي للاسترجاع والتوليد
- قصور في الاستدلال متعدد الخطوات المعقد
- غياب القدرة على التكيف الديناميكي
- صعوبة الحفاظ على السياق
حلول Agentic RAG:
- تعاون متعدد الوكلاء: تقسيم الأدوار بين وكلاء متخصصين
- سير عمل ديناميكي: عمليات تكيفية تستجيب للاستعلام
- استخدام الأدوات: الاستفادة من واجهات API الخارجية وقواعد البيانات
- إدارة الذاكرة: الحفاظ على السياق والتعلم المستمر
هيكل مشروع AI Engineering Hub Agentic RAG
1. تحليل معمارية المشروع
# المعمارية الأساسية لـ AI Engineering Hub Agentic RAG
class AgenticRAGSystem:
def __init__(self):
# تهيئة الوكلاء
self.router_agent = RouterAgent()
self.retriever_agent = RetrieverAgent()
self.synthesizer_agent = SynthesizerAgent()
self.evaluator_agent = EvaluatorAgent()
# تهيئة الأدوات
self.vector_store = VectorStore()
self.web_search = WebSearchTool()
self.memory = ConversationMemory()
async def process_query(self, query: str) -> str:
"""العملية الرئيسية لـ Agentic RAG"""
# 1. مرحلة التوجيه
route_decision = await self.router_agent.route(query)
# 2. مرحلة الاسترجاع
if route_decision == "vector_search":
context = await self.retriever_agent.vector_search(query)
elif route_decision == "web_search":
context = await self.retriever_agent.web_search(query)
else:
context = await self.retriever_agent.hybrid_search(query)
# 3. مرحلة التوليف
response = await self.synthesizer_agent.synthesize(query, context)
# 4. مرحلة التقييم
evaluation = await self.evaluator_agent.evaluate(query, response, context)
# 5. تحديث الذاكرة
await self.memory.update(query, response, evaluation)
return response
2. تطبيق وكيل التوجيه
# وكيل التوجيه - تصنيف الاستعلامات وتوجيهها
class RouterAgent:
def __init__(self, llm_config):
self.llm = ChatOpenAI(**llm_config)
self.classifier = QueryClassifier()
async def route(self, query: str) -> str:
"""توجيه الاستعلام إلى استراتيجية البحث المناسبة"""
# تحليل الاستعلام
query_analysis = await self._analyze_query(query)
# قرار التوجيه
routing_prompt = f"""
حلّل الاستعلام التالي وحدد استراتيجية البحث المثلى:
الاستعلام: {query}
التحليل: {query_analysis}
الخيارات:
1. vector_search: عند الحاجة إلى البحث في الوثائق
2. web_search: عند الحاجة إلى معلومات حديثة
3. hybrid_search: عند الحاجة إلى معلومات مركبة
القرار: (أجب بأحد الخيارات فقط: vector_search / web_search / hybrid_search)
"""
response = await self.llm.ainvoke(routing_prompt)
return response.content.strip().lower()
async def _analyze_query(self, query: str) -> dict:
"""تحليل خصائص الاستعلام"""
analysis_prompt = f"""
حلّل خصائص الاستعلام التالي:
الاستعلام: {query}
عناصر التحليل:
1. الزمنية (هل تحتاج إلى معلومات حديثة؟)
2. التعقيد (هل يتطلب استدلالاً متعدد الخطوات؟)
3. المجال (هل ينتمي إلى تخصص محدد؟)
4. القصد (بحث عن معلومات، تحليل، تلخيص، إلخ)
أجب بصيغة JSON.
"""
response = await self.llm.ainvoke(analysis_prompt)
try:
return json.loads(response.content)
except:
return {"error": "فشل التحليل"}
3. تطبيق وكيل الاسترجاع
# وكيل الاسترجاع - تطبيق استراتيجيات بحث متنوعة
class RetrieverAgent:
def __init__(self, vector_store, web_search_tool):
self.vector_store = vector_store
self.web_search = web_search_tool
self.llm = ChatOpenAI(temperature=0)
async def vector_search(self, query: str, k: int = 5) -> List[Document]:
"""تنفيذ البحث المتجهي"""
# توسيع الاستعلام
expanded_query = await self._expand_query(query)
# البحث المتجهي
docs = await self.vector_store.asimilarity_search(
expanded_query, k=k
)
# معالجة نتائج البحث
processed_docs = await self._post_process_docs(docs, query)
return processed_docs
async def web_search(self, query: str, max_results: int = 3) -> List[Document]:
"""تنفيذ البحث على الويب"""
# تحسين استعلام البحث
optimized_query = await self._optimize_web_query(query)
# تشغيل البحث على الويب
search_results = await self.web_search.arun(optimized_query)
# توثيق النتائج
documents = []
for result in search_results[:max_results]:
doc = Document(
page_content=result.get('content', ''),
metadata={
'source': result.get('url', ''),
'title': result.get('title', ''),
'timestamp': datetime.now().isoformat()
}
)
documents.append(doc)
return documents
async def hybrid_search(self, query: str) -> List[Document]:
"""البحث الهجين - دمج البحث المتجهي مع البحث على الويب"""
# تشغيل البحثين بالتوازي
vector_task = asyncio.create_task(self.vector_search(query, k=3))
web_task = asyncio.create_task(self.web_search(query, max_results=2))
vector_docs, web_docs = await asyncio.gather(vector_task, web_task)
# دمج النتائج وإزالة التكرار
combined_docs = await self._merge_results(vector_docs, web_docs, query)
return combined_docs
async def _expand_query(self, query: str) -> str:
"""تحسين أداء البحث عبر توسيع الاستعلام"""
expansion_prompt = f"""
وسّع الاستعلام التالي دلالياً للحصول على نتائج بحث أفضل:
الاستعلام الأصلي: {query}
الاستعلام الموسع (يتضمن الكلمات المفتاحية الأساسية):
"""
response = await self.llm.ainvoke(expansion_prompt)
return response.content.strip()
async def _post_process_docs(self, docs: List[Document], query: str) -> List[Document]:
"""معالجة نتائج البحث وتصفيتها حسب الصلة"""
processed_docs = []
for doc in docs:
# حساب درجة الصلة
relevance_score = await self._calculate_relevance(doc.page_content, query)
if relevance_score > 0.7: # تحديد العتبة
doc.metadata['relevance_score'] = relevance_score
processed_docs.append(doc)
# ترتيب حسب الصلة
processed_docs.sort(
key=lambda x: x.metadata.get('relevance_score', 0),
reverse=True
)
return processed_docs
async def _calculate_relevance(self, content: str, query: str) -> float:
"""حساب درجة الصلة بين الوثيقة والاستعلام"""
relevance_prompt = f"""
قيّم مدى صلة الوثيقة التالية بالاستعلام بدرجة بين 0.0 و 1.0:
الاستعلام: {query}
الوثيقة: {content[:500]}...
أجب بالدرجة فقط (مثال: 0.85):
"""
response = await self.llm.ainvoke(relevance_prompt)
try:
return float(response.content.strip())
except:
return 0.5 # القيمة الافتراضية
4. تطبيق وكيل التوليف
# وكيل التوليف - توليد الإجابات استناداً إلى نتائج البحث
class SynthesizerAgent:
def __init__(self, llm_config):
self.llm = ChatOpenAI(**llm_config)
self.response_templates = ResponseTemplates()
async def synthesize(self, query: str, context: List[Document]) -> str:
"""تركيب إجابة بناءً على السياق المسترجع"""
# تجهيز السياق
formatted_context = await self._format_context(context)
# تحديد استراتيجية توليد الإجابة
synthesis_strategy = await self._determine_strategy(query, context)
# توليد الإجابة وفق الاستراتيجية
if synthesis_strategy == "direct_answer":
response = await self._generate_direct_answer(query, formatted_context)
elif synthesis_strategy == "comparative_analysis":
response = await self._generate_comparative_analysis(query, formatted_context)
elif synthesis_strategy == "step_by_step":
response = await self._generate_step_by_step(query, formatted_context)
else:
response = await self._generate_comprehensive_response(query, formatted_context)
return response
async def _format_context(self, documents: List[Document]) -> str:
"""تنسيق الوثائق بصيغة قابلة للقراءة"""
formatted_sections = []
for i, doc in enumerate(documents, 1):
source = doc.metadata.get('source', 'مصدر غير معروف')
relevance = doc.metadata.get('relevance_score', 'N/A')
section = f"""
[مرجع {i}] (المصدر: {source}، الصلة: {relevance})
{doc.page_content}
"""
formatted_sections.append(section)
return "\n".join(formatted_sections)
async def _determine_strategy(self, query: str, context: List[Document]) -> str:
"""تحديد استراتيجية توليد الإجابة"""
strategy_prompt = f"""
حلّل الاستعلام والسياق التاليين لتحديد الاستراتيجية المثلى للإجابة:
الاستعلام: {query}
عدد عناصر السياق: {len(context)}
خيارات الاستراتيجية:
1. direct_answer: إجابة مباشرة
2. comparative_analysis: تحليل مقارن
3. step_by_step: شرح خطوة بخطوة
4. comprehensive_response: إجابة شاملة
الاستراتيجية المختارة:
"""
response = await self.llm.ainvoke(strategy_prompt)
return response.content.strip().lower()
async def _generate_direct_answer(self, query: str, context: str) -> str:
"""توليد إجابة مباشرة"""
prompt = f"""
استناداً إلى السياق التالي، قدّم إجابة واضحة ومباشرة على السؤال:
السؤال: {query}
السياق:
{context}
إرشادات الإجابة:
1. تضمين المحتوى الأساسي فقط
2. الوضوح والإيجاز
3. تضمين معلومات المصدر
4. الإشارة صراحةً إلى أي معلومات غير مؤكدة
الإجابة:
"""
response = await self.llm.ainvoke(prompt)
return response.content
async def _generate_comparative_analysis(self, query: str, context: str) -> str:
"""توليد تحليل مقارن"""
prompt = f"""
قارن المعلومات الواردة في السياق التالي وأجب على السؤال:
السؤال: {query}
السياق:
{context}
هيكل التحليل:
1. تحديد وجهات النظر الرئيسية
2. تحليل أوجه التشابه والاختلاف
3. تقديم الاستنتاج الشامل
4. تقييم موثوقية كل مصدر
التحليل المقارن:
"""
response = await self.llm.ainvoke(prompt)
return response.content
async def _generate_step_by_step(self, query: str, context: str) -> str:
"""توليد شرح خطوة بخطوة"""
prompt = f"""
استناداً إلى السياق التالي، قدّم شرحاً خطوة بخطوة للسؤال:
السؤال: {query}
السياق:
{context}
هيكل الشرح:
1. نظرة عامة
2. شرح تفصيلي لكل خطوة
3. أهمية كل خطوة
4. طريقة التطبيق العملي
5. تنبيهات وملاحظات
الشرح خطوة بخطوة:
"""
response = await self.llm.ainvoke(prompt)
return response.content
5. تطبيق وكيل التقييم
# وكيل التقييم - تقييم جودة الإجابات وتحسينها
class EvaluatorAgent:
def __init__(self, llm_config):
self.llm = ChatOpenAI(**llm_config)
self.metrics = ResponseMetrics()
async def evaluate(self, query: str, response: str, context: List[Document]) -> dict:
"""تقييم شامل لجودة الإجابة"""
# تشغيل مقاييس تقييم متعددة
evaluation_tasks = [
self._evaluate_relevance(query, response),
self._evaluate_accuracy(response, context),
self._evaluate_completeness(query, response),
self._evaluate_clarity(response),
self._evaluate_factuality(response, context)
]
results = await asyncio.gather(*evaluation_tasks)
evaluation = {
'relevance': results[0],
'accuracy': results[1],
'completeness': results[2],
'clarity': results[3],
'factuality': results[4],
'overall_score': sum(results) / len(results),
'timestamp': datetime.now().isoformat()
}
# توليد مقترحات التحسين
if evaluation['overall_score'] < 0.8:
evaluation['improvement_suggestions'] = await self._generate_improvements(
query, response, context, evaluation
)
return evaluation
async def _evaluate_relevance(self, query: str, response: str) -> float:
"""تقييم صلة الإجابة بالسؤال"""
prompt = f"""
قيّم مدى صلة الإجابة بالسؤال بدرجة بين 0.0 و 1.0:
السؤال: {query}
الإجابة: {response}
معايير التقييم:
- مدى استيعاب القصد الأساسي للسؤال
- الارتباط المباشر بالإجابة
- وجود معلومات غير ضرورية
الدرجة (0.0 - 1.0):
"""
result = await self.llm.ainvoke(prompt)
try:
return float(result.content.strip())
except:
return 0.5
async def _evaluate_accuracy(self, response: str, context: List[Document]) -> float:
"""تقييم دقة الإجابة"""
context_text = "\n".join([doc.page_content for doc in context])
prompt = f"""
قيّم مدى توافق الإجابة مع السياق المقدم:
السياق:
{context_text[:2000]}...
الإجابة:
{response}
معايير التقييم:
- دقة المعلومات الواقعية
- التوافق مع السياق
- وجود استدلال خاطئ
الدرجة (0.0 - 1.0):
"""
result = await self.llm.ainvoke(prompt)
try:
return float(result.content.strip())
except:
return 0.5
async def _evaluate_completeness(self, query: str, response: str) -> float:
"""تقييم اكتمال الإجابة"""
prompt = f"""
قيّم مدى شمولية الإجابة للسؤال:
السؤال: {query}
الإجابة: {response}
معايير التقييم:
- تناول جميع جوانب السؤال
- توفير تفاصيل كافية
- البنية المنطقية وتسلسل الأفكار
الدرجة (0.0 - 1.0):
"""
result = await self.llm.ainvoke(prompt)
try:
return float(result.content.strip())
except:
return 0.5
async def _generate_improvements(
self,
query: str,
response: str,
context: List[Document],
evaluation: dict
) -> List[str]:
"""توليد مقترحات التحسين"""
weak_areas = [k for k, v in evaluation.items() if isinstance(v, float) and v < 0.7]
prompt = f"""
اقترح توصيات محددة لتحسين الجوانب الضعيفة في الإجابة التالية:
السؤال: {query}
الإجابة: {response}
الجوانب الضعيفة: {weak_areas}
مقترحات التحسين (لكل بند على حدة):
"""
result = await self.llm.ainvoke(prompt)
suggestions = result.content.split('\n')
return [s.strip() for s in suggestions if s.strip()]
6. نظام إدارة الذاكرة
# ذاكرة المحادثة - الحفاظ على السياق والتعلم
class ConversationMemory:
def __init__(self, vector_store):
self.vector_store = vector_store
self.session_memory = {}
self.long_term_memory = []
async def update(self, query: str, response: str, evaluation: dict):
"""تحديث الذاكرة"""
# تحديث ذاكرة الجلسة
session_id = self._get_session_id()
if session_id not in self.session_memory:
self.session_memory[session_id] = []
interaction = {
'query': query,
'response': response,
'evaluation': evaluation,
'timestamp': datetime.now().isoformat()
}
self.session_memory[session_id].append(interaction)
# تخزين التفاعلات عالية الجودة في الذاكرة طويلة الأمد
if evaluation.get('overall_score', 0) > 0.8:
await self._store_in_long_term_memory(interaction)
async def get_relevant_context(self, query: str, k: int = 3) -> List[dict]:
"""استرجاع التفاعلات السابقة ذات الصلة"""
# البحث عن استعلامات سابقة مشابهة بالبحث المتجهي
similar_interactions = await self.vector_store.asimilarity_search(
query, k=k, filter={'type': 'interaction'}
)
return [json.loads(doc.page_content) for doc in similar_interactions]
async def _store_in_long_term_memory(self, interaction: dict):
"""التخزين في الذاكرة طويلة الأمد"""
# تحويل التفاعل إلى وثيقة
doc = Document(
page_content=json.dumps(interaction),
metadata={
'type': 'interaction',
'timestamp': interaction['timestamp'],
'quality_score': interaction['evaluation']['overall_score']
}
)
# التخزين في مخزن المتجهات
await self.vector_store.aadd_documents([doc])
# إضافة إلى القائمة المحلية أيضاً
self.long_term_memory.append(interaction)
def _get_session_id(self) -> str:
"""توليد معرف الجلسة (في التطبيق الفعلي يُدار لكل مستخدم)"""
return "default_session"
تكامل AgentOps والمراقبة
1. تكامل AgentOps
import agentops
from typing import Dict, Any
class AgenticRAGWithAgentOps:
def __init__(self):
# تهيئة AgentOps
agentops.init(api_key=os.getenv('AGENTOPS_API_KEY'))
# تهيئة مكونات النظام
self.router_agent = RouterAgent()
self.retriever_agent = RetrieverAgent()
self.synthesizer_agent = SynthesizerAgent()
self.evaluator_agent = EvaluatorAgent()
self.memory = ConversationMemory()
@agentops.record_function('agentic_rag_pipeline')
async def process_query_with_monitoring(self, query: str) -> Dict[str, Any]:
"""خط أنابيب Agentic RAG مع مراقبة AgentOps"""
start_time = time.time()
pipeline_metrics = {
'query_length': len(query),
'start_time': start_time
}
try:
# 1. مراقبة مرحلة التوجيه
route_decision = await self._route_with_monitoring(query)
pipeline_metrics['route_decision'] = route_decision
# 2. مراقبة مرحلة الاسترجاع
context = await self._retrieve_with_monitoring(query, route_decision)
pipeline_metrics['context_docs_count'] = len(context)
# 3. مراقبة مرحلة التوليف
response = await self._synthesize_with_monitoring(query, context)
pipeline_metrics['response_length'] = len(response)
# 4. مراقبة مرحلة التقييم
evaluation = await self._evaluate_with_monitoring(query, response, context)
pipeline_metrics.update(evaluation)
# 5. تحديث الذاكرة
await self.memory.update(query, response, evaluation)
# تسجيل مقاييس النجاح
pipeline_metrics.update({
'success': True,
'total_time': time.time() - start_time
})
agentops.record_action({
'action_type': 'agentic_rag_success',
'metrics': pipeline_metrics
})
return {
'response': response,
'evaluation': evaluation,
'metrics': pipeline_metrics
}
except Exception as e:
# تسجيل مقاييس الخطأ
pipeline_metrics.update({
'success': False,
'error': str(e),
'total_time': time.time() - start_time
})
agentops.record_action({
'action_type': 'agentic_rag_error',
'metrics': pipeline_metrics
})
raise
@agentops.record_function('routing_decision')
async def _route_with_monitoring(self, query: str) -> str:
"""مراقبة قرار التوجيه"""
start_time = time.time()
route_decision = await self.router_agent.route(query)
agentops.record_action({
'action_type': 'routing_decision',
'metrics': {
'query': query,
'decision': route_decision,
'routing_time': time.time() - start_time
}
})
return route_decision
@agentops.record_function('context_retrieval')
async def _retrieve_with_monitoring(self, query: str, route_decision: str) -> List[Document]:
"""مراقبة عملية الاسترجاع"""
start_time = time.time()
if route_decision == "vector_search":
context = await self.retriever_agent.vector_search(query)
search_type = "vector"
elif route_decision == "web_search":
context = await self.retriever_agent.web_search(query)
search_type = "web"
else:
context = await self.retriever_agent.hybrid_search(query)
search_type = "hybrid"
agentops.record_action({
'action_type': 'context_retrieval',
'metrics': {
'search_type': search_type,
'documents_retrieved': len(context),
'retrieval_time': time.time() - start_time,
'avg_relevance': sum(doc.metadata.get('relevance_score', 0) for doc in context) / len(context) if context else 0
}
})
return context
@agentops.record_function('response_synthesis')
async def _synthesize_with_monitoring(self, query: str, context: List[Document]) -> str:
"""مراقبة توليف الإجابة"""
start_time = time.time()
response = await self.synthesizer_agent.synthesize(query, context)
agentops.record_action({
'action_type': 'response_synthesis',
'metrics': {
'synthesis_time': time.time() - start_time,
'response_length': len(response),
'context_utilization': len(context)
}
})
return response
@agentops.record_function('response_evaluation')
async def _evaluate_with_monitoring(self, query: str, response: str, context: List[Document]) -> dict:
"""مراقبة تقييم الإجابة"""
start_time = time.time()
evaluation = await self.evaluator_agent.evaluate(query, response, context)
agentops.record_action({
'action_type': 'response_evaluation',
'metrics': {
'evaluation_time': time.time() - start_time,
**evaluation
}
})
return evaluation
2. تحسين الأداء واختبار A/B
class AgenticRAGOptimizer:
def __init__(self):
self.performance_tracker = PerformanceTracker()
self.ab_tester = ABTester()
@agentops.record_function('agentic_rag_ab_test')
async def run_ab_test(self, test_queries: List[str], configurations: Dict[str, Dict]) -> Dict:
"""تشغيل اختبار A/B لنظام Agentic RAG"""
results = {}
for config_name, config in configurations.items():
print(f"تشغيل التكوين: {config_name}")
config_results = []
for query in test_queries:
# إنشاء النظام بتكوين محدد
rag_system = self._create_system_with_config(config)
start_time = time.time()
try:
result = await rag_system.process_query_with_monitoring(query)
metrics = {
'query': query,
'response': result['response'],
'response_time': time.time() - start_time,
'success': True,
'config': config_name,
**result['evaluation']
}
config_results.append(metrics)
except Exception as e:
config_results.append({
'query': query,
'error': str(e),
'response_time': time.time() - start_time,
'success': False,
'config': config_name
})
results[config_name] = config_results
# تحليل نتائج اختبار A/B
analysis = self._analyze_ab_results(results)
agentops.record_action({
'action_type': 'ab_test_completed',
'metrics': {
'test_configurations': list(configurations.keys()),
'total_queries': len(test_queries),
'analysis': analysis
}
})
return analysis
def _create_system_with_config(self, config: Dict) -> AgenticRAGWithAgentOps:
"""إنشاء النظام وفق التكوين المحدد"""
system = AgenticRAGWithAgentOps()
# إعدادات وكيل التوجيه
if 'router_config' in config:
system.router_agent.configure(config['router_config'])
# إعدادات وكيل الاسترجاع
if 'retrieval_config' in config:
system.retriever_agent.configure(config['retrieval_config'])
# إعدادات وكيل التوليف
if 'synthesis_config' in config:
system.synthesizer_agent.configure(config['synthesis_config'])
return system
def _analyze_ab_results(self, results: Dict) -> Dict:
"""تحليل نتائج اختبار A/B"""
analysis = {}
for config_name, config_results in results.items():
successful_results = [r for r in config_results if r.get('success', False)]
if successful_results:
analysis[config_name] = {
'success_rate': len(successful_results) / len(config_results),
'avg_response_time': sum(r['response_time'] for r in successful_results) / len(successful_results),
'avg_overall_score': sum(r.get('overall_score', 0) for r in successful_results) / len(successful_results),
'avg_relevance': sum(r.get('relevance', 0) for r in successful_results) / len(successful_results),
'avg_accuracy': sum(r.get('accuracy', 0) for r in successful_results) / len(successful_results)
}
else:
analysis[config_name] = {
'success_rate': 0,
'error': 'لا توجد نتائج ناجحة'
}
# تحديد التكوين الأفضل أداءً
best_config = max(
analysis.keys(),
key=lambda k: analysis[k].get('avg_overall_score', 0)
)
analysis['best_configuration'] = best_config
return analysis
حالات الاستخدام العملية
1. نظام دعم العملاء
class CustomerSupportAgenticRAG:
def __init__(self):
self.agentic_rag = AgenticRAGWithAgentOps()
self.knowledge_base = CustomerKnowledgeBase()
self.ticket_system = TicketSystem()
@agentops.record_function('customer_support_query')
async def handle_customer_inquiry(self, inquiry: str, customer_context: Dict) -> Dict:
"""معالجة استفسار العميل"""
# توسيع الاستعلام بسياق العميل
enriched_query = self._enrich_query_with_context(inquiry, customer_context)
# توليد الإجابة باستخدام Agentic RAG
result = await self.agentic_rag.process_query_with_monitoring(enriched_query)
# تخصيص الإجابة للعميل
personalized_response = await self._personalize_response(
result['response'], customer_context
)
# التنبؤ بمستوى رضا العميل
satisfaction_prediction = await self._predict_satisfaction(
inquiry, personalized_response
)
agentops.record_action({
'action_type': 'customer_support_completed',
'metrics': {
'customer_id': customer_context.get('customer_id'),
'inquiry_category': self._classify_inquiry(inquiry),
'predicted_satisfaction': satisfaction_prediction,
'response_quality': result['evaluation']['overall_score']
}
})
return {
'response': personalized_response,
'satisfaction_prediction': satisfaction_prediction,
'evaluation': result['evaluation']
}
2. نظام المساعدة البحثية
class ResearchAssistantAgenticRAG:
def __init__(self):
self.agentic_rag = AgenticRAGWithAgentOps()
self.paper_database = AcademicPaperDatabase()
self.citation_manager = CitationManager()
@agentops.record_function('research_query')
async def research_query(self, question: str, domain: str) -> Dict:
"""معالجة الاستفسار البحثي"""
# بحث متخصص حسب المجال
domain_context = await self.paper_database.search_by_domain(question, domain)
# التحليل الشامل باستخدام Agentic RAG
result = await self.agentic_rag.process_query_with_monitoring(question)
# اقتراح أوراق بحثية ذات صلة
related_papers = await self._recommend_papers(question, domain)
# اقتراح مسارات بحثية
research_directions = await self._suggest_research_directions(
question, result['response']
)
# توليد معلومات الاستشهاد
citations = await self.citation_manager.generate_citations(
result['response'], domain_context
)
return {
'analysis': result['response'],
'related_papers': related_papers,
'research_directions': research_directions,
'citations': citations,
'confidence_score': result['evaluation']['overall_score']
}
معايير الأداء والتحسين
1. مقاييس الأداء
class AgenticRAGBenchmark:
def __init__(self):
self.metrics = {
'response_time': [],
'accuracy_scores': [],
'relevance_scores': [],
'user_satisfaction': [],
'cost_per_query': []
}
async def run_benchmark(self, test_dataset: List[Dict]) -> Dict:
"""تشغيل اختبار الأداء"""
system = AgenticRAGWithAgentOps()
results = []
for test_case in test_dataset:
query = test_case['query']
expected_answer = test_case.get('expected_answer')
start_time = time.time()
try:
result = await system.process_query_with_monitoring(query)
response_time = time.time() - start_time
# حساب مقاييس الأداء
metrics = {
'query': query,
'response': result['response'],
'response_time': response_time,
'accuracy': self._calculate_accuracy(result['response'], expected_answer),
'relevance': result['evaluation']['relevance'],
'overall_score': result['evaluation']['overall_score'],
'cost': self._estimate_cost(result['metrics'])
}
results.append(metrics)
except Exception as e:
results.append({
'query': query,
'error': str(e),
'response_time': time.time() - start_time,
'success': False
})
# تحليل شامل للأداء
performance_summary = self._analyze_performance(results)
return {
'individual_results': results,
'performance_summary': performance_summary,
'recommendations': self._generate_optimization_recommendations(performance_summary)
}
def _analyze_performance(self, results: List[Dict]) -> Dict:
"""تحليل الأداء"""
successful_results = [r for r in results if r.get('success', True)]
if not successful_results:
return {'error': 'لا توجد نتائج ناجحة'}
return {
'total_queries': len(results),
'success_rate': len(successful_results) / len(results),
'avg_response_time': sum(r['response_time'] for r in successful_results) / len(successful_results),
'avg_accuracy': sum(r.get('accuracy', 0) for r in successful_results) / len(successful_results),
'avg_relevance': sum(r.get('relevance', 0) for r in successful_results) / len(successful_results),
'avg_overall_score': sum(r.get('overall_score', 0) for r in successful_results) / len(successful_results),
'total_cost': sum(r.get('cost', 0) for r in successful_results)
}
الخلاصة
يُظهر مشروع Agentic RAG في AI Engineering Hub التطبيق العملي لنظام RAG من الجيل التالي القائم على تعاون متعدد الوكلاء.
المزايا الأساسية
- معمارية معيارية: أدوار متخصصة لكل وكيل
- تكيف ديناميكي: تعديل الاستراتيجية وفق نوع الاستعلام
- ضمان الجودة: آليات تقييم وتحسين متعددة المراحل
- قابلية التوسع: سهولة إضافة وكلاء وأدوات جديدة
أثر تكامل AgentOps
- مراقبة في الوقت الفعلي: تتبع أداء كل وكيل
- اختبار A/B: مقارنة فعالية تكوينات مختلفة
- تحسين الأداء: تطوير النظام استناداً إلى البيانات
- استقرار التشغيل: تتبع الأخطاء والتعافي منها
دليل التطبيق العملي
- اعتماد تدريجي: ترقية تدريجية من RAG التقليدي
- تخصيص حسب المجال: تهيئة الوكلاء لتناسب مجال العمل
- تحسين مستمر: تطوير قائم على بيانات AgentOps
- تغذية راجعة من المستخدمين: استيعاب أنماط الاستخدام الفعلية
يتخطى Agentic RAG حدود الاسترجاع والتوليد البسيط، ليكشف عن إمكانات أنظمة الذكاء الاصطناعي من الجيل التالي القائمة على الاستدلال الذكي والتعاون. استناداً إلى الكود المصدري في AI Engineering Hub، ابنِ نظام Agentic RAG الخاص بك.