⏱️ 예상 읽기 시간: 22분

서론

SpiceAI는 Apache-2.0 라이선스 하에 배포되는 오픈소스 AI 인프라 플랫폼으로, Rust로 작성된 고성능 SQL 쿼리, 검색, LLM 추론 엔진입니다. Private 클라우드 환경에서 AI 플랫폼을 구축하는 팀들에게 데이터 통합부터 AI 추론까지 통합된 솔루션을 제공합니다.

특히 엔터프라이즈 환경에서 분산된 데이터 소스들을 통합하고, 실시간 ML 추론을 수행하며, RAG(Retrieval-Augmented Generation) 시스템을 구축하는데 최적화되어 있습니다. 이 가이드에서는 private 클라우드 회사의 AI 플랫폼 팀이 SpiceAI를 어떻게 활용할 수 있는지 실제 사례와 함께 상세히 다루겠습니다.

📄 라이선스 분석 및 엔터프라이즈 적합성

Apache-2.0 라이선스 개요

SpiceAI는 Apache License 2.0을 사용하며, 이는 상업적 활용에 매우 관대한 라이선스입니다.

엔터프라이즈 사용 가능성 매트릭스

사용 시나리오 허용 여부 소스코드 공개 의무 추가 조건
Private 클라우드 배포 완전 허용 불필요 없음
고객사 서비스 제공 완전 허용 불필요 라이선스 고지만 필요
수정 후 내부 배포 완전 허용 불필요 라이선스 고지만 필요
클라우드 서비스 상품화 완전 허용 불필요 라이선스 고지만 필요
임베디드 제품 포함 완전 허용 불필요 라이선스 고지만 필요

🏢 엔터프라이즈 권장 사용 방식

권장사항:
  상업적_사용: "완전 자유 - 제약 없음"
  수정_배포: "자유롭게 가능"
  특허_보호: "Apache-2.0 특허 보호 조항 적용"
  엔터프라이즈_지원: "커뮤니티  상용 지원 모두 가능"
  
준수사항:
  라이선스_고지: "배포  LICENSE 파일 포함"
  저작권_표시: "원본 저작권 고지 유지"
  변경사항_표시: "수정 사항이 있다면 NOTICE 파일에 기록"

결론: Private 클라우드 회사에서 완전히 자유롭게 상업적 활용 가능

🎯 Private 클라우드 AI 플랫폼 핵심 활용 사례

1. 데이터 페더레이션 및 통합 플랫폼

문제 상황

- 여러 데이터베이스에 분산된 고객 데이터
- 실시간 분석을 위한 데이터 통합 필요
- 각 시스템별로 다른 접근 방식과 API
- 데이터 이동 없이 쿼리 수행 요구사항

SpiceAI 솔루션

// spicepod.yml - 데이터 페더레이션 설정
version: v1beta1
kind: Spicepod
metadata:
  name: enterprise-data-federation

datasets:
  # PostgreSQL 고객 데이터
  - from: postgresql://user:pass@customer-db:5432/customers
    name: customers
    description: "Customer master data"
    params:
      sql: |
        SELECT customer_id, name, email, created_at, tier
        FROM customers 
        WHERE active = true
  
  # Snowflake 분석 데이터
  - from: snowflake://account.region/warehouse/database/schema
    name: analytics_data
    description: "Customer analytics and behavior data"
    params:
      sql: |
        SELECT customer_id, event_type, event_time, value
        FROM customer_events
        WHERE event_time >= NOW() - INTERVAL '30 days'
  
  # S3 로그 데이터
  - from: s3://company-logs/user-behavior/
    name: user_logs
    description: "User behavior logs from applications"
    params:
      file_format: parquet
      
  # DynamoDB 실시간 세션 데이터
  - from: dynamodb://us-west-2/user-sessions
    name: active_sessions
    description: "Real-time user session data"

# 데이터 가속화 설정
models:
  - from: duckdb
    name: customer_360_view
    description: "Unified customer 360 view"
    datasets:
      - customers
      - analytics_data
      - user_logs
      - active_sessions

실제 구현 예제

# enterprise_data_platform.py
import spicepy
from datetime import datetime, timedelta
import asyncio

class EnterpriseDataPlatform:
    def __init__(self):
        self.client = spicepy.Client()
        
    async def get_customer_360_view(self, customer_id: str):
        """통합 고객 360도 뷰 생성"""
        
        query = f"""
        WITH customer_base AS (
            SELECT customer_id, name, email, tier, created_at
            FROM customers
            WHERE customer_id = '{customer_id}'
        ),
        recent_activity AS (
            SELECT 
                customer_id,
                COUNT(*) as event_count,
                MAX(event_time) as last_activity,
                SUM(CASE WHEN event_type = 'purchase' THEN value ELSE 0 END) as total_purchases
            FROM analytics_data
            WHERE customer_id = '{customer_id}'
            GROUP BY customer_id
        ),
        session_info AS (
            SELECT 
                customer_id,
                COUNT(*) as active_sessions,
                MAX(last_seen) as last_session
            FROM active_sessions
            WHERE customer_id = '{customer_id}'
            GROUP BY customer_id
        )
        SELECT 
            cb.customer_id,
            cb.name,
            cb.email,
            cb.tier,
            cb.created_at,
            COALESCE(ra.event_count, 0) as total_events,
            ra.last_activity,
            COALESCE(ra.total_purchases, 0) as lifetime_value,
            COALESCE(si.active_sessions, 0) as current_sessions,
            si.last_session
        FROM customer_base cb
        LEFT JOIN recent_activity ra ON cb.customer_id = ra.customer_id
        LEFT JOIN session_info si ON cb.customer_id = si.customer_id
        """
        
        result = await self.client.query(query)
        return result.to_dict()
    
    async def get_real_time_analytics(self, time_window_hours: int = 1):
        """실시간 분석 대시보드 데이터"""
        
        query = f"""
        SELECT 
            DATE_TRUNC('minute', event_time) as time_bucket,
            event_type,
            COUNT(*) as event_count,
            COUNT(DISTINCT customer_id) as unique_users,
            AVG(value) as avg_value,
            SUM(value) as total_value
        FROM analytics_data
        WHERE event_time >= NOW() - INTERVAL '{time_window_hours} hours'
        GROUP BY time_bucket, event_type
        ORDER BY time_bucket DESC, event_count DESC
        """
        
        return await self.client.query(query)
    
    async def detect_anomalies(self):
        """실시간 이상 탐지"""
        
        query = """
        WITH hourly_metrics AS (
            SELECT 
                DATE_TRUNC('hour', event_time) as hour,
                COUNT(*) as total_events,
                COUNT(DISTINCT customer_id) as unique_users,
                AVG(value) as avg_transaction_value
            FROM analytics_data
            WHERE event_time >= NOW() - INTERVAL '24 hours'
            GROUP BY hour
        ),
        stats AS (
            SELECT 
                AVG(total_events) as avg_events,
                STDDEV(total_events) as stddev_events,
                AVG(unique_users) as avg_users,
                STDDEV(unique_users) as stddev_users
            FROM hourly_metrics
            WHERE hour < DATE_TRUNC('hour', NOW())
        )
        SELECT 
            hm.hour,
            hm.total_events,
            hm.unique_users,
            hm.avg_transaction_value,
            CASE 
                WHEN ABS(hm.total_events - s.avg_events) > 2 * s.stddev_events 
                THEN 'ANOMALY_EVENTS'
                WHEN ABS(hm.unique_users - s.avg_users) > 2 * s.stddev_users 
                THEN 'ANOMALY_USERS'
                ELSE 'NORMAL'
            END as anomaly_status
        FROM hourly_metrics hm
        CROSS JOIN stats s
        WHERE hm.hour = DATE_TRUNC('hour', NOW()) - INTERVAL '1 hour'
        """
        
        return await self.client.query(query)

# 사용 예제
async def main():
    platform = EnterpriseDataPlatform()
    
    # 고객 360도 뷰
    customer_view = await platform.get_customer_360_view("customer_12345")
    print(f"Customer 360 View: {customer_view}")
    
    # 실시간 분석
    analytics = await platform.get_real_time_analytics(6)  # 최근 6시간
    print(f"Real-time Analytics: {analytics}")
    
    # 이상 탐지
    anomalies = await platform.detect_anomalies()
    print(f"Anomaly Detection: {anomalies}")

if __name__ == "__main__":
    asyncio.run(main())

2. 실시간 ML 추론 플랫폼

문제 상황

- 다양한 ML 모델들의 분산 배포
- 실시간 특성 데이터 수집 및 처리
- 모델 서빙 인프라의 복잡성
- A/B 테스트 및 모델 버전 관리

SpiceAI 솔루션

# spicepod.yml - ML 추론 플랫폼 설정
version: v1beta1
kind: Spicepod
metadata:
  name: ml-inference-platform

datasets:
  # 실시간 특성 데이터
  - from: kafka://kafka-cluster:9092/feature-stream
    name: real_time_features
    description: "Real-time feature data from Kafka"
    params:
      topic: feature-stream
      consumer_group: spice-ml-platform
      
  # 배치 특성 저장소
  - from: postgresql://features-db:5432/feature_store
    name: batch_features
    description: "Batch processed features"
    
  # 모델 메타데이터
  - from: postgresql://model-registry:5432/models
    name: model_registry
    description: "ML model registry and metadata"

models:
  # 추천 모델
  - from: huggingface://company/recommendation-model-v2
    name: recommendation_model
    description: "Product recommendation model"
    
  # 이상 탐지 모델
  - from: local://models/anomaly-detection.onnx
    name: anomaly_detector
    description: "Transaction anomaly detection"
    
  # 감정 분석 모델
  - from: openai://gpt-4
    name: sentiment_analyzer
    description: "Customer feedback sentiment analysis"

실시간 추론 시스템 구현

# ml_inference_platform.py
import spicepy
import asyncio
from typing import Dict, List, Any
import json
from datetime import datetime

class MLInferencePlatform:
    def __init__(self):
        self.client = spicepy.Client()
        self.model_cache = {}
        
    async def get_real_time_features(self, user_id: str) -> Dict[str, Any]:
        """실시간 특성 데이터 수집"""
        
        query = f"""
        WITH recent_activity AS (
            SELECT 
                user_id,
                AVG(session_duration) as avg_session_duration,
                COUNT(*) as activity_count,
                MAX(last_action_time) as last_activity
            FROM real_time_features
            WHERE user_id = '{user_id}' 
                AND timestamp >= NOW() - INTERVAL '1 hour'
            GROUP BY user_id
        ),
        batch_profile AS (
            SELECT 
                user_id,
                age_group,
                preferred_category,
                purchase_frequency,
                average_order_value
            FROM batch_features
            WHERE user_id = '{user_id}'
        )
        SELECT 
            ra.user_id,
            ra.avg_session_duration,
            ra.activity_count,
            ra.last_activity,
            bp.age_group,
            bp.preferred_category,
            bp.purchase_frequency,
            bp.average_order_value
        FROM recent_activity ra
        LEFT JOIN batch_profile bp ON ra.user_id = bp.user_id
        """
        
        result = await self.client.query(query)
        return result.to_dict() if result else {}
    
    async def predict_recommendations(self, user_id: str, num_recommendations: int = 5) -> List[Dict]:
        """상품 추천 예측"""
        
        # 특성 데이터 수집
        features = await self.get_real_time_features(user_id)
        
        if not features:
            return []
        
        # SpiceAI ML 모델을 통한 추론
        ml_query = f"""
        SELECT 
            product_id,
            product_name,
            category,
            predicted_score,
            confidence
        FROM ML.PREDICT(
            MODEL recommendation_model,
            SELECT 
                '{user_id}' as user_id,
                {features.get('avg_session_duration', 0)} as session_duration,
                {features.get('activity_count', 0)} as recent_activity,
                '{features.get('age_group', 'unknown')}' as age_group,
                '{features.get('preferred_category', 'general')}' as preferred_category,
                {features.get('purchase_frequency', 0)} as purchase_frequency,
                {features.get('average_order_value', 0)} as avg_order_value
        )
        ORDER BY predicted_score DESC
        LIMIT {num_recommendations}
        """
        
        result = await self.client.query(ml_query)
        return result.to_list()
    
    async def detect_anomalies(self, transaction_data: Dict[str, Any]) -> Dict[str, Any]:
        """거래 이상 탐지"""
        
        anomaly_query = f"""
        SELECT 
            anomaly_score,
            is_anomaly,
            anomaly_reasons,
            confidence
        FROM ML.PREDICT(
            MODEL anomaly_detector,
            SELECT 
                {transaction_data['amount']} as amount,
                '{transaction_data['merchant_category']}' as merchant_category,
                {transaction_data['time_since_last_transaction']} as time_since_last,
                '{transaction_data['location']}' as location,
                {transaction_data['user_risk_score']} as user_risk_score
        )
        """
        
        result = await self.client.query(anomaly_query)
        return result.to_dict()
    
    async def analyze_sentiment(self, feedback_text: str) -> Dict[str, Any]:
        """고객 피드백 감정 분석"""
        
        sentiment_query = f"""
        SELECT 
            sentiment_label,
            sentiment_score,
            key_phrases,
            emotion_categories
        FROM ML.CHAT(
            MODEL sentiment_analyzer,
            'Analyze the sentiment and extract key insights from this customer feedback: "{feedback_text}"'
        )
        """
        
        result = await self.client.query(sentiment_query)
        return result.to_dict()
    
    async def run_ab_test(self, user_id: str, experiment_name: str) -> Dict[str, Any]:
        """A/B 테스트 모델 선택 및 추론"""
        
        # 실험 설정 조회
        experiment_query = f"""
        SELECT 
            model_variant,
            traffic_allocation,
            experiment_status
        FROM model_registry
        WHERE experiment_name = '{experiment_name}'
            AND experiment_status = 'active'
        """
        
        experiments = await self.client.query(experiment_query)
        
        # 사용자를 실험 그룹에 할당
        user_hash = hash(user_id) % 100
        cumulative_allocation = 0
        
        for exp in experiments.to_list():
            cumulative_allocation += exp['traffic_allocation']
            if user_hash < cumulative_allocation:
                selected_model = exp['model_variant']
                break
        else:
            selected_model = 'control'
        
        # 선택된 모델로 추론 수행
        if selected_model == 'recommendation_model_v2':
            return await self.predict_recommendations(user_id)
        else:
            return await self.predict_recommendations(user_id)  # 기본 모델

# 사용 예제
async def main():
    ml_platform = MLInferencePlatform()
    
    # 실시간 추천
    recommendations = await ml_platform.predict_recommendations("user_67890", 10)
    print(f"Recommendations: {recommendations}")
    
    # 이상 탐지
    transaction = {
        'amount': 5000,
        'merchant_category': 'electronics',
        'time_since_last_transaction': 120,  # 분
        'location': 'unusual_location',
        'user_risk_score': 0.7
    }
    anomaly_result = await ml_platform.detect_anomalies(transaction)
    print(f"Anomaly Detection: {anomaly_result}")
    
    # 감정 분석
    sentiment = await ml_platform.analyze_sentiment(
        "The product quality is excellent but delivery was delayed."
    )
    print(f"Sentiment Analysis: {sentiment}")

if __name__ == "__main__":
    asyncio.run(main())

3. 엔터프라이즈 RAG 시스템

문제 상황

- 분산된 문서와 지식 베이스
- 다양한 형태의 비구조화 데이터
- 컨텍스트를 고려한 정확한 정보 검색
- 실시간 지식 업데이트 및 벡터 인덱싱

SpiceAI RAG 시스템 구현

# spicepod.yml - Enterprise RAG 설정
version: v1beta1
kind: Spicepod
metadata:
  name: enterprise-rag-system

datasets:
  # 문서 저장소
  - from: s3://company-docs/knowledge-base/
    name: knowledge_documents
    description: "Enterprise knowledge base documents"
    params:
      file_format: pdf
      
  # 위키/컨플루언스
  - from: https://company.atlassian.net/wiki/api/v2/
    name: confluence_docs
    description: "Confluence documentation"
    
  # 이메일 아카이브
  - from: postgresql://email-archive:5432/emails
    name: email_archive
    description: "Corporate email archive"
    
  # 실시간 채팅/슬랙
  - from: slack://workspace/channels
    name: slack_messages
    description: "Slack conversation history"

# 벡터 검색 및 임베딩 설정
models:
  - from: huggingface://sentence-transformers/all-MiniLM-L6-v2
    name: embedding_model
    description: "Document embedding model"
    
  - from: openai://gpt-4
    name: generation_model
    description: "Answer generation model"
    
  - from: chromadb://localhost:8000/enterprise-vectors
    name: vector_store
    description: "Vector database for semantic search"

RAG 시스템 구현

# enterprise_rag_system.py
import spicepy
import asyncio
from typing import List, Dict, Any
import json
from datetime import datetime

class EnterpriseRAGSystem:
    def __init__(self):
        self.client = spicepy.Client()
        
    async def index_documents(self, batch_size: int = 100):
        """문서 벡터화 및 인덱싱"""
        
        # 새로운 문서 또는 업데이트된 문서 조회
        new_docs_query = """
        SELECT 
            doc_id,
            title,
            content,
            document_type,
            last_modified,
            department,
            access_level
        FROM knowledge_documents
        WHERE last_indexed IS NULL 
           OR last_modified > last_indexed
        ORDER BY last_modified DESC
        LIMIT {batch_size}
        """.format(batch_size=batch_size)
        
        docs = await self.client.query(new_docs_query)
        
        for doc in docs.to_list():
            # 문서 임베딩 생성
            embedding_query = f"""
            SELECT 
                doc_id,
                ML.EMBED(
                    MODEL embedding_model,
                    CONCAT(title, ' ', content)
                ) as embedding
            FROM (
                SELECT 
                    '{doc['doc_id']}' as doc_id,
                    '{doc['title']}' as title,
                    '{doc['content'][:2000]}' as content
            ) AS doc_data
            """
            
            embedding_result = await self.client.query(embedding_query)
            
            # 벡터 저장소에 저장
            if embedding_result:
                await self.store_vector(doc, embedding_result.to_dict()['embedding'])
                
        print(f"Indexed {len(docs.to_list())} documents")
    
    async def store_vector(self, document: Dict, embedding: List[float]):
        """벡터 저장소에 문서 임베딩 저장"""
        
        store_query = f"""
        INSERT INTO vector_store (
            doc_id,
            title,
            content_preview,
            embedding,
            metadata
        ) VALUES (
            '{document['doc_id']}',
            '{document['title']}',
            '{document['content'][:200]}...',
            {embedding},
            '{json.dumps({
                'document_type': document['document_type'],
                'department': document['department'],
                'access_level': document['access_level'],
                'last_modified': document['last_modified'].isoformat()
            })}'
        )
        ON CONFLICT (doc_id) 
        DO UPDATE SET 
            embedding = EXCLUDED.embedding,
            metadata = EXCLUDED.metadata,
            updated_at = NOW()
        """
        
        await self.client.query(store_query)
    
    async def semantic_search(self, query: str, user_access_level: str, top_k: int = 5) -> List[Dict]:
        """의미적 검색 수행"""
        
        # 쿼리 임베딩 생성
        query_embedding_sql = f"""
        SELECT ML.EMBED(
            MODEL embedding_model,
            '{query}'
        ) as query_embedding
        """
        
        query_embedding_result = await self.client.query(query_embedding_sql)
        query_embedding = query_embedding_result.to_dict()['query_embedding']
        
        # 유사도 검색
        search_query = f"""
        WITH similarity_scores AS (
            SELECT 
                doc_id,
                title,
                content_preview,
                metadata,
                COSINE_SIMILARITY(embedding, {query_embedding}) as similarity_score
            FROM vector_store
            WHERE JSON_EXTRACT(metadata, '$.access_level') <= '{user_access_level}'
        )
        SELECT 
            doc_id,
            title,
            content_preview,
            similarity_score,
            metadata
        FROM similarity_scores
        WHERE similarity_score > 0.7
        ORDER BY similarity_score DESC
        LIMIT {top_k}
        """
        
        results = await self.client.query(search_query)
        return results.to_list()
    
    async def generate_answer(self, query: str, context_docs: List[Dict], user_id: str) -> Dict[str, Any]:
        """컨텍스트 기반 답변 생성"""
        
        # 컨텍스트 구성
        context = "\n\n".join([
            f"Document: {doc['title']}\nContent: {doc['content_preview']}"
            for doc in context_docs
        ])
        
        # RAG 답변 생성
        generation_query = f"""
        SELECT 
            response,
            confidence,
            sources_used
        FROM ML.CHAT(
            MODEL generation_model,
            'Context information:
            {context}
            
            Based on the above context, please answer the following question accurately and cite your sources:
            
            Question: {query}
            
            Please provide:
            1. A comprehensive answer based on the context
            2. Source citations
            3. If the information is not available in the context, please state so clearly'
        )
        """
        
        answer_result = await self.client.query(generation_query)
        answer_data = answer_result.to_dict()
        
        # 사용 로그 기록
        await self.log_interaction(user_id, query, answer_data, context_docs)
        
        return {
            'answer': answer_data.get('response', ''),
            'confidence': answer_data.get('confidence', 0),
            'sources': [doc['doc_id'] for doc in context_docs],
            'timestamp': datetime.now().isoformat()
        }
    
    async def log_interaction(self, user_id: str, query: str, answer: Dict, sources: List[Dict]):
        """RAG 상호작용 로깅 및 분석"""
        
        log_query = f"""
        INSERT INTO rag_interactions (
            user_id,
            query,
            answer,
            sources_count,
            confidence_score,
            timestamp
        ) VALUES (
            '{user_id}',
            '{query}',
            '{answer.get("response", "")}',
            {len(sources)},
            {answer.get("confidence", 0)},
            NOW()
        )
        """
        
        await self.client.query(log_query)
    
    async def get_rag_analytics(self, days: int = 7) -> Dict[str, Any]:
        """RAG 시스템 분석 및 성능 지표"""
        
        analytics_query = f"""
        WITH daily_stats AS (
            SELECT 
                DATE(timestamp) as date,
                COUNT(*) as total_queries,
                AVG(confidence_score) as avg_confidence,
                COUNT(CASE WHEN confidence_score > 0.8 THEN 1 END) as high_confidence_queries,
                COUNT(DISTINCT user_id) as unique_users
            FROM rag_interactions
            WHERE timestamp >= NOW() - INTERVAL '{days} days'
            GROUP BY DATE(timestamp)
        ),
        popular_topics AS (
            SELECT 
                query,
                COUNT(*) as frequency
            FROM rag_interactions
            WHERE timestamp >= NOW() - INTERVAL '{days} days'
            GROUP BY query
            ORDER BY frequency DESC
            LIMIT 10
        )
        SELECT 
            ds.date,
            ds.total_queries,
            ds.avg_confidence,
            ds.high_confidence_queries,
            ds.unique_users,
            pt.query as popular_query,
            pt.frequency
        FROM daily_stats ds
        CROSS JOIN popular_topics pt
        ORDER BY ds.date DESC, pt.frequency DESC
        """
        
        result = await self.client.query(analytics_query)
        return result.to_dict()
    
    async def ask_question(self, query: str, user_id: str, user_access_level: str = 'public') -> Dict[str, Any]:
        """전체 RAG 플로우 실행"""
        
        # 1. 의미적 검색으로 관련 문서 찾기
        relevant_docs = await self.semantic_search(query, user_access_level, top_k=5)
        
        if not relevant_docs:
            return {
                'answer': 'I could not find relevant information to answer your question.',
                'confidence': 0,
                'sources': [],
                'timestamp': datetime.now().isoformat()
            }
        
        # 2. 컨텍스트 기반 답변 생성
        answer = await self.generate_answer(query, relevant_docs, user_id)
        
        return answer

# 사용 예제
async def main():
    rag_system = EnterpriseRAGSystem()
    
    # 문서 인덱싱 (정기적으로 실행)
    await rag_system.index_documents(50)
    
    # 질문 답변
    answer = await rag_system.ask_question(
        query="What is our company's remote work policy?",
        user_id="employee_123",
        user_access_level="internal"
    )
    print(f"RAG Answer: {answer}")
    
    # 분석 데이터
    analytics = await rag_system.get_rag_analytics(30)  # 최근 30일
    print(f"RAG Analytics: {analytics}")

if __name__ == "__main__":
    asyncio.run(main())

4. AI 에이전트 백엔드 플랫폼

문제 상황

- 복잡한 비즈니스 프로세스 자동화
- 다중 단계 의사결정 시스템
- 실시간 데이터 기반 액션 수행
- 에이전트 간 협업 및 워크플로우 관리

에이전트 시스템 구현

# ai_agent_platform.py
import spicepy
import asyncio
from typing import Dict, List, Any, Optional
from enum import Enum
import json
from datetime import datetime, timedelta

class AgentType(Enum):
    CUSTOMER_SERVICE = "customer_service"
    SALES_ASSISTANT = "sales_assistant"
    OPERATIONS_MANAGER = "operations_manager"
    SECURITY_MONITOR = "security_monitor"

class AIAgentPlatform:
    def __init__(self):
        self.client = spicepy.Client()
        self.active_agents = {}
        
    async def initialize_agent_workspace(self):
        """에이전트 작업 공간 초기화"""
        
        workspace_setup = """
        -- 에이전트 상태 테이블
        CREATE TABLE IF NOT EXISTS agent_states (
            agent_id VARCHAR PRIMARY KEY,
            agent_type VARCHAR,
            status VARCHAR,
            current_task TEXT,
            context JSON,
            last_activity TIMESTAMP,
            performance_metrics JSON
        );
        
        -- 에이전트 간 메시지 큐
        CREATE TABLE IF NOT EXISTS agent_messages (
            message_id UUID PRIMARY KEY,
            from_agent VARCHAR,
            to_agent VARCHAR,
            message_type VARCHAR,
            payload JSON,
            status VARCHAR DEFAULT 'pending',
            created_at TIMESTAMP DEFAULT NOW(),
            processed_at TIMESTAMP
        );
        
        -- 태스크 관리
        CREATE TABLE IF NOT EXISTS agent_tasks (
            task_id UUID PRIMARY KEY,
            assigned_agent VARCHAR,
            task_type VARCHAR,
            priority INTEGER,
            status VARCHAR DEFAULT 'pending',
            input_data JSON,
            output_data JSON,
            created_at TIMESTAMP DEFAULT NOW(),
            completed_at TIMESTAMP
        );
        """
        
        await self.client.query(workspace_setup)
    
    async def deploy_customer_service_agent(self, agent_id: str):
        """고객 서비스 에이전트 배포"""
        
        agent_config = {
            'agent_id': agent_id,
            'agent_type': AgentType.CUSTOMER_SERVICE.value,
            'capabilities': [
                'handle_customer_inquiries',
                'escalate_complex_issues',
                'update_customer_records',
                'generate_support_tickets'
            ],
            'data_sources': [
                'customer_database',
                'support_knowledge_base',
                'previous_interactions'
            ]
        }
        
        # 에이전트 등록
        register_query = f"""
        INSERT INTO agent_states (
            agent_id, agent_type, status, context, last_activity
        ) VALUES (
            '{agent_id}',
            '{AgentType.CUSTOMER_SERVICE.value}',
            'active',
            '{json.dumps(agent_config)}',
            NOW()
        )
        ON CONFLICT (agent_id) DO UPDATE SET
            status = 'active',
            context = EXCLUDED.context,
            last_activity = NOW()
        """
        
        await self.client.query(register_query)
        
        # 에이전트 워크플로우 설정
        workflow_query = f"""
        WITH agent_workflow AS (
            SELECT 
                '{agent_id}' as agent_id,
                'customer_inquiry' as trigger_event,
                JSON_BUILD_OBJECT(
                    'steps', JSON_BUILD_ARRAY(
                        JSON_BUILD_OBJECT('action', 'analyze_inquiry', 'timeout', 30),
                        JSON_BUILD_OBJECT('action', 'search_knowledge_base', 'timeout', 60),
                        JSON_BUILD_OBJECT('action', 'generate_response', 'timeout', 45),
                        JSON_BUILD_OBJECT('action', 'update_customer_record', 'timeout', 15)
                    )
                ) as workflow_config
        )
        SELECT agent_id, workflow_config FROM agent_workflow
        """
        
        return await self.client.query(workflow_query)
    
    async def handle_customer_inquiry(self, agent_id: str, customer_id: str, inquiry: str) -> Dict[str, Any]:
        """고객 문의 처리"""
        
        # 1. 고객 컨텍스트 수집
        context_query = f"""
        WITH customer_profile AS (
            SELECT 
                customer_id,
                name,
                tier,
                total_purchases,
                last_interaction
            FROM customers
            WHERE customer_id = '{customer_id}'
        ),
        recent_interactions AS (
            SELECT 
                interaction_type,
                description,
                resolution_status,
                timestamp
            FROM customer_interactions
            WHERE customer_id = '{customer_id}'
                AND timestamp >= NOW() - INTERVAL '30 days'
            ORDER BY timestamp DESC
            LIMIT 5
        )
        SELECT 
            cp.*,
            JSON_AGG(ri.*) as recent_interactions
        FROM customer_profile cp
        LEFT JOIN recent_interactions ri ON TRUE
        GROUP BY cp.customer_id, cp.name, cp.tier, cp.total_purchases, cp.last_interaction
        """
        
        customer_context = await self.client.query(context_query)
        
        # 2. 지식 베이스 검색
        knowledge_search = f"""
        SELECT 
            solution_id,
            problem_description,
            solution_steps,
            success_rate,
            category
        FROM support_knowledge_base
        WHERE SIMILARITY(problem_description, '{inquiry}') > 0.7
        ORDER BY success_rate DESC, SIMILARITY(problem_description, '{inquiry}') DESC
        LIMIT 3
        """
        
        knowledge_results = await self.client.query(knowledge_search)
        
        # 3. AI 기반 응답 생성
        response_generation = f"""
        SELECT 
            response,
            confidence,
            escalation_needed,
            follow_up_actions
        FROM ML.CHAT(
            MODEL customer_service_model,
            'Customer Context: {customer_context.to_dict() if customer_context else "{}"}
             
             Knowledge Base Results: {knowledge_results.to_list() if knowledge_results else []}
             
             Customer Inquiry: {inquiry}
             
             Please provide:
             1. A helpful and empathetic response
             2. Confidence level (0-1)
             3. Whether escalation is needed
             4. Any follow-up actions required
             
             Format as JSON with keys: response, confidence, escalation_needed, follow_up_actions'
        )
        """
        
        ai_response = await self.client.query(response_generation)
        response_data = json.loads(ai_response.to_dict().get('response', '{}'))
        
        # 4. 상호작용 기록
        interaction_log = f"""
        INSERT INTO customer_interactions (
            customer_id,
            agent_id,
            interaction_type,
            inquiry,
            response,
            confidence_score,
            escalation_needed,
            timestamp
        ) VALUES (
            '{customer_id}',
            '{agent_id}',
            'chat_inquiry',
            '{inquiry}',
            '{response_data.get("response", "")}',
            {response_data.get("confidence", 0)},
            {response_data.get("escalation_needed", False)},
            NOW()
        )
        """
        
        await self.client.query(interaction_log)
        
        # 5. 에스컬레이션 필요 시 처리
        if response_data.get('escalation_needed', False):
            await self.escalate_to_human_agent(customer_id, inquiry, response_data)
        
        return {
            'response': response_data.get('response', ''),
            'confidence': response_data.get('confidence', 0),
            'escalation_needed': response_data.get('escalation_needed', False),
            'agent_id': agent_id,
            'timestamp': datetime.now().isoformat()
        }
    
    async def deploy_sales_assistant_agent(self, agent_id: str):
        """세일즈 어시스턴트 에이전트 배포"""
        
        sales_agent_config = {
            'agent_id': agent_id,
            'agent_type': AgentType.SALES_ASSISTANT.value,
            'capabilities': [
                'lead_qualification',
                'product_recommendations',
                'pricing_analysis',
                'proposal_generation',
                'follow_up_scheduling'
            ]
        }
        
        # 세일즈 에이전트 등록 및 워크플로우 설정
        register_query = f"""
        INSERT INTO agent_states (
            agent_id, agent_type, status, context, last_activity
        ) VALUES (
            '{agent_id}',
            '{AgentType.SALES_ASSISTANT.value}',
            'active',
            '{json.dumps(sales_agent_config)}',
            NOW()
        )
        """
        
        await self.client.query(register_query)
    
    async def qualify_lead(self, agent_id: str, lead_data: Dict[str, Any]) -> Dict[str, Any]:
        """리드 자격 평가"""
        
        # 리드 스코어링
        scoring_query = f"""
        WITH lead_analysis AS (
            SELECT 
                '{lead_data.get("company_size", "unknown")}' as company_size,
                '{lead_data.get("industry", "unknown")}' as industry,
                '{lead_data.get("budget_range", "unknown")}' as budget_range,
                '{lead_data.get("urgency", "unknown")}' as urgency,
                '{lead_data.get("decision_maker", "unknown")}' as decision_maker
        ),
        scoring AS (
            SELECT 
                CASE 
                    WHEN company_size IN ('enterprise', 'large') THEN 30
                    WHEN company_size = 'medium' THEN 20
                    WHEN company_size = 'small' THEN 10
                    ELSE 5
                END as size_score,
                CASE 
                    WHEN industry IN ('technology', 'finance', 'healthcare') THEN 25
                    WHEN industry IN ('retail', 'manufacturing') THEN 15
                    ELSE 10
                END as industry_score,
                CASE 
                    WHEN budget_range = 'high' THEN 25
                    WHEN budget_range = 'medium' THEN 15
                    WHEN budget_range = 'low' THEN 5
                    ELSE 0
                END as budget_score,
                CASE 
                    WHEN urgency = 'immediate' THEN 20
                    WHEN urgency = 'soon' THEN 15
                    WHEN urgency = 'future' THEN 5
                    ELSE 0
                END as urgency_score
            FROM lead_analysis
        )
        SELECT 
            size_score + industry_score + budget_score + urgency_score as total_score,
            CASE 
                WHEN size_score + industry_score + budget_score + urgency_score >= 80 THEN 'hot'
                WHEN size_score + industry_score + budget_score + urgency_score >= 60 THEN 'warm'
                WHEN size_score + industry_score + budget_score + urgency_score >= 40 THEN 'cold'
                ELSE 'unqualified'
            END as lead_status
        FROM scoring
        """
        
        score_result = await self.client.query(scoring_query)
        
        # AI 기반 리드 분석
        analysis_query = f"""
        SELECT 
            analysis,
            recommendations,
            next_actions
        FROM ML.CHAT(
            MODEL sales_assistant_model,
            'Lead Information: {json.dumps(lead_data)}
             
             Lead Score: {score_result.to_dict() if score_result else {}}
             
             Please provide:
             1. Detailed lead analysis
             2. Specific recommendations for engagement
             3. Next action steps
             4. Estimated conversion probability
             
             Format as JSON with keys: analysis, recommendations, next_actions, conversion_probability'
        )
        """
        
        ai_analysis = await self.client.query(analysis_query)
        analysis_data = json.loads(ai_analysis.to_dict().get('analysis', '{}'))
        
        return {
            'lead_score': score_result.to_dict() if score_result else {},
            'ai_analysis': analysis_data,
            'agent_id': agent_id,
            'timestamp': datetime.now().isoformat()
        }
    
    async def deploy_operations_manager_agent(self, agent_id: str):
        """운영 관리 에이전트 배포"""
        
        ops_config = {
            'agent_id': agent_id,
            'agent_type': AgentType.OPERATIONS_MANAGER.value,
            'capabilities': [
                'resource_optimization',
                'anomaly_detection',
                'capacity_planning',
                'incident_response',
                'performance_monitoring'
            ]
        }
        
        # 운영 에이전트 등록
        await self.client.query(f"""
        INSERT INTO agent_states (
            agent_id, agent_type, status, context, last_activity
        ) VALUES (
            '{agent_id}',
            '{AgentType.OPERATIONS_MANAGER.value}',
            'active',
            '{json.dumps(ops_config)}',
            NOW()
        )
        """)
    
    async def monitor_system_health(self, agent_id: str) -> Dict[str, Any]:
        """시스템 상태 모니터링 및 최적화"""
        
        # 시스템 메트릭 수집
        metrics_query = """
        WITH system_metrics AS (
            SELECT 
                service_name,
                AVG(cpu_usage) as avg_cpu,
                AVG(memory_usage) as avg_memory,
                COUNT(CASE WHEN status = 'error' THEN 1 END) as error_count,
                COUNT(*) as total_requests,
                AVG(response_time) as avg_response_time
            FROM system_metrics
            WHERE timestamp >= NOW() - INTERVAL '1 hour'
            GROUP BY service_name
        ),
        anomalies AS (
            SELECT 
                service_name,
                CASE 
                    WHEN avg_cpu > 80 THEN 'HIGH_CPU'
                    WHEN avg_memory > 85 THEN 'HIGH_MEMORY'
                    WHEN avg_response_time > 5000 THEN 'SLOW_RESPONSE'
                    WHEN error_count > total_requests * 0.05 THEN 'HIGH_ERROR_RATE'
                    ELSE 'NORMAL'
                END as anomaly_type
            FROM system_metrics
        )
        SELECT 
            sm.*,
            a.anomaly_type
        FROM system_metrics sm
        JOIN anomalies a ON sm.service_name = a.service_name
        WHERE a.anomaly_type != 'NORMAL'
        """
        
        metrics_result = await self.client.query(metrics_query)
        
        # AI 기반 운영 최적화 제안
        optimization_query = f"""
        SELECT 
            optimization_actions,
            risk_assessment,
            resource_recommendations
        FROM ML.CHAT(
            MODEL operations_model,
            'System Metrics: {metrics_result.to_list() if metrics_result else []}
             
             Based on the system metrics, please provide:
             1. Immediate optimization actions needed
             2. Risk assessment for current anomalies
             3. Resource scaling recommendations
             4. Preventive measures
             
             Format as JSON with keys: optimization_actions, risk_assessment, resource_recommendations'
        )
        """
        
        optimization_result = await self.client.query(optimization_query)
        optimization_data = json.loads(optimization_result.to_dict().get('optimization_actions', '{}'))
        
        # 자동 최적화 액션 실행
        if optimization_data.get('immediate_actions'):
            await self.execute_optimization_actions(optimization_data['immediate_actions'])
        
        return {
            'system_status': metrics_result.to_list() if metrics_result else [],
            'optimizations': optimization_data,
            'agent_id': agent_id,
            'timestamp': datetime.now().isoformat()
        }
    
    async def execute_optimization_actions(self, actions: List[str]):
        """최적화 액션 자동 실행"""
        
        for action in actions:
            action_query = f"""
            INSERT INTO optimization_actions (
                action_type,
                description,
                status,
                executed_at
            ) VALUES (
                'auto_optimization',
                '{action}',
                'executed',
                NOW()
            )
            """
            await self.client.query(action_query)
    
    async def agent_collaboration_workflow(self, task_data: Dict[str, Any]) -> Dict[str, Any]:
        """에이전트 간 협업 워크플로우"""
        
        # 태스크 유형에 따른 에이전트 할당
        task_assignment_query = f"""
        WITH task_requirements AS (
            SELECT 
                '{task_data.get("type", "")}' as task_type,
                '{task_data.get("priority", "medium")}' as priority,
                '{json.dumps(task_data)}' as task_data
        ),
        agent_capabilities AS (
            SELECT 
                agent_id,
                agent_type,
                JSON_EXTRACT(context, '$.capabilities') as capabilities
            FROM agent_states
            WHERE status = 'active'
        )
        SELECT 
            ac.agent_id,
            ac.agent_type,
            tr.task_type,
            tr.priority
        FROM task_requirements tr
        CROSS JOIN agent_capabilities ac
        WHERE JSON_SEARCH(ac.capabilities, 'one', tr.task_type) IS NOT NULL
        ORDER BY 
            CASE tr.priority 
                WHEN 'high' THEN 1
                WHEN 'medium' THEN 2
                WHEN 'low' THEN 3
            END
        LIMIT 1
        """
        
        assignment_result = await self.client.query(task_assignment_query)
        
        if assignment_result:
            assigned_agent = assignment_result.to_dict()
            
            # 태스크 할당 및 실행
            task_execution_query = f"""
            INSERT INTO agent_tasks (
                task_id,
                assigned_agent,
                task_type,
                priority,
                input_data,
                status,
                created_at
            ) VALUES (
                '{task_data.get("task_id", "")}',
                '{assigned_agent["agent_id"]}',
                '{task_data.get("type", "")}',
                CASE '{task_data.get("priority", "medium")}'
                    WHEN 'high' THEN 1
                    WHEN 'medium' THEN 2
                    WHEN 'low' THEN 3
                END,
                '{json.dumps(task_data)}',
                'assigned',
                NOW()
            )
            """
            
            await self.client.query(task_execution_query)
            
            return {
                'assigned_agent': assigned_agent,
                'task_status': 'assigned',
                'estimated_completion': datetime.now() + timedelta(minutes=30)
            }
        
        return {'error': 'No suitable agent available'}

# 사용 예제
async def main():
    agent_platform = AIAgentPlatform()
    
    # 에이전트 플랫폼 초기화
    await agent_platform.initialize_agent_workspace()
    
    # 고객 서비스 에이전트 배포
    await agent_platform.deploy_customer_service_agent("cs_agent_001")
    
    # 고객 문의 처리
    customer_response = await agent_platform.handle_customer_inquiry(
        agent_id="cs_agent_001",
        customer_id="customer_12345",
        inquiry="I'm having trouble with my recent order delivery."
    )
    print(f"Customer Service Response: {customer_response}")
    
    # 세일즈 에이전트 배포 및 리드 평가
    await agent_platform.deploy_sales_assistant_agent("sales_agent_001")
    
    lead_qualification = await agent_platform.qualify_lead(
        agent_id="sales_agent_001",
        lead_data={
            "company_size": "enterprise",
            "industry": "technology",
            "budget_range": "high",
            "urgency": "immediate",
            "decision_maker": "yes"
        }
    )
    print(f"Lead Qualification: {lead_qualification}")
    
    # 운영 관리 에이전트 배포 및 시스템 모니터링
    await agent_platform.deploy_operations_manager_agent("ops_agent_001")
    
    system_health = await agent_platform.monitor_system_health("ops_agent_001")
    print(f"System Health: {system_health}")

if __name__ == "__main__":
    asyncio.run(main())

🚀 설치 및 배포 가이드

로컬 개발 환경

1. Docker를 통한 빠른 시작

# SpiceAI Docker 이미지 실행
docker run -p 3000:3000 -p 50051:50051 \
  -v $(pwd)/spicepod.yml:/app/spicepod.yml \
  spiceai/spiceai:latest

# 상태 확인
curl http://localhost:3000/health

# SQL 쿼리 테스트
curl -X POST http://localhost:3000/v1/sql \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT 1 as test"}'

2. 로컬 바이너리 설치 (macOS)

# SpiceAI CLI 설치
curl https://install.spiceai.org | /bin/bash

# 프로젝트 초기화
mkdir my-spice-project
cd my-spice-project
spice init

# 개발 서버 시작
spice run

Private 클라우드 Kubernetes 배포

1. Helm을 통한 배포

# SpiceAI Helm 리포지토리 추가
helm repo add spiceai https://helm.spiceai.org
helm repo update

# 네임스페이스 생성
kubectl create namespace spiceai-platform

# 커스텀 values.yaml 생성
cat > values.yaml << 'EOF'
# Production 설정
replicaCount: 3

image:
  repository: spiceai/spiceai
  tag: "1.4.0"
  pullPolicy: IfNotPresent

# 리소스 제한
resources:
  limits:
    cpu: "2"
    memory: "4Gi"
  requests:
    cpu: "1"
    memory: "2Gi"

# 영구 저장소 설정
stateful:
  enabled: true
  storageClass: "fast-ssd"
  size: "100Gi"
  mountPath: "/data"

# 서비스 설정
service:
  type: LoadBalancer
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-internal: "true"
    service.beta.kubernetes.io/aws-load-balancer-type: "nlb"

# 모니터링 활성화
monitoring:
  podMonitor:
    enabled: true
    additionalLabels:
      release: prometheus

# 환경 변수
additionalEnv:
  - name: SPICED_LOG
    value: "INFO"
  - name: SPICE_SECRET_DATABASE_URL
    valueFrom:
      secretKeyRef:
        name: spice-secrets
        key: database-url
  - name: SPICE_SECRET_OPENAI_API_KEY
    valueFrom:
      secretKeyRef:
        name: spice-secrets
        key: openai-api-key

# SpicePod 설정
spicepod:
  name: enterprise-ai-platform
  version: v1
  kind: Spicepod
  
  datasets:
    - from: postgresql://postgres-primary:5432/enterprise_data
      name: enterprise_data
      description: "Main enterprise database"
      acceleration:
        enabled: true
        engine: duckdb
        mode: file
        params:
          duckdb_file: "/data/enterprise_data.db"
    
    - from: s3://company-datalake/analytics/
      name: analytics_data
      description: "Analytics data lake"
      params:
        file_format: parquet
      acceleration:
        enabled: true
        engine: arrow
        
  models:
    - from: openai://gpt-4
      name: enterprise_llm
      description: "Enterprise LLM for various AI tasks"
    
    - from: huggingface://sentence-transformers/all-MiniLM-L6-v2
      name: embedding_model
      description: "Text embedding model"
EOF

# Secrets 생성
kubectl create secret generic spice-secrets \
  --from-literal=database-url="postgresql://user:password@postgres:5432/db" \
  --from-literal=openai-api-key="your-openai-api-key" \
  --namespace spiceai-platform

# 배포
helm upgrade --install spiceai-platform spiceai/spiceai \
  --namespace spiceai-platform \
  --values values.yaml

# 배포 상태 확인
kubectl get pods -n spiceai-platform
kubectl get svc -n spiceai-platform

2. 고가용성 설정

# ha-spiceai-values.yaml
replicaCount: 5

# Pod Disruption Budget
podDisruptionBudget:
  enabled: true
  minAvailable: 2

# Anti-affinity for high availability
affinity:
  podAntiAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
    - weight: 100
      podAffinityTerm:
        labelSelector:
          matchExpressions:
          - key: app.kubernetes.io/name
            operator: In
            values:
            - spiceai
        topologyKey: kubernetes.io/hostname

# Horizontal Pod Autoscaler
autoscaling:
  enabled: true
  minReplicas: 3
  maxReplicas: 10
  targetCPUUtilizationPercentage: 70
  targetMemoryUtilizationPercentage: 80

# Load Balancer 설정
service:
  type: LoadBalancer
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-backend-protocol: "tcp"
    service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled: "true"
    service.beta.kubernetes.io/aws-load-balancer-type: "nlb"
    service.beta.kubernetes.io/aws-load-balancer-internal: "true"

# 멀티 가용영역 배포
nodeSelector:
  node.kubernetes.io/instance-type: "c5.2xlarge"

tolerations:
  - key: "dedicated"
    operator: "Equal"
    value: "spiceai"
    effect: "NoSchedule"

3. 모니터링 및 관찰 가능성

# monitoring-setup.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: spiceai-grafana-dashboard
  namespace: spiceai-platform
data:
  dashboard.json: |
    {
      "dashboard": {
        "id": null,
        "title": "SpiceAI Enterprise Platform",
        "panels": [
          {
            "title": "Query Performance",
            "type": "graph",
            "targets": [
              {
                "expr": "spiceai_query_duration_seconds",
                "legendFormat": "Query Duration"
              }
            ]
          },
          {
            "title": "Memory Usage",
            "type": "graph",
            "targets": [
              {
                "expr": "spiceai_memory_usage_bytes",
                "legendFormat": "Memory Usage"
              }
            ]
          },
          {
            "title": "Active Connections",
            "type": "singlestat",
            "targets": [
              {
                "expr": "spiceai_active_connections",
                "legendFormat": "Connections"
              }
            ]
          }
        ]
      }
    }

---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: spiceai-metrics
  namespace: spiceai-platform
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: spiceai
  endpoints:
  - port: metrics
    interval: 30s
    path: /metrics

보안 설정

1. 네트워크 정책

# network-policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: spiceai-network-policy
  namespace: spiceai-platform
spec:
  podSelector:
    matchLabels:
      app.kubernetes.io/name: spiceai
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          name: ai-applications
    - podSelector:
        matchLabels:
          app: allowed-client
    ports:
    - protocol: TCP
      port: 3000
    - protocol: TCP
      port: 50051
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          name: databases
    ports:
    - protocol: TCP
      port: 5432
  - to: []
    ports:
    - protocol: TCP
      port: 443  # HTTPS
    - protocol: TCP
      port: 53   # DNS
    - protocol: UDP
      port: 53   # DNS

2. RBAC 설정

# rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spiceai-service-account
  namespace: spiceai-platform

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: spiceai-cluster-role
rules:
- apiGroups: [""]
  resources: ["pods", "services", "endpoints"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
  resources: ["deployments", "replicasets"]
  verbs: ["get", "list", "watch"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spiceai-cluster-role-binding
subjects:
- kind: ServiceAccount
  name: spiceai-service-account
  namespace: spiceai-platform
roleRef:
  kind: ClusterRole
  name: spiceai-cluster-role
  apiGroup: rbac.authorization.k8s.io

📊 성능 최적화 및 운영

1. 데이터 가속화 전략

Arrow 인메모리 가속화

# 고성능 실시간 쿼리용
datasets:
  - from: postgresql://analytics-db:5432/real_time_data
    name: real_time_metrics
    acceleration:
      enabled: true
      engine: arrow
      mode: memory
      refresh_check_interval: 1m
      refresh_mode: full

DuckDB 파일 기반 가속화

# 대용량 분석 쿼리용
datasets:
  - from: s3://data-lake/historical-data/
    name: historical_analytics
    acceleration:
      enabled: true
      engine: duckdb
      mode: file
      params:
        duckdb_file: "/data/historical.db"
      refresh_check_interval: 1h
      refresh_mode: incremental

2. 클러스터 확장 전략

수평 확장 설정

# CPU 기반 자동 확장
kubectl autoscale deployment spiceai-platform \
  --cpu-percent=70 \
  --min=3 \
  --max=20 \
  --namespace spiceai-platform

# 커스텀 메트릭 기반 확장
cat > hpa-custom.yaml << 'EOF'
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: spiceai-hpa-custom
  namespace: spiceai-platform
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: spiceai-platform
  minReplicas: 3
  maxReplicas: 50
  metrics:
  - type: Pods
    pods:
      metric:
        name: spiceai_query_queue_length
      target:
        type: AverageValue
        averageValue: "10"
  - type: Pods
    pods:
      metric:
        name: spiceai_memory_utilization
      target:
        type: AverageValue
        averageValue: "80"
EOF

kubectl apply -f hpa-custom.yaml

3. 백업 및 재해 복구

데이터 백업 전략

#!/bin/bash
# spiceai-backup.sh

BACKUP_DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_PATH="/backups/spiceai-${BACKUP_DATE}"

# 1. 설정 백업
kubectl get configmap -n spiceai-platform -o yaml > "${BACKUP_PATH}/configmaps.yaml"
kubectl get secret -n spiceai-platform -o yaml > "${BACKUP_PATH}/secrets.yaml"

# 2. 데이터 백업 (PVC 스냅샷)
kubectl get pvc -n spiceai-platform -o json | \
  jq -r '.items[] | select(.metadata.name | contains("spiceai")) | .metadata.name' | \
  while read pvc_name; do
    # 볼륨 스냅샷 생성
    cat > "${BACKUP_PATH}/${pvc_name}-snapshot.yaml" << EOF
apiVersion: snapshot.storage.k8s.io/v1
kind: VolumeSnapshot
metadata:
  name: ${pvc_name}-snapshot-${BACKUP_DATE}
  namespace: spiceai-platform
spec:
  source:
    persistentVolumeClaimName: ${pvc_name}
  volumeSnapshotClassName: csi-aws-vsc
EOF
    kubectl apply -f "${BACKUP_PATH}/${pvc_name}-snapshot.yaml"
  done

# 3. 메타데이터 백업
kubectl exec -n spiceai-platform deployment/spiceai-platform -- \
  pg_dump "$DATABASE_URL" > "${BACKUP_PATH}/metadata-backup.sql"

echo "Backup completed: ${BACKUP_PATH}"

재해 복구 계획

#!/bin/bash
# spiceai-restore.sh

RESTORE_PATH="$1"

if [ -z "$RESTORE_PATH" ]; then
  echo "Usage: $0 <backup-path>"
  exit 1
fi

# 1. 네임스페이스 재생성
kubectl create namespace spiceai-platform

# 2. 설정 복원
kubectl apply -f "${RESTORE_PATH}/configmaps.yaml"
kubectl apply -f "${RESTORE_PATH}/secrets.yaml"

# 3. 볼륨 복원
for snapshot_file in "${RESTORE_PATH}"/*-snapshot.yaml; do
  kubectl apply -f "$snapshot_file"
done

# 4. 애플리케이션 재배포
helm upgrade --install spiceai-platform spiceai/spiceai \
  --namespace spiceai-platform \
  --values values.yaml

echo "Restore completed from: ${RESTORE_PATH}"

zshrc 별칭 설정

# ~/.zshrc에 추가할 SpiceAI 관련 별칭들

# SpiceAI 관련 디렉토리
export SPICEAI_HOME="$HOME/.spiceai"
export SPICEAI_PROJECTS_DIR="$HOME/spiceai-projects"

# 기본 별칭
alias spice="spice"
alias spice-init="spice init"
alias spice-run="spice run"
alias spice-build="spice build"
alias spice-status="spice status"

# Docker 관련
alias spice-docker="docker run -p 3000:3000 -p 50051:50051 -v \$(pwd):/app spiceai/spiceai:latest"
alias spice-docker-dev="docker run -it -p 3000:3000 -p 50051:50051 -v \$(pwd):/app spiceai/spiceai:latest /bin/bash"

# Kubernetes 관련
alias k-spice="kubectl -n spiceai-platform"
alias spice-logs="kubectl logs -n spiceai-platform -l app.kubernetes.io/name=spiceai"
alias spice-pods="kubectl get pods -n spiceai-platform"
alias spice-svc="kubectl get svc -n spiceai-platform"

# SQL 쿼리 테스트
function spice-query() {
    if [ -z "$1" ]; then
        echo "Usage: spice-query 'SQL_QUERY'"
        return 1
    fi
    
    curl -X POST http://localhost:3000/v1/sql \
        -H "Content-Type: application/json" \
        -d "{\"sql\": \"$1\"}" | jq .
}

# SpicePod 검증
alias spice-validate="spice validate"
alias spice-fmt="spice fmt"

# 모니터링
alias spice-metrics="curl -s http://localhost:3000/metrics"
alias spice-health="curl -s http://localhost:3000/health | jq ."

# 프로젝트 관리
function spice-new() {
    local project_name="$1"
    if [ -z "$project_name" ]; then
        echo "Usage: spice-new <project-name>"
        return 1
    fi
    
    mkdir -p "$SPICEAI_PROJECTS_DIR/$project_name"
    cd "$SPICEAI_PROJECTS_DIR/$project_name"
    spice init
    
    echo "✅ SpiceAI 프로젝트 '$project_name' 생성 완료"
    echo "📁 경로: $SPICEAI_PROJECTS_DIR/$project_name"
}

# 개발 환경 설정
function spice-dev-setup() {
    echo "🚀 SpiceAI 개발 환경 설정 중..."
    
    # 필요한 디렉토리 생성
    mkdir -p "$SPICEAI_PROJECTS_DIR"
    mkdir -p "$SPICEAI_HOME"
    
    # Docker 이미지 pull
    docker pull spiceai/spiceai:latest
    
    # 기본 프로젝트 생성
    spice-new "sample-project"
    
    echo "✅ SpiceAI 개발 환경 설정 완료"
}

# 백업 관련
function spice-backup() {
    local backup_name="spiceai-backup-$(date +%Y%m%d-%H%M%S)"
    mkdir -p "$HOME/spiceai-backups/$backup_name"
    
    # 프로젝트 백업
    cp -r "$SPICEAI_PROJECTS_DIR" "$HOME/spiceai-backups/$backup_name/projects"
    
    # 설정 백업
    if [ -d "$SPICEAI_HOME" ]; then
        cp -r "$SPICEAI_HOME" "$HOME/spiceai-backups/$backup_name/config"
    fi
    
    echo "✅ 백업 완료: $HOME/spiceai-backups/$backup_name"
}

# Helm 관련
alias helm-spice="helm -n spiceai-platform"
alias spice-install="helm upgrade --install spiceai-platform spiceai/spiceai --namespace spiceai-platform"
alias spice-uninstall="helm uninstall spiceai-platform --namespace spiceai-platform"

# 로그 및 디버깅
function spice-debug() {
    echo "=== SpiceAI 디버그 정보 ==="
    echo "SpiceAI 버전:"
    spice --version 2>/dev/null || echo "SpiceAI CLI not installed"
    
    echo -e "\nDocker 이미지:"
    docker images spiceai/spiceai 2>/dev/null || echo "No SpiceAI Docker images found"
    
    echo -e "\nKubernetes 상태:"
    k-spice get pods 2>/dev/null || echo "Kubernetes not configured or no pods"
    
    echo -e "\n로컬 서버 상태:"
    spice-health 2>/dev/null || echo "Local SpiceAI server not running"
}

# 성능 테스트
function spice-benchmark() {
    local query="${1:-SELECT 1 as test}"
    local iterations="${2:-100}"
    
    echo "🔥 SpiceAI 성능 벤치마크 시작..."
    echo "쿼리: $query"
    echo "반복 횟수: $iterations"
    
    start_time=$(date +%s%3N)
    
    for i in $(seq 1 $iterations); do
        spice-query "$query" > /dev/null 2>&1
        if [ $((i % 10)) -eq 0 ]; then
            echo "Progress: $i/$iterations"
        fi
    done
    
    end_time=$(date +%s%3N)
    total_time=$((end_time - start_time))
    avg_time=$((total_time / iterations))
    
    echo "✅ 벤치마크 완료"
    echo "총 시간: ${total_time}ms"
    echo "평균 응답 시간: ${avg_time}ms"
    echo "초당 쿼리 수: $((1000 / avg_time))"
}

# 도움말
function spice-help() {
    echo "🌶️  SpiceAI 별칭 도움말"
    echo ""
    echo "기본 명령어:"
    echo "  spice-init         - 새 SpiceAI 프로젝트 초기화"
    echo "  spice-run          - SpiceAI 서버 실행"
    echo "  spice-query        - SQL 쿼리 실행"
    echo ""
    echo "프로젝트 관리:"
    echo "  spice-new <name>   - 새 프로젝트 생성"
    echo "  spice-dev-setup    - 개발 환경 초기 설정"
    echo "  spice-backup       - 프로젝트 백업"
    echo ""
    echo "Docker:"
    echo "  spice-docker       - Docker로 SpiceAI 실행"
    echo "  spice-docker-dev   - Docker 개발 모드"
    echo ""
    echo "Kubernetes:"
    echo "  k-spice           - kubectl with spiceai namespace"
    echo "  spice-install     - Helm으로 설치"
    echo "  spice-logs        - Pod 로그 확인"
    echo ""
    echo "모니터링:"
    echo "  spice-health      - 서버 상태 확인"
    echo "  spice-metrics     - 메트릭 확인"
    echo "  spice-debug       - 디버그 정보"
    echo "  spice-benchmark   - 성능 테스트"
}

개발환경 정보

# 테스트 환경 정보
echo "=== SpiceAI 개발환경 정보 ==="
echo "날짜: $(date)"
echo "OS: $(uname -a)"
echo "Docker: $(docker --version 2>/dev/null || echo 'Docker not installed')"
echo "Kubernetes: $(kubectl version --client --short 2>/dev/null || echo 'kubectl not installed')"
echo "Helm: $(helm version --short 2>/dev/null || echo 'Helm not installed')"
echo "SpiceAI CLI: $(spice --version 2>/dev/null || echo 'SpiceAI CLI not installed')"
echo "사용 가능 메모리: $(free -h 2>/dev/null | grep Mem || vm_stat | head -5)"
echo "디스크 공간: $(df -h . | tail -1)"

검증된 환경

이 가이드는 다음 환경에서 테스트되었습니다:

- macOS Sonoma (Apple M4 Pro, 48GB RAM)
- Ubuntu 22.04 LTS (Intel/AMD x86_64)
- Kubernetes 1.28+
- Docker 24.0+
- Helm 3.12+
- SpiceAI 1.4.0+

결론

SpiceAI는 Apache-2.0 라이선스 하에서 완전히 자유로운 상업적 사용이 가능한 혁신적인 AI 인프라 플랫폼입니다. Private 클라우드 환경의 AI 플랫폼 팀에게 다음과 같은 강력한 가치를 제공합니다:

🎯 핵심 가치 제안

  1. 통합 데이터 플랫폼: 분산된 데이터 소스를 SQL로 통합 쿼리
  2. 실시간 AI 추론: 고성능 ML 모델 서빙 및 추론
  3. 엔터프라이즈 RAG: 지능형 검색 및 생성 시스템
  4. AI 에이전트 백엔드: 복잡한 비즈니스 프로세스 자동화
  5. 확장 가능한 아키텍처: Kubernetes 네이티브 설계

🚀 Private 클라우드 팀 권장 도입 전략

  1. POC부터 시작: 단일 데이터 소스 연동으로 개념 증명
  2. 점진적 확장: 추가 데이터 소스와 AI 모델 통합
  3. 운영 최적화: 모니터링, 백업, 고가용성 구성
  4. 전사 확산: 검증된 패턴을 다른 팀으로 확산

💡 실무 활용 시나리오

  • 데이터 팀: 실시간 데이터 페더레이션 및 분석
  • AI/ML 팀: 모델 서빙 및 추론 인프라
  • 제품 팀: RAG 기반 지능형 서비스 구축
  • 운영 팀: AI 기반 시스템 모니터링 및 최적화

SpiceAI를 통해 Private 클라우드 환경에서 차세대 AI 플랫폼을 구축하고, 데이터 중심의 지능형 서비스를 효율적으로 제공할 수 있습니다. 🌶️

관련 링크