개요

Water프레임워크에 구애받지 않는 멀티 에이전트 오케스트레이션 프레임워크입니다. LangChain, CrewAI, Agno 등 기존 에이전트 프레임워크나 커스텀 구현체를 그대로 활용하면서, 복잡한 멀티 에이전트 워크플로우를 조율할 수 있는 상위 레이어를 제공합니다. 프로덕션 환경에서 사용할 수 있도록 설계되었으며, FastAPI 기반 플레이그라운드를 통해 정의한 플로우를 즉시 테스트할 수 있습니다. [GitHub Repo]

1. 핵심 특징

특징 설명
Framework Agnostic 모든 에이전트 프레임워크 및 커스텀 구현체 통합 가능
Flexible Workflows Python으로 복잡한 멀티 에이전트 상호작용 정의
Production Ready 프로덕션 환경에서 사용 가능한 안정성
Playground FastAPI 서버로 정의한 플로우 즉시 테스트
Type Safety Pydantic 스키마 기반 입출력 검증

2. 설치 및 기본 설정

pip install water-ai

3. 기본 사용법

3.1 단순 태스크 정의

import asyncio
from water import Flow, create_task
from pydantic import BaseModel

class NumberInput(BaseModel):
    value: int

class NumberOutput(BaseModel):
    result: int

def add_five(params, context):
    return {"result": params["input_data"]["value"] + 5}

# 태스크 생성
math_task = create_task(
    id="math_task",
    description="Add 5 to input number",
    input_schema=NumberInput,
    output_schema=NumberOutput,
    execute=add_five
)

# 플로우 정의 및 등록
flow = Flow(id="simple_math", description="Simple math flow").then(math_task).register()

async def main():
    result = await flow.run({"value": 10})
    print(result)  # {"result": 15}

if __name__ == "__main__":
    asyncio.run(main())

3.2 복합 워크플로우

from water import Flow, create_task
from pydantic import BaseModel

class TextInput(BaseModel):
    text: str

class AnalysisOutput(BaseModel):
    sentiment: str
    keywords: list[str]

class SummaryOutput(BaseModel):
    summary: str

def analyze_text(params, context):
    text = params["input_data"]["text"]
    # 실제로는 LLM API 호출
    return {
        "sentiment": "positive",
        "keywords": ["AI", "framework", "orchestration"]
    }

def summarize_text(params, context):
    text = params["input_data"]["text"]
    analysis = context.get("analysis_result", {})
    # 분석 결과를 활용한 요약
    return {
        "summary": f"Text with {analysis.get('sentiment', 'unknown')} sentiment"
    }

# 태스크들 정의
analysis_task = create_task(
    id="analysis",
    description="Analyze text sentiment and extract keywords",
    input_schema=TextInput,
    output_schema=AnalysisOutput,
    execute=analyze_text
)

summary_task = create_task(
    id="summary",
    description="Summarize text based on analysis",
    input_schema=TextInput,
    output_schema=SummaryOutput,
    execute=summarize_text
)

# 순차 실행 플로우
sequential_flow = Flow(
    id="text_processing",
    description="Analyze and summarize text"
).then(analysis_task).then(summary_task).register()

# 병렬 실행 플로우
parallel_flow = Flow(
    id="parallel_processing",
    description="Parallel text processing"
).parallel([analysis_task, summary_task]).register()

4. 기존 프레임워크 통합

4.1 LangChain 에이전트 통합

from langchain.agents import create_openai_functions_agent
from langchain.tools import DuckDuckGoSearchRun
from water import create_task

def langchain_agent_wrapper(params, context):
    # LangChain 에이전트 초기화
    search = DuckDuckGoSearchRun()
    agent = create_openai_functions_agent(
        llm=llm,
        tools=[search],
        prompt=prompt
    )
    
    # 에이전트 실행
    result = agent.invoke({"input": params["input_data"]["query"]})
    return {"response": result["output"]}

langchain_task = create_task(
    id="langchain_search",
    description="Search using LangChain agent",
    input_schema=QueryInput,
    output_schema=ResponseOutput,
    execute=langchain_agent_wrapper
)

4.2 CrewAI 통합

from crewai import Agent, Task, Crew
from water import create_task

def crewai_wrapper(params, context):
    # CrewAI 에이전트 정의
    researcher = Agent(
        role='Researcher',
        goal='Research the given topic',
        backstory='Expert researcher',
        verbose=True
    )
    
    task = Task(
        description=f"Research: {params['input_data']['topic']}",
        agent=researcher
    )
    
    crew = Crew(agents=[researcher], tasks=[task])
    result = crew.kickoff()
    
    return {"research_result": str(result)}

crewai_task = create_task(
    id="crewai_research",
    description="Research using CrewAI",
    input_schema=TopicInput,
    output_schema=ResearchOutput,
    execute=crewai_wrapper
)

5. 고급 워크플로우 패턴

5.1 조건부 실행

def conditional_router(params, context):
    sentiment = context.get("analysis_result", {}).get("sentiment")
    
    if sentiment == "negative":
        return "escalation_flow"
    elif sentiment == "positive":
        return "standard_flow"
    else:
        return "neutral_flow"

conditional_flow = Flow(
    id="conditional_processing",
    description="Route based on sentiment"
).then(analysis_task).route(conditional_router, {
    "escalation_flow": escalation_task,
    "standard_flow": standard_task,
    "neutral_flow": neutral_task
}).register()

5.2 루프 및 재시도

def retry_wrapper(params, context):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            result = risky_operation(params)
            return result
        except Exception as e:
            if attempt == max_retries - 1:
                raise e
            continue

retry_task = create_task(
    id="retry_task",
    description="Task with retry logic",
    input_schema=InputSchema,
    output_schema=OutputSchema,
    execute=retry_wrapper
)

6. FastAPI 플레이그라운드

from water.playground import create_playground

# 모든 등록된 플로우를 포함한 FastAPI 앱 생성
app = create_playground()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

플레이그라운드 실행 후 http://localhost:8000/docs에서 Swagger UI를 통해 정의한 플로우들을 테스트할 수 있습니다.

7. 컨텍스트 및 상태 관리

def stateful_task(params, context):
    # 이전 태스크 결과 접근
    previous_result = context.get("previous_task_id")
    
    # 글로벌 상태 업데이트
    context.set("current_step", "processing")
    
    # 세션 데이터 저장
    context.session["user_preferences"] = params["preferences"]
    
    return {"status": "completed"}

8. 에러 핸들링 및 모니터링

def monitored_task(params, context):
    try:
        # 태스크 실행 시간 측정
        start_time = time.time()
        result = expensive_operation(params)
        execution_time = time.time() - start_time
        
        # 메트릭 로깅
        context.log_metric("execution_time", execution_time)
        context.log_metric("success", 1)
        
        return result
    except Exception as e:
        context.log_metric("error", 1)
        context.log_error(str(e))
        raise

9. 프로덕션 배포 예제

from water import Flow, create_task
from water.middleware import AuthMiddleware, RateLimitMiddleware

# 미들웨어 설정
auth_middleware = AuthMiddleware(api_key="your-api-key")
rate_limit_middleware = RateLimitMiddleware(requests_per_minute=100)

# 프로덕션 플로우
production_flow = Flow(
    id="production_workflow",
    description="Production-ready workflow",
    middlewares=[auth_middleware, rate_limit_middleware]
).then(preprocessing_task).parallel([
    analysis_task,
    enrichment_task
]).then(aggregation_task).register()

# 배포
app = create_playground(
    title="Production API",
    version="1.0.0",
    enable_cors=True,
    enable_metrics=True
)

10. 로드맵 및 향후 계획

Water 프레임워크는 다음 기능들을 개발 중입니다:

  • Storage Layer: 플로우 세션 및 태스크 실행 결과 저장
  • Human in the Loop: 인간 개입이 필요한 워크플로우 지원
  • Retry Mechanism: 개별 태스크 재시도 메커니즘
  • Visual Flow Builder: 웹 기반 플로우 설계 도구

11. 결론

Water는 멀티 에이전트 시스템의 복잡성을 해결하면서도 기존 투자를 보호할 수 있는 혁신적인 오케스트레이션 프레임워크입니다. 프레임워크에 구애받지 않는 설계로 인해 LangChain, CrewAI 등 기존 도구들을 그대로 활용하면서도 더 복잡하고 확장 가능한 워크플로우를 구축할 수 있습니다. 프로덕션 환경에서의 안정성과 개발자 경험을 모두 고려한 설계로, 엔터프라이즈급 멀티 에이전트 시스템 구축에 적합합니다.

프로젝트 소스코드와 더 많은 예제는 GitHub에서 확인하세요: manthanguptaa/water [GitHub Repo]