LangChain Open Deep Research: دليل شامل لنظام أتمتة البحث متعدد الوكلاء بالذكاء الاصطناعي
⏱️ وقت القراءة المقدر: 20 دقائق
مقدمة
LangChain Open Deep Research هو نظام متعدد الوكلاء مفتوح المصدر مصمم لأتمتة مهام البحث المعقدة من البداية إلى النهاية. يجمع البحث على الويب واسترجاع المستندات والتوليف وإنشاء التقارير في خط أنابيب منسّق حيث تتعامل وكلاء متخصصة مع مراحل متمايزة من عملية البحث.
يتناول هذا الدليل المعمارية الكاملة من المفاهيم الجوهرية إلى النشر الإنتاجي، بما في ذلك تصميم الوكيل المركز على الجودة، وأنماط تنسيق الوكلاء المتعددة، وRAG المتقدم، والتكيفات الخاصة بالمجالات للبحث المالي والطبي.
نظرة عامة على معمارية النظام
يستخدم Open Deep Research معمارية وكيل محور وأذرع مبنية على LangGraph:
graph TD
A[طلب البحث] --> B[المنسق]
B --> C[مخطط الاستعلام]
C --> D[وكيل البحث]
C --> E[وكيل المستندات]
D --> F[مصادر الويب]
E --> G[مخزن المتجهات]
D --> H[وكيل التوليف]
E --> H
H --> I[بوابة الجودة]
I -->|اجتياز| J[منشئ التقارير]
I -->|فشل| B
J --> K[التقرير النهائي]
الوكيل المركز على الجودة
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropic
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Optional
import asyncio
class ResearchState(TypedDict):
query: str
sub_queries: List[str]
search_results: List[dict]
synthesized_content: str
quality_score: float
final_report: Optional[str]
iteration: int
class QualityFocusedAgent:
def __init__(
self,
model: str = "claude-3-7-sonnet-latest",
quality_threshold: float = 0.8,
max_iterations: int = 3
):
self.llm = ChatAnthropic(model=model)
self.quality_threshold = quality_threshold
self.max_iterations = max_iterations
self.graph = self._build_graph()
def _build_graph(self) -> StateGraph:
workflow = StateGraph(ResearchState)
workflow.add_node("plan_queries", self._plan_queries)
workflow.add_node("search", self._parallel_search)
workflow.add_node("synthesize", self._synthesize)
workflow.add_node("evaluate_quality", self._evaluate_quality)
workflow.add_node("generate_report", self._generate_report)
workflow.set_entry_point("plan_queries")
workflow.add_edge("plan_queries", "search")
workflow.add_edge("search", "synthesize")
workflow.add_edge("synthesize", "evaluate_quality")
workflow.add_conditional_edges(
"evaluate_quality",
self._quality_router,
{
"retry": "plan_queries",
"pass": "generate_report",
"max_retries": END
}
)
workflow.add_edge("generate_report", END)
return workflow.compile()
async def _plan_queries(self, state: ResearchState) -> ResearchState:
prompt = ChatPromptTemplate.from_template(
"قسّم استعلام البحث هذا إلى 3-5 استعلامات فرعية محددة "
"تغطي معاً الموضوع بشكل شامل.\n\nالاستعلام: {query}"
)
response = await self.llm.ainvoke(prompt.format_messages(query=state["query"]))
sub_queries = self._parse_sub_queries(response.content)
return {**state, "sub_queries": sub_queries}
async def _parallel_search(self, state: ResearchState) -> ResearchState:
tasks = [self._search_one(q) for q in state["sub_queries"]]
results = await asyncio.gather(*tasks, return_exceptions=True)
flat_results = []
for r in results:
if isinstance(r, list):
flat_results.extend(r)
return {**state, "search_results": flat_results}
async def _synthesize(self, state: ResearchState) -> ResearchState:
context = self._format_results(state["search_results"])
prompt = ChatPromptTemplate.from_template(
"لخّص نتائج البحث التالية في ملخص متماسك وحقائقي للاستعلام: {query}\n\nالمصادر:\n{context}"
)
response = await self.llm.ainvoke(
prompt.format_messages(query=state["query"], context=context)
)
return {**state, "synthesized_content": response.content}
async def _evaluate_quality(self, state: ResearchState) -> ResearchState:
prompt = ChatPromptTemplate.from_template(
"قيّم جودة هذا التلخيص البحثي على مقياس من 0.0 إلى 1.0. "
"اعتبر: الدقة الحقائقية، والاكتمال، وتنوع المصادر، والوضوح.\n\n"
"الاستعلام: {query}\n\nالتلخيص: {synthesis}\n\n"
"أجب بالنقاط فقط، مثلاً: 0.85"
)
response = await self.llm.ainvoke(
prompt.format_messages(
query=state["query"],
synthesis=state["synthesized_content"]
)
)
try:
score = float(response.content.strip())
except ValueError:
score = 0.5
return {**state, "quality_score": score}
def _quality_router(self, state: ResearchState) -> str:
if state["iteration"] >= self.max_iterations:
return "max_retries"
if state["quality_score"] >= self.quality_threshold:
return "pass"
return "retry"
async def _generate_report(self, state: ResearchState) -> ResearchState:
prompt = ChatPromptTemplate.from_template(
"أنشئ تقرير بحثي شاملاً بناءً على هذا التلخيص.\n\n"
"الاستعلام: {query}\n\nالتلخيص: {synthesis}"
)
response = await self.llm.ainvoke(
prompt.format_messages(
query=state["query"],
synthesis=state["synthesized_content"]
)
)
return {**state, "final_report": response.content}
def _parse_sub_queries(self, content: str) -> List[str]:
lines = [l.strip() for l in content.split("\n") if l.strip()]
return [l.lstrip("0123456789.-) ") for l in lines if len(l) > 10][:5]
def _format_results(self, results: List[dict]) -> str:
return "\n\n".join(
f"المصدر: {r.get('url', 'غير معروف')}\n{r.get('content', '')}"
for r in results[:10]
)
async def research(self, query: str) -> str:
initial_state = ResearchState(
query=query,
sub_queries=[],
search_results=[],
synthesized_content="",
quality_score=0.0,
final_report=None,
iteration=0
)
final_state = await self.graph.ainvoke(initial_state)
return final_state.get("final_report", "تعذّر إكمال البحث.")
نظام البحث متعدد الوكلاء
from dataclasses import dataclass, field
import asyncio
@dataclass
class ResearchTask:
task_id: str
topic: str
depth: str = "شامل"
domains: List[str] = field(default_factory=list)
class MultiAgentResearchSystem:
def __init__(self, num_workers: int = 4):
self.num_workers = num_workers
self.agents: dict = {}
self.task_queue: asyncio.Queue = asyncio.Queue()
self.results: dict = {}
def _get_or_create_agent(self, domain: str) -> QualityFocusedAgent:
if domain not in self.agents:
self.agents[domain] = QualityFocusedAgent(
model="claude-3-7-sonnet-latest",
quality_threshold=0.82
)
return self.agents[domain]
async def _worker(self, worker_id: int):
while True:
task: ResearchTask = await self.task_queue.get()
try:
domain = task.domains[0] if task.domains else "عام"
agent = self._get_or_create_agent(domain)
result = await agent.research(task.topic)
self.results[task.task_id] = result
except Exception as e:
self.results[task.task_id] = f"خطأ: {e}"
finally:
self.task_queue.task_done()
async def run(self, tasks: List[ResearchTask]) -> dict:
workers = [
asyncio.create_task(self._worker(i))
for i in range(self.num_workers)
]
for task in tasks:
await self.task_queue.put(task)
await self.task_queue.join()
for w in workers:
w.cancel()
return self.results
نظام RAG المتقدم
from langchain_community.vectorstores import Chroma
from langchain_anthropic import ChatAnthropic
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
from typing import List
class AdvancedRAGSystem:
def __init__(
self,
vector_store_path: str = "./chroma_db",
top_k: int = 10,
rerank_top_k: int = 4
):
self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
self.llm = ChatAnthropic(model="claude-3-7-sonnet-latest")
self.top_k = top_k
self.rerank_top_k = rerank_top_k
self.vector_store = Chroma(
persist_directory=vector_store_path,
embedding_function=self.embeddings
)
def add_documents(self, docs: List[Document]) -> None:
self.vector_store.add_documents(docs)
async def query(self, question: str) -> dict:
retriever = self.vector_store.as_retriever(
search_kwargs={"k": self.top_k}
)
docs = retriever.get_relevant_documents(question)
context = "\n\n".join(d.page_content for d in docs[:self.rerank_top_k])
prompt = (
f"أجب على السؤال التالي مستخدماً السياق المقدم فقط.\n\n"
f"السؤال: {question}\n\nالسياق:\n{context}"
)
response = await self.llm.ainvoke(prompt)
return {
"answer": response.content,
"sources": [d.metadata.get("source", "غير معروف") for d in docs[:self.rerank_top_k]],
"num_sources": min(len(docs), self.rerank_top_k)
}
الوكلاء الخاصة بالمجالات
وكيل البحث المالي
class FinancialResearchAgent(QualityFocusedAgent):
def __init__(self):
super().__init__(
model="claude-3-7-sonnet-latest",
quality_threshold=0.88,
max_iterations=4
)
self.financial_prompt_suffix = (
"\n\nمهم: هذا بحث مالي. "
"ميّز بوضوح بين الحقائق والتوقعات. "
"أدرج عوامل المخاطر ذات الصلة. "
"لا تقدم توصيات استثمارية. "
"استشهد بجميع البيانات الكمية مع مصدرها وتاريخها."
)
async def research_company(self, ticker: str, aspects: List[str] = None) -> dict:
if aspects is None:
aspects = ["نموذج الأعمال", "المالية", "الوضع التنافسي", "المخاطر"]
tasks = [
self.research(f"{ticker} {aspect}{self.financial_prompt_suffix}")
for aspect in aspects
]
results = await asyncio.gather(*tasks)
return {
"ticker": ticker,
"sections": dict(zip(aspects, results)),
"generated_at": "2025-07-17"
}
وكيل البحث الطبي
class MedicalResearchAgent(QualityFocusedAgent):
def __init__(self):
super().__init__(
model="claude-3-7-sonnet-latest",
quality_threshold=0.92,
max_iterations=5
)
self.disclaimer = (
"\n\nتنبيه: هذا لأغراض إعلامية فقط ولا يُشكّل نصيحة طبية. "
"استشر متخصصي الرعاية الصحية المؤهلين للقرارات الطبية."
)
async def research_condition(self, condition: str) -> dict:
sections = {
"نظرة عامة": f"نظرة عامة وانتشار {condition}",
"الفيزيولوجيا المرضية": f"الفيزيولوجيا المرضية وآليات {condition}",
"التشخيص": f"معايير وأساليب تشخيص {condition}",
"العلاج": f"العلاجات الحالية المبنية على الأدلة لـ {condition}",
"البحث": f"التجارب السريرية الحديثة والتطورات البحثية في {condition}"
}
tasks = [self.research(q) for q in sections.values()]
results = await asyncio.gather(*tasks)
return {
"condition": condition,
"sections": dict(zip(sections.keys(), results)),
"disclaimer": self.disclaimer.strip()
}
التكامل الإنتاجي مع Slack
from slack_sdk.web.async_client import AsyncWebClient
from slack_sdk.errors import SlackApiError
class ResearchNotificationService:
def __init__(self, slack_token: str, default_channel: str):
self.slack = AsyncWebClient(token=slack_token)
self.default_channel = default_channel
async def post_research_complete(
self,
channel: str,
topic: str,
report: str,
quality_score: float,
thread_ts: str = None
):
summary = report[:500] + "..." if len(report) > 500 else report
blocks = [
{
"type": "header",
"text": {"type": "plain_text", "text": f"اكتمل البحث: {topic[:50]}"}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*درجة الجودة:* {quality_score:.0%}\n\n{summary}"
}
}
]
try:
response = await self.slack.chat_postMessage(
channel=channel or self.default_channel,
blocks=blocks,
thread_ts=thread_ts
)
if len(report) > 500:
await self.slack.chat_postMessage(
channel=channel or self.default_channel,
text=f"*التقرير الكامل:*\n\n{report}",
thread_ts=response["ts"]
)
except SlackApiError as e:
print(f"خطأ Slack: {e.response['error']}")
النشر على Kubernetes
apiVersion: apps/v1
kind: Deployment
metadata:
name: open-deep-research
namespace: ai-research
spec:
replicas: 3
selector:
matchLabels:
app: open-deep-research
template:
metadata:
labels:
app: open-deep-research
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9090"
spec:
containers:
- name: research-service
image: your-registry/open-deep-research:latest
ports:
- containerPort: 8000
name: http
- containerPort: 9090
name: metrics
env:
- name: ANTHROPIC_API_KEY
valueFrom:
secretKeyRef:
name: ai-keys
key: anthropic
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: ai-keys
key: openai
resources:
requests:
memory: "2Gi"
cpu: "1000m"
limits:
memory: "4Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 15
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: research-hpa
namespace: ai-research
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: open-deep-research
minReplicas: 2
maxReplicas: 15
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 65
طبقة خدمة FastAPI
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from uuid import uuid4
app = FastAPI(title="Open Deep Research API")
research_system = MultiAgentResearchSystem(num_workers=4)
task_store: dict = {}
class ResearchRequest(BaseModel):
topic: str
domain: str = "عام"
depth: str = "شامل"
@app.post("/research")
async def submit_research(
request: ResearchRequest,
background_tasks: BackgroundTasks
):
task_id = str(uuid4())
task_store[task_id] = {"status": "في قائمة الانتظار", "result": None}
background_tasks.add_task(
_run_research, task_id, request.topic, request.domain
)
return {"task_id": task_id, "status": "في قائمة الانتظار"}
async def _run_research(task_id: str, topic: str, domain: str):
task_store[task_id]["status"] = "قيد التشغيل"
try:
task = ResearchTask(task_id=task_id, topic=topic, domains=[domain])
results = await research_system.run([task])
task_store[task_id]["status"] = "مكتمل"
task_store[task_id]["result"] = results.get(task_id)
except Exception as e:
task_store[task_id]["status"] = "فشل"
task_store[task_id]["error"] = str(e)
@app.get("/research/{task_id}")
async def get_research(task_id: str):
if task_id not in task_store:
raise HTTPException(status_code=404, detail="المهمة غير موجودة")
return task_store[task_id]
@app.get("/health")
async def health():
return {"status": "ok"}
الخلاصة والتوصيات
يوفر LangChain Open Deep Research أساساً متيناً لأتمتة مهام البحث المعقدة متعددة الخطوات. المعمارية معيارية بما يكفي للتكيف مع مجالات متنوعة مع الحفاظ على معايير الجودة من خلال التحسين التكراري.
متى تستخدمه:
- موضوعات البحث التي تتطلب توليف المعلومات من مصادر عديدة
- سير عمل البحث المتكررة (الاستخبارات التنافسية، مراجعات الأدبيات)
- الفرق التي تحتاج إلى مخرجات بحثية منظمة ومستشهداً بها على نطاق واسع
قرارات معمارية رئيسية:
- استخدم
QualityFocusedAgentكأساس لجميع الوكلاء الخاصة بالمجالات - اضبط عتبات الجودة بشكل محافظ (0.85+) للمخرجات العامة
- انشر النظام متعدد الوكلاء خلف قائمة انتظار مهام للأحمال الكبيرة
- أضف مقاييس Prometheus من اليوم الأول لتتبع تكاليف الرموز وأنماط الجودة
المراجع: