SpiceAI Private Cloud AI Platform: A Complete Guide - From Enterprise Data Integration to AI Inference
⏱️ Estimated reading time: 22 minutes
Introduction
SpiceAI is an open-source AI infrastructure platform distributed under the Apache-2.0 license: a high-performance SQL query, search, and LLM inference engine written in Rust. For teams building AI platforms in private cloud environments, it provides a unified solution spanning everything from data integration to AI inference.
It is particularly well suited to enterprise environments: federating distributed data sources, running real-time ML inference, and building RAG (Retrieval-Augmented Generation) systems. This guide walks through, with concrete examples, how an AI platform team at a private cloud company can put SpiceAI to work.
📄 License Analysis and Enterprise Suitability
Apache-2.0 License Overview
SpiceAI is licensed under the Apache License 2.0, which is very permissive toward commercial use.
Enterprise Usage Matrix
| Usage Scenario | Allowed | Source Disclosure Required | Additional Conditions |
|---|---|---|---|
| Private cloud deployment | ✅ Fully allowed | ❌ Not required | None |
| Providing services to customers | ✅ Fully allowed | ❌ Not required | License notice only |
| Modified internal distribution | ✅ Fully allowed | ❌ Not required | License notice only |
| Commercializing as a cloud service | ✅ Fully allowed | ❌ Not required | License notice only |
| Embedding in products | ✅ Fully allowed | ❌ Not required | License notice only |
🏢 Recommended Enterprise Usage
Recommendations:
- Commercial use: completely free, no restrictions
- Modification and redistribution: freely allowed
- Patent protection: the Apache-2.0 patent grant applies
- Enterprise support: both community and commercial support are available

Compliance requirements:
- License notice: include the LICENSE file when distributing
- Copyright notice: retain the original copyright notices
- Change notices: mark modified files as changed, and keep any NOTICE file intact

Bottom line: a private cloud company can use SpiceAI commercially with complete freedom.
🎯 Core Use Cases for a Private Cloud AI Platform
1. Data Federation and Integration Platform
Problem
- Customer data scattered across multiple databases
- Data must be integrated for real-time analytics
- Each system has its own access method and API
- Queries must run without moving the data
SpiceAI Solution
```yaml
# spicepod.yml - data federation configuration
version: v1beta1
kind: Spicepod
metadata:
  name: enterprise-data-federation
datasets:
  # PostgreSQL customer data
  - from: postgresql://user:pass@customer-db:5432/customers
    name: customers
    description: "Customer master data"
    params:
      sql: |
        SELECT customer_id, name, email, created_at, tier
        FROM customers
        WHERE active = true
  # Snowflake analytics data
  - from: snowflake://account.region/warehouse/database/schema
    name: analytics_data
    description: "Customer analytics and behavior data"
    params:
      sql: |
        SELECT customer_id, event_type, event_time, value
        FROM customer_events
        WHERE event_time >= NOW() - INTERVAL '30 days'
  # S3 log data
  - from: s3://company-logs/user-behavior/
    name: user_logs
    description: "User behavior logs from applications"
    params:
      file_format: parquet
  # DynamoDB real-time session data
  - from: dynamodb://us-west-2/user-sessions
    name: active_sessions
    description: "Real-time user session data"
# Data acceleration settings
models:
  - from: duckdb
    name: customer_360_view
    description: "Unified customer 360 view"
    datasets:
      - customers
      - analytics_data
      - user_logs
      - active_sessions
```
Implementation Example
```python
# enterprise_data_platform.py
import asyncio
from datetime import datetime, timedelta

import spicepy


class EnterpriseDataPlatform:
    def __init__(self):
        self.client = spicepy.Client()

    async def get_customer_360_view(self, customer_id: str):
        """Build a unified customer 360 view."""
        query = f"""
            WITH customer_base AS (
                SELECT customer_id, name, email, tier, created_at
                FROM customers
                WHERE customer_id = '{customer_id}'
            ),
            recent_activity AS (
                SELECT
                    customer_id,
                    COUNT(*) as event_count,
                    MAX(event_time) as last_activity,
                    SUM(CASE WHEN event_type = 'purchase' THEN value ELSE 0 END) as total_purchases
                FROM analytics_data
                WHERE customer_id = '{customer_id}'
                GROUP BY customer_id
            ),
            session_info AS (
                SELECT
                    customer_id,
                    COUNT(*) as active_sessions,
                    MAX(last_seen) as last_session
                FROM active_sessions
                WHERE customer_id = '{customer_id}'
                GROUP BY customer_id
            )
            SELECT
                cb.customer_id,
                cb.name,
                cb.email,
                cb.tier,
                cb.created_at,
                COALESCE(ra.event_count, 0) as total_events,
                ra.last_activity,
                COALESCE(ra.total_purchases, 0) as lifetime_value,
                COALESCE(si.active_sessions, 0) as current_sessions,
                si.last_session
            FROM customer_base cb
            LEFT JOIN recent_activity ra ON cb.customer_id = ra.customer_id
            LEFT JOIN session_info si ON cb.customer_id = si.customer_id
        """
        result = await self.client.query(query)
        return result.to_dict()

    async def get_real_time_analytics(self, time_window_hours: int = 1):
        """Data for a real-time analytics dashboard."""
        query = f"""
            SELECT
                DATE_TRUNC('minute', event_time) as time_bucket,
                event_type,
                COUNT(*) as event_count,
                COUNT(DISTINCT customer_id) as unique_users,
                AVG(value) as avg_value,
                SUM(value) as total_value
            FROM analytics_data
            WHERE event_time >= NOW() - INTERVAL '{time_window_hours} hours'
            GROUP BY time_bucket, event_type
            ORDER BY time_bucket DESC, event_count DESC
        """
        return await self.client.query(query)

    async def detect_anomalies(self):
        """Real-time anomaly detection."""
        query = """
            WITH hourly_metrics AS (
                SELECT
                    DATE_TRUNC('hour', event_time) as hour,
                    COUNT(*) as total_events,
                    COUNT(DISTINCT customer_id) as unique_users,
                    AVG(value) as avg_transaction_value
                FROM analytics_data
                WHERE event_time >= NOW() - INTERVAL '24 hours'
                GROUP BY hour
            ),
            stats AS (
                SELECT
                    AVG(total_events) as avg_events,
                    STDDEV(total_events) as stddev_events,
                    AVG(unique_users) as avg_users,
                    STDDEV(unique_users) as stddev_users
                FROM hourly_metrics
                WHERE hour < DATE_TRUNC('hour', NOW())
            )
            SELECT
                hm.hour,
                hm.total_events,
                hm.unique_users,
                hm.avg_transaction_value,
                CASE
                    WHEN ABS(hm.total_events - s.avg_events) > 2 * s.stddev_events
                        THEN 'ANOMALY_EVENTS'
                    WHEN ABS(hm.unique_users - s.avg_users) > 2 * s.stddev_users
                        THEN 'ANOMALY_USERS'
                    ELSE 'NORMAL'
                END as anomaly_status
            FROM hourly_metrics hm
            CROSS JOIN stats s
            WHERE hm.hour = DATE_TRUNC('hour', NOW()) - INTERVAL '1 hour'
        """
        return await self.client.query(query)


# Usage example
async def main():
    platform = EnterpriseDataPlatform()

    # Customer 360 view
    customer_view = await platform.get_customer_360_view("customer_12345")
    print(f"Customer 360 View: {customer_view}")

    # Real-time analytics
    analytics = await platform.get_real_time_analytics(6)  # last 6 hours
    print(f"Real-time Analytics: {analytics}")

    # Anomaly detection
    anomalies = await platform.detect_anomalies()
    print(f"Anomaly Detection: {anomalies}")


if __name__ == "__main__":
    asyncio.run(main())
```
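Note that the queries above interpolate `customer_id` directly into SQL via f-strings. That is fine for a demo, but unsafe with untrusted input. A minimal defensive guard (a hypothetical helper, not part of spicepy) is to validate identifiers against a strict pattern before interpolation; prefer real parameter binding whenever the client library supports it:

```python
import re

# Conservative pattern for IDs like "customer_12345" (an assumption for this
# sketch; tighten or loosen to match your actual ID scheme).
_ID_PATTERN = re.compile(r"[A-Za-z0-9_-]{1,64}")


def safe_identifier(value: str) -> str:
    """Return value unchanged if it looks like a safe ID; raise otherwise."""
    if not _ID_PATTERN.fullmatch(value):
        raise ValueError(f"unsafe identifier: {value!r}")
    return value


# Usage: interpolate only validated values into query strings
customer_id = safe_identifier("customer_12345")
query = f"SELECT * FROM customers WHERE customer_id = '{customer_id}'"
print(query)
```

A value such as `"x' OR '1'='1"` fails validation and raises `ValueError` instead of reaching the query string.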
2. Real-Time ML Inference Platform
Problem
- Many ML models deployed across distributed infrastructure
- Real-time feature collection and processing
- Complex model-serving infrastructure
- A/B testing and model version management
SpiceAI Solution
```yaml
# spicepod.yml - ML inference platform configuration
version: v1beta1
kind: Spicepod
metadata:
  name: ml-inference-platform
datasets:
  # Real-time feature data
  - from: kafka://kafka-cluster:9092/feature-stream
    name: real_time_features
    description: "Real-time feature data from Kafka"
    params:
      topic: feature-stream
      consumer_group: spice-ml-platform
  # Batch feature store
  - from: postgresql://features-db:5432/feature_store
    name: batch_features
    description: "Batch processed features"
  # Model metadata
  - from: postgresql://model-registry:5432/models
    name: model_registry
    description: "ML model registry and metadata"
models:
  # Recommendation model
  - from: huggingface://company/recommendation-model-v2
    name: recommendation_model
    description: "Product recommendation model"
  # Anomaly detection model
  - from: local://models/anomaly-detection.onnx
    name: anomaly_detector
    description: "Transaction anomaly detection"
  # Sentiment analysis model
  - from: openai://gpt-4
    name: sentiment_analyzer
    description: "Customer feedback sentiment analysis"
```
Implementing the Real-Time Inference System
```python
# ml_inference_platform.py
import asyncio
import hashlib
import json
from datetime import datetime
from typing import Any, Dict, List

import spicepy


class MLInferencePlatform:
    def __init__(self):
        self.client = spicepy.Client()
        self.model_cache = {}

    async def get_real_time_features(self, user_id: str) -> Dict[str, Any]:
        """Collect real-time feature data."""
        query = f"""
            WITH recent_activity AS (
                SELECT
                    user_id,
                    AVG(session_duration) as avg_session_duration,
                    COUNT(*) as activity_count,
                    MAX(last_action_time) as last_activity
                FROM real_time_features
                WHERE user_id = '{user_id}'
                  AND timestamp >= NOW() - INTERVAL '1 hour'
                GROUP BY user_id
            ),
            batch_profile AS (
                SELECT
                    user_id,
                    age_group,
                    preferred_category,
                    purchase_frequency,
                    average_order_value
                FROM batch_features
                WHERE user_id = '{user_id}'
            )
            SELECT
                ra.user_id,
                ra.avg_session_duration,
                ra.activity_count,
                ra.last_activity,
                bp.age_group,
                bp.preferred_category,
                bp.purchase_frequency,
                bp.average_order_value
            FROM recent_activity ra
            LEFT JOIN batch_profile bp ON ra.user_id = bp.user_id
        """
        result = await self.client.query(query)
        return result.to_dict() if result else {}

    async def predict_recommendations(self, user_id: str, num_recommendations: int = 5) -> List[Dict]:
        """Predict product recommendations."""
        # Collect feature data
        features = await self.get_real_time_features(user_id)
        if not features:
            return []

        # Run inference through the SpiceAI ML model
        ml_query = f"""
            SELECT
                product_id,
                product_name,
                category,
                predicted_score,
                confidence
            FROM ML.PREDICT(
                MODEL recommendation_model,
                SELECT
                    '{user_id}' as user_id,
                    {features.get('avg_session_duration', 0)} as session_duration,
                    {features.get('activity_count', 0)} as recent_activity,
                    '{features.get('age_group', 'unknown')}' as age_group,
                    '{features.get('preferred_category', 'general')}' as preferred_category,
                    {features.get('purchase_frequency', 0)} as purchase_frequency,
                    {features.get('average_order_value', 0)} as avg_order_value
            )
            ORDER BY predicted_score DESC
            LIMIT {num_recommendations}
        """
        result = await self.client.query(ml_query)
        return result.to_list()

    async def detect_anomalies(self, transaction_data: Dict[str, Any]) -> Dict[str, Any]:
        """Transaction anomaly detection."""
        anomaly_query = f"""
            SELECT
                anomaly_score,
                is_anomaly,
                anomaly_reasons,
                confidence
            FROM ML.PREDICT(
                MODEL anomaly_detector,
                SELECT
                    {transaction_data['amount']} as amount,
                    '{transaction_data['merchant_category']}' as merchant_category,
                    {transaction_data['time_since_last_transaction']} as time_since_last,
                    '{transaction_data['location']}' as location,
                    {transaction_data['user_risk_score']} as user_risk_score
            )
        """
        result = await self.client.query(anomaly_query)
        return result.to_dict()

    async def analyze_sentiment(self, feedback_text: str) -> Dict[str, Any]:
        """Sentiment analysis of customer feedback."""
        sentiment_query = f"""
            SELECT
                sentiment_label,
                sentiment_score,
                key_phrases,
                emotion_categories
            FROM ML.CHAT(
                MODEL sentiment_analyzer,
                'Analyze the sentiment and extract key insights from this customer feedback: "{feedback_text}"'
            )
        """
        result = await self.client.query(sentiment_query)
        return result.to_dict()

    async def run_ab_test(self, user_id: str, experiment_name: str) -> Dict[str, Any]:
        """Select an A/B test model variant and run inference with it."""
        # Look up the experiment configuration
        experiment_query = f"""
            SELECT
                model_variant,
                traffic_allocation,
                experiment_status
            FROM model_registry
            WHERE experiment_name = '{experiment_name}'
              AND experiment_status = 'active'
        """
        experiments = await self.client.query(experiment_query)

        # Assign the user to an experiment bucket. A stable hash is used here
        # because Python's built-in hash() is randomized per process for
        # strings, which would reassign users on every restart.
        user_hash = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
        cumulative_allocation = 0
        for exp in experiments.to_list():
            cumulative_allocation += exp['traffic_allocation']
            if user_hash < cumulative_allocation:
                selected_model = exp['model_variant']
                break
        else:
            selected_model = 'control'

        # Run inference with the selected model
        if selected_model == 'recommendation_model_v2':
            return await self.predict_recommendations(user_id)
        else:
            return await self.predict_recommendations(user_id)  # default model


# Usage example
async def main():
    ml_platform = MLInferencePlatform()

    # Real-time recommendations
    recommendations = await ml_platform.predict_recommendations("user_67890", 10)
    print(f"Recommendations: {recommendations}")

    # Anomaly detection
    transaction = {
        'amount': 5000,
        'merchant_category': 'electronics',
        'time_since_last_transaction': 120,  # minutes
        'location': 'unusual_location',
        'user_risk_score': 0.7
    }
    anomaly_result = await ml_platform.detect_anomalies(transaction)
    print(f"Anomaly Detection: {anomaly_result}")

    # Sentiment analysis
    sentiment = await ml_platform.analyze_sentiment(
        "The product quality is excellent but delivery was delayed."
    )
    print(f"Sentiment Analysis: {sentiment}")


if __name__ == "__main__":
    asyncio.run(main())
```
3. Enterprise RAG System
Problem
- Documents and knowledge bases spread across systems
- Unstructured data in many different formats
- Context-aware, accurate information retrieval
- Real-time knowledge updates and vector indexing
SpiceAI RAG System Configuration
```yaml
# spicepod.yml - enterprise RAG configuration
version: v1beta1
kind: Spicepod
metadata:
  name: enterprise-rag-system
datasets:
  # Document store
  - from: s3://company-docs/knowledge-base/
    name: knowledge_documents
    description: "Enterprise knowledge base documents"
    params:
      file_format: pdf
  # Wiki / Confluence
  - from: https://company.atlassian.net/wiki/api/v2/
    name: confluence_docs
    description: "Confluence documentation"
  # Email archive
  - from: postgresql://email-archive:5432/emails
    name: email_archive
    description: "Corporate email archive"
  # Real-time chat / Slack
  - from: slack://workspace/channels
    name: slack_messages
    description: "Slack conversation history"
# Vector search and embedding settings
models:
  - from: huggingface://sentence-transformers/all-MiniLM-L6-v2
    name: embedding_model
    description: "Document embedding model"
  - from: openai://gpt-4
    name: generation_model
    description: "Answer generation model"
  - from: chromadb://localhost:8000/enterprise-vectors
    name: vector_store
    description: "Vector database for semantic search"
```
RAG System Implementation
```python
# enterprise_rag_system.py
import asyncio
import json
from datetime import datetime
from typing import Any, Dict, List

import spicepy


class EnterpriseRAGSystem:
    def __init__(self):
        self.client = spicepy.Client()

    async def index_documents(self, batch_size: int = 100):
        """Vectorize and index documents."""
        # Fetch new or updated documents
        new_docs_query = """
            SELECT
                doc_id,
                title,
                content,
                document_type,
                last_modified,
                department,
                access_level
            FROM knowledge_documents
            WHERE last_indexed IS NULL
               OR last_modified > last_indexed
            ORDER BY last_modified DESC
            LIMIT {batch_size}
        """.format(batch_size=batch_size)
        docs = await self.client.query(new_docs_query)

        for doc in docs.to_list():
            # Generate the document embedding
            embedding_query = f"""
                SELECT
                    doc_id,
                    ML.EMBED(
                        MODEL embedding_model,
                        CONCAT(title, ' ', content)
                    ) as embedding
                FROM (
                    SELECT
                        '{doc['doc_id']}' as doc_id,
                        '{doc['title']}' as title,
                        '{doc['content'][:2000]}' as content
                ) AS doc_data
            """
            embedding_result = await self.client.query(embedding_query)

            # Store in the vector store
            if embedding_result:
                await self.store_vector(doc, embedding_result.to_dict()['embedding'])

        print(f"Indexed {len(docs.to_list())} documents")

    async def store_vector(self, document: Dict, embedding: List[float]):
        """Store a document embedding in the vector store."""
        metadata = json.dumps({
            'document_type': document['document_type'],
            'department': document['department'],
            'access_level': document['access_level'],
            'last_modified': document['last_modified'].isoformat()
        })
        store_query = f"""
            INSERT INTO vector_store (
                doc_id,
                title,
                content_preview,
                embedding,
                metadata
            ) VALUES (
                '{document['doc_id']}',
                '{document['title']}',
                '{document['content'][:200]}...',
                {embedding},
                '{metadata}'
            )
            ON CONFLICT (doc_id)
            DO UPDATE SET
                embedding = EXCLUDED.embedding,
                metadata = EXCLUDED.metadata,
                updated_at = NOW()
        """
        await self.client.query(store_query)

    async def semantic_search(self, query: str, user_access_level: str, top_k: int = 5) -> List[Dict]:
        """Run semantic search."""
        # Generate the query embedding
        query_embedding_sql = f"""
            SELECT ML.EMBED(
                MODEL embedding_model,
                '{query}'
            ) as query_embedding
        """
        query_embedding_result = await self.client.query(query_embedding_sql)
        query_embedding = query_embedding_result.to_dict()['query_embedding']

        # Similarity search
        search_query = f"""
            WITH similarity_scores AS (
                SELECT
                    doc_id,
                    title,
                    content_preview,
                    metadata,
                    COSINE_SIMILARITY(embedding, {query_embedding}) as similarity_score
                FROM vector_store
                WHERE JSON_EXTRACT(metadata, '$.access_level') <= '{user_access_level}'
            )
            SELECT
                doc_id,
                title,
                content_preview,
                similarity_score,
                metadata
            FROM similarity_scores
            WHERE similarity_score > 0.7
            ORDER BY similarity_score DESC
            LIMIT {top_k}
        """
        results = await self.client.query(search_query)
        return results.to_list()

    async def generate_answer(self, query: str, context_docs: List[Dict], user_id: str) -> Dict[str, Any]:
        """Generate an answer grounded in the retrieved context."""
        # Assemble the context
        context = "\n\n".join([
            f"Document: {doc['title']}\nContent: {doc['content_preview']}"
            for doc in context_docs
        ])

        # Generate the RAG answer
        generation_query = f"""
            SELECT
                response,
                confidence,
                sources_used
            FROM ML.CHAT(
                MODEL generation_model,
                'Context information:
                {context}

                Based on the above context, please answer the following question accurately and cite your sources:
                Question: {query}

                Please provide:
                1. A comprehensive answer based on the context
                2. Source citations
                3. If the information is not available in the context, please state so clearly'
            )
        """
        answer_result = await self.client.query(generation_query)
        answer_data = answer_result.to_dict()

        # Log the interaction
        await self.log_interaction(user_id, query, answer_data, context_docs)

        return {
            'answer': answer_data.get('response', ''),
            'confidence': answer_data.get('confidence', 0),
            'sources': [doc['doc_id'] for doc in context_docs],
            'timestamp': datetime.now().isoformat()
        }

    async def log_interaction(self, user_id: str, query: str, answer: Dict, sources: List[Dict]):
        """Log RAG interactions for later analysis."""
        log_query = f"""
            INSERT INTO rag_interactions (
                user_id,
                query,
                answer,
                sources_count,
                confidence_score,
                timestamp
            ) VALUES (
                '{user_id}',
                '{query}',
                '{answer.get("response", "")}',
                {len(sources)},
                {answer.get("confidence", 0)},
                NOW()
            )
        """
        await self.client.query(log_query)

    async def get_rag_analytics(self, days: int = 7) -> Dict[str, Any]:
        """RAG system analytics and performance metrics."""
        analytics_query = f"""
            WITH daily_stats AS (
                SELECT
                    DATE(timestamp) as date,
                    COUNT(*) as total_queries,
                    AVG(confidence_score) as avg_confidence,
                    COUNT(CASE WHEN confidence_score > 0.8 THEN 1 END) as high_confidence_queries,
                    COUNT(DISTINCT user_id) as unique_users
                FROM rag_interactions
                WHERE timestamp >= NOW() - INTERVAL '{days} days'
                GROUP BY DATE(timestamp)
            ),
            popular_topics AS (
                SELECT
                    query,
                    COUNT(*) as frequency
                FROM rag_interactions
                WHERE timestamp >= NOW() - INTERVAL '{days} days'
                GROUP BY query
                ORDER BY frequency DESC
                LIMIT 10
            )
            SELECT
                ds.date,
                ds.total_queries,
                ds.avg_confidence,
                ds.high_confidence_queries,
                ds.unique_users,
                pt.query as popular_query,
                pt.frequency
            FROM daily_stats ds
            CROSS JOIN popular_topics pt
            ORDER BY ds.date DESC, pt.frequency DESC
        """
        result = await self.client.query(analytics_query)
        return result.to_dict()

    async def ask_question(self, query: str, user_id: str, user_access_level: str = 'public') -> Dict[str, Any]:
        """Run the full RAG flow."""
        # 1. Find relevant documents via semantic search
        relevant_docs = await self.semantic_search(query, user_access_level, top_k=5)

        if not relevant_docs:
            return {
                'answer': 'I could not find relevant information to answer your question.',
                'confidence': 0,
                'sources': [],
                'timestamp': datetime.now().isoformat()
            }

        # 2. Generate a context-grounded answer
        answer = await self.generate_answer(query, relevant_docs, user_id)
        return answer


# Usage example
async def main():
    rag_system = EnterpriseRAGSystem()

    # Index documents (run periodically)
    await rag_system.index_documents(50)

    # Question answering
    answer = await rag_system.ask_question(
        query="What is our company's remote work policy?",
        user_id="employee_123",
        user_access_level="internal"
    )
    print(f"RAG Answer: {answer}")

    # Analytics
    analytics = await rag_system.get_rag_analytics(30)  # last 30 days
    print(f"RAG Analytics: {analytics}")


if __name__ == "__main__":
    asyncio.run(main())
```
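The semantic search above delegates similarity scoring to a `COSINE_SIMILARITY` SQL function. For intuition, here is what that computation looks like in plain Python (a standalone sketch, independent of SpiceAI):

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two embedding vectors: 1.0 means the
    same direction (highly similar), 0.0 means orthogonal (unrelated)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# A query embedding is compared against every stored document embedding,
# and documents above a threshold (0.7 in the example) are returned.
print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # identical → 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # orthogonal → 0.0
```

In production the vector database performs this scoring with an approximate nearest-neighbor index rather than a full scan, but the similarity measure is the same.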
4. AI Agent Backend Platform
Problem
- Automating complex business processes
- Multi-step decision-making systems
- Taking actions driven by real-time data
- Inter-agent collaboration and workflow management
Agent System Implementation
```python
# ai_agent_platform.py
import asyncio
import json
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional

import spicepy


class AgentType(Enum):
    CUSTOMER_SERVICE = "customer_service"
    SALES_ASSISTANT = "sales_assistant"
    OPERATIONS_MANAGER = "operations_manager"
    SECURITY_MONITOR = "security_monitor"


class AIAgentPlatform:
    def __init__(self):
        self.client = spicepy.Client()
        self.active_agents = {}

    async def initialize_agent_workspace(self):
        """Initialize the agent workspace."""
        workspace_setup = """
            -- Agent state table
            CREATE TABLE IF NOT EXISTS agent_states (
                agent_id VARCHAR PRIMARY KEY,
                agent_type VARCHAR,
                status VARCHAR,
                current_task TEXT,
                context JSON,
                last_activity TIMESTAMP,
                performance_metrics JSON
            );

            -- Inter-agent message queue
            CREATE TABLE IF NOT EXISTS agent_messages (
                message_id UUID PRIMARY KEY,
                from_agent VARCHAR,
                to_agent VARCHAR,
                message_type VARCHAR,
                payload JSON,
                status VARCHAR DEFAULT 'pending',
                created_at TIMESTAMP DEFAULT NOW(),
                processed_at TIMESTAMP
            );

            -- Task management
            CREATE TABLE IF NOT EXISTS agent_tasks (
                task_id UUID PRIMARY KEY,
                assigned_agent VARCHAR,
                task_type VARCHAR,
                priority INTEGER,
                status VARCHAR DEFAULT 'pending',
                input_data JSON,
                output_data JSON,
                created_at TIMESTAMP DEFAULT NOW(),
                completed_at TIMESTAMP
            );
        """
        await self.client.query(workspace_setup)

    async def deploy_customer_service_agent(self, agent_id: str):
        """Deploy a customer service agent."""
        agent_config = {
            'agent_id': agent_id,
            'agent_type': AgentType.CUSTOMER_SERVICE.value,
            'capabilities': [
                'handle_customer_inquiries',
                'escalate_complex_issues',
                'update_customer_records',
                'generate_support_tickets'
            ],
            'data_sources': [
                'customer_database',
                'support_knowledge_base',
                'previous_interactions'
            ]
        }

        # Register the agent
        register_query = f"""
            INSERT INTO agent_states (
                agent_id, agent_type, status, context, last_activity
            ) VALUES (
                '{agent_id}',
                '{AgentType.CUSTOMER_SERVICE.value}',
                'active',
                '{json.dumps(agent_config)}',
                NOW()
            )
            ON CONFLICT (agent_id) DO UPDATE SET
                status = 'active',
                context = EXCLUDED.context,
                last_activity = NOW()
        """
        await self.client.query(register_query)

        # Configure the agent workflow
        workflow_query = f"""
            WITH agent_workflow AS (
                SELECT
                    '{agent_id}' as agent_id,
                    'customer_inquiry' as trigger_event,
                    JSON_BUILD_OBJECT(
                        'steps', JSON_BUILD_ARRAY(
                            JSON_BUILD_OBJECT('action', 'analyze_inquiry', 'timeout', 30),
                            JSON_BUILD_OBJECT('action', 'search_knowledge_base', 'timeout', 60),
                            JSON_BUILD_OBJECT('action', 'generate_response', 'timeout', 45),
                            JSON_BUILD_OBJECT('action', 'update_customer_record', 'timeout', 15)
                        )
                    ) as workflow_config
            )
            SELECT agent_id, workflow_config FROM agent_workflow
        """
        return await self.client.query(workflow_query)

    async def handle_customer_inquiry(self, agent_id: str, customer_id: str, inquiry: str) -> Dict[str, Any]:
        """Handle a customer inquiry."""
        # 1. Collect customer context
        context_query = f"""
            WITH customer_profile AS (
                SELECT
                    customer_id,
                    name,
                    tier,
                    total_purchases,
                    last_interaction
                FROM customers
                WHERE customer_id = '{customer_id}'
            ),
            recent_interactions AS (
                SELECT
                    interaction_type,
                    description,
                    resolution_status,
                    timestamp
                FROM customer_interactions
                WHERE customer_id = '{customer_id}'
                  AND timestamp >= NOW() - INTERVAL '30 days'
                ORDER BY timestamp DESC
                LIMIT 5
            )
            SELECT
                cp.*,
                JSON_AGG(ri.*) as recent_interactions
            FROM customer_profile cp
            LEFT JOIN recent_interactions ri ON TRUE
            GROUP BY cp.customer_id, cp.name, cp.tier, cp.total_purchases, cp.last_interaction
        """
        customer_context = await self.client.query(context_query)

        # 2. Search the knowledge base
        knowledge_search = f"""
            SELECT
                solution_id,
                problem_description,
                solution_steps,
                success_rate,
                category
            FROM support_knowledge_base
            WHERE SIMILARITY(problem_description, '{inquiry}') > 0.7
            ORDER BY success_rate DESC, SIMILARITY(problem_description, '{inquiry}') DESC
            LIMIT 3
        """
        knowledge_results = await self.client.query(knowledge_search)

        # 3. Generate an AI-based response
        response_generation = f"""
            SELECT
                response,
                confidence,
                escalation_needed,
                follow_up_actions
            FROM ML.CHAT(
                MODEL customer_service_model,
                'Customer Context: {customer_context.to_dict() if customer_context else "{}"}
                Knowledge Base Results: {knowledge_results.to_list() if knowledge_results else []}
                Customer Inquiry: {inquiry}

                Please provide:
                1. A helpful and empathetic response
                2. Confidence level (0-1)
                3. Whether escalation is needed
                4. Any follow-up actions required

                Format as JSON with keys: response, confidence, escalation_needed, follow_up_actions'
            )
        """
        ai_response = await self.client.query(response_generation)
        response_data = json.loads(ai_response.to_dict().get('response', '{}'))

        # 4. Log the interaction
        interaction_log = f"""
            INSERT INTO customer_interactions (
                customer_id,
                agent_id,
                interaction_type,
                inquiry,
                response,
                confidence_score,
                escalation_needed,
                timestamp
            ) VALUES (
                '{customer_id}',
                '{agent_id}',
                'chat_inquiry',
                '{inquiry}',
                '{response_data.get("response", "")}',
                {response_data.get("confidence", 0)},
                {response_data.get("escalation_needed", False)},
                NOW()
            )
        """
        await self.client.query(interaction_log)

        # 5. Escalate to a human agent if needed
        if response_data.get('escalation_needed', False):
            await self.escalate_to_human_agent(customer_id, inquiry, response_data)

        return {
            'response': response_data.get('response', ''),
            'confidence': response_data.get('confidence', 0),
            'escalation_needed': response_data.get('escalation_needed', False),
            'agent_id': agent_id,
            'timestamp': datetime.now().isoformat()
        }

    async def escalate_to_human_agent(self, customer_id: str, inquiry: str, response_data: Dict[str, Any]):
        """Queue an inquiry for a human agent (minimal stub; the original
        code called this method without defining it)."""
        escalation_payload = json.dumps({
            'customer_id': customer_id,
            'inquiry': inquiry,
            'ai_confidence': response_data.get('confidence', 0)
        })
        await self.client.query(f"""
            INSERT INTO agent_messages (
                from_agent, to_agent, message_type, payload
            ) VALUES (
                'cs_agent', 'human_support', 'escalation', '{escalation_payload}'
            )
        """)

    async def deploy_sales_assistant_agent(self, agent_id: str):
        """Deploy a sales assistant agent."""
        sales_agent_config = {
            'agent_id': agent_id,
            'agent_type': AgentType.SALES_ASSISTANT.value,
            'capabilities': [
                'lead_qualification',
                'product_recommendations',
                'pricing_analysis',
                'proposal_generation',
                'follow_up_scheduling'
            ]
        }

        # Register the sales agent and configure its workflow
        register_query = f"""
            INSERT INTO agent_states (
                agent_id, agent_type, status, context, last_activity
            ) VALUES (
                '{agent_id}',
                '{AgentType.SALES_ASSISTANT.value}',
                'active',
                '{json.dumps(sales_agent_config)}',
                NOW()
            )
        """
        await self.client.query(register_query)

    async def qualify_lead(self, agent_id: str, lead_data: Dict[str, Any]) -> Dict[str, Any]:
        """Qualify a lead."""
        # Lead scoring
        scoring_query = f"""
            WITH lead_analysis AS (
                SELECT
                    '{lead_data.get("company_size", "unknown")}' as company_size,
                    '{lead_data.get("industry", "unknown")}' as industry,
                    '{lead_data.get("budget_range", "unknown")}' as budget_range,
                    '{lead_data.get("urgency", "unknown")}' as urgency,
                    '{lead_data.get("decision_maker", "unknown")}' as decision_maker
            ),
            scoring AS (
                SELECT
                    CASE
                        WHEN company_size IN ('enterprise', 'large') THEN 30
                        WHEN company_size = 'medium' THEN 20
                        WHEN company_size = 'small' THEN 10
                        ELSE 5
                    END as size_score,
                    CASE
                        WHEN industry IN ('technology', 'finance', 'healthcare') THEN 25
                        WHEN industry IN ('retail', 'manufacturing') THEN 15
                        ELSE 10
                    END as industry_score,
                    CASE
                        WHEN budget_range = 'high' THEN 25
                        WHEN budget_range = 'medium' THEN 15
                        WHEN budget_range = 'low' THEN 5
                        ELSE 0
                    END as budget_score,
                    CASE
                        WHEN urgency = 'immediate' THEN 20
                        WHEN urgency = 'soon' THEN 15
                        WHEN urgency = 'future' THEN 5
                        ELSE 0
                    END as urgency_score
                FROM lead_analysis
            )
            SELECT
                size_score + industry_score + budget_score + urgency_score as total_score,
                CASE
                    WHEN size_score + industry_score + budget_score + urgency_score >= 80 THEN 'hot'
                    WHEN size_score + industry_score + budget_score + urgency_score >= 60 THEN 'warm'
                    WHEN size_score + industry_score + budget_score + urgency_score >= 40 THEN 'cold'
                    ELSE 'unqualified'
                END as lead_status
            FROM scoring
        """
        score_result = await self.client.query(scoring_query)

        # AI-based lead analysis
        analysis_query = f"""
            SELECT
                analysis,
                recommendations,
                next_actions
            FROM ML.CHAT(
                MODEL sales_assistant_model,
                'Lead Information: {json.dumps(lead_data)}
                Lead Score: {score_result.to_dict() if score_result else {}}

                Please provide:
                1. Detailed lead analysis
                2. Specific recommendations for engagement
                3. Next action steps
                4. Estimated conversion probability

                Format as JSON with keys: analysis, recommendations, next_actions, conversion_probability'
            )
        """
        ai_analysis = await self.client.query(analysis_query)
        analysis_data = json.loads(ai_analysis.to_dict().get('analysis', '{}'))

        return {
            'lead_score': score_result.to_dict() if score_result else {},
            'ai_analysis': analysis_data,
            'agent_id': agent_id,
            'timestamp': datetime.now().isoformat()
        }

    async def deploy_operations_manager_agent(self, agent_id: str):
        """Deploy an operations manager agent."""
        ops_config = {
            'agent_id': agent_id,
            'agent_type': AgentType.OPERATIONS_MANAGER.value,
            'capabilities': [
                'resource_optimization',
                'anomaly_detection',
                'capacity_planning',
                'incident_response',
                'performance_monitoring'
            ]
        }

        # Register the operations agent
        await self.client.query(f"""
            INSERT INTO agent_states (
                agent_id, agent_type, status, context, last_activity
            ) VALUES (
                '{agent_id}',
                '{AgentType.OPERATIONS_MANAGER.value}',
                'active',
                '{json.dumps(ops_config)}',
                NOW()
            )
        """)

    async def monitor_system_health(self, agent_id: str) -> Dict[str, Any]:
        """Monitor system health and run optimizations."""
        # Collect system metrics
        metrics_query = """
            WITH system_metrics AS (
                SELECT
                    service_name,
                    AVG(cpu_usage) as avg_cpu,
                    AVG(memory_usage) as avg_memory,
                    COUNT(CASE WHEN status = 'error' THEN 1 END) as error_count,
                    COUNT(*) as total_requests,
                    AVG(response_time) as avg_response_time
                FROM system_metrics
                WHERE timestamp >= NOW() - INTERVAL '1 hour'
                GROUP BY service_name
            ),
            anomalies AS (
                SELECT
                    service_name,
                    CASE
                        WHEN avg_cpu > 80 THEN 'HIGH_CPU'
                        WHEN avg_memory > 85 THEN 'HIGH_MEMORY'
                        WHEN avg_response_time > 5000 THEN 'SLOW_RESPONSE'
                        WHEN error_count > total_requests * 0.05 THEN 'HIGH_ERROR_RATE'
                        ELSE 'NORMAL'
                    END as anomaly_type
                FROM system_metrics
            )
            SELECT
                sm.*,
                a.anomaly_type
            FROM system_metrics sm
            JOIN anomalies a ON sm.service_name = a.service_name
            WHERE a.anomaly_type != 'NORMAL'
        """
        metrics_result = await self.client.query(metrics_query)

        # AI-based operations optimization suggestions
        optimization_query = f"""
            SELECT
                optimization_actions,
                risk_assessment,
                resource_recommendations
            FROM ML.CHAT(
                MODEL operations_model,
                'System Metrics: {metrics_result.to_list() if metrics_result else []}

                Based on the system metrics, please provide:
                1. Immediate optimization actions needed
                2. Risk assessment for current anomalies
                3. Resource scaling recommendations
                4. Preventive measures

                Format as JSON with keys: optimization_actions, risk_assessment, resource_recommendations'
            )
        """
        optimization_result = await self.client.query(optimization_query)
        optimization_data = json.loads(optimization_result.to_dict().get('optimization_actions', '{}'))

        # Execute automatic optimization actions
        if optimization_data.get('immediate_actions'):
            await self.execute_optimization_actions(optimization_data['immediate_actions'])

        return {
            'system_status': metrics_result.to_list() if metrics_result else [],
            'optimizations': optimization_data,
            'agent_id': agent_id,
            'timestamp': datetime.now().isoformat()
        }

    async def execute_optimization_actions(self, actions: List[str]):
        """Automatically execute optimization actions."""
        for action in actions:
            action_query = f"""
                INSERT INTO optimization_actions (
                    action_type,
                    description,
                    status,
                    executed_at
                ) VALUES (
                    'auto_optimization',
                    '{action}',
                    'executed',
                    NOW()
                )
            """
            await self.client.query(action_query)

    async def agent_collaboration_workflow(self, task_data: Dict[str, Any]) -> Dict[str, Any]:
        """Inter-agent collaboration workflow."""
        # Assign an agent based on task type
        task_assignment_query = f"""
            WITH task_requirements AS (
                SELECT
                    '{task_data.get("type", "")}' as task_type,
                    '{task_data.get("priority", "medium")}' as priority,
                    '{json.dumps(task_data)}' as task_data
            ),
            agent_capabilities AS (
                SELECT
                    agent_id,
                    agent_type,
                    JSON_EXTRACT(context, '$.capabilities') as capabilities
                FROM agent_states
                WHERE status = 'active'
            )
            SELECT
                ac.agent_id,
                ac.agent_type,
                tr.task_type,
                tr.priority
            FROM task_requirements tr
            CROSS JOIN agent_capabilities ac
            WHERE JSON_SEARCH(ac.capabilities, 'one', tr.task_type) IS NOT NULL
            ORDER BY
                CASE tr.priority
                    WHEN 'high' THEN 1
                    WHEN 'medium' THEN 2
                    WHEN 'low' THEN 3
                END
            LIMIT 1
        """
        assignment_result = await self.client.query(task_assignment_query)

        if assignment_result:
            assigned_agent = assignment_result.to_dict()

            # Assign and record the task
            task_execution_query = f"""
                INSERT INTO agent_tasks (
                    task_id,
                    assigned_agent,
                    task_type,
                    priority,
                    input_data,
                    status,
                    created_at
                ) VALUES (
                    '{task_data.get("task_id", "")}',
                    '{assigned_agent["agent_id"]}',
                    '{task_data.get("type", "")}',
                    CASE '{task_data.get("priority", "medium")}'
                        WHEN 'high' THEN 1
                        WHEN 'medium' THEN 2
                        WHEN 'low' THEN 3
                    END,
                    '{json.dumps(task_data)}',
                    'assigned',
                    NOW()
                )
            """
            await self.client.query(task_execution_query)

            return {
                'assigned_agent': assigned_agent,
                'task_status': 'assigned',
                'estimated_completion': datetime.now() + timedelta(minutes=30)
            }

        return {'error': 'No suitable agent available'}


# Usage example
async def main():
    agent_platform = AIAgentPlatform()

    # Initialize the agent platform
    await agent_platform.initialize_agent_workspace()

    # Deploy a customer service agent
    await agent_platform.deploy_customer_service_agent("cs_agent_001")

    # Handle a customer inquiry
    customer_response = await agent_platform.handle_customer_inquiry(
        agent_id="cs_agent_001",
        customer_id="customer_12345",
        inquiry="I'm having trouble with my recent order delivery."
    )
    print(f"Customer Service Response: {customer_response}")

    # Deploy a sales agent and qualify a lead
    await agent_platform.deploy_sales_assistant_agent("sales_agent_001")
    lead_qualification = await agent_platform.qualify_lead(
        agent_id="sales_agent_001",
        lead_data={
            "company_size": "enterprise",
            "industry": "technology",
            "budget_range": "high",
            "urgency": "immediate",
            "decision_maker": "yes"
        }
    )
    print(f"Lead Qualification: {lead_qualification}")

    # Deploy an operations manager agent and monitor system health
    await agent_platform.deploy_operations_manager_agent("ops_agent_001")
    system_health = await agent_platform.monitor_system_health("ops_agent_001")
    print(f"System Health: {system_health}")


if __name__ == "__main__":
    asyncio.run(main())
```
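The rule-based scoring inside `qualify_lead` can be sketched equivalently in plain Python, which also makes the thresholds easy to unit-test (the names here mirror the SQL CASE expressions, not any SpiceAI API):

```python
def score_lead(lead: dict) -> dict:
    """Rule-based lead scoring mirroring the SQL CASE expressions."""
    size_scores = {'enterprise': 30, 'large': 30, 'medium': 20, 'small': 10}
    industry_scores = {'technology': 25, 'finance': 25, 'healthcare': 25,
                       'retail': 15, 'manufacturing': 15}
    budget_scores = {'high': 25, 'medium': 15, 'low': 5}
    urgency_scores = {'immediate': 20, 'soon': 15, 'future': 5}

    # Unknown values fall back to the SQL ELSE branches
    total = (size_scores.get(lead.get('company_size'), 5)
             + industry_scores.get(lead.get('industry'), 10)
             + budget_scores.get(lead.get('budget_range'), 0)
             + urgency_scores.get(lead.get('urgency'), 0))

    if total >= 80:
        status = 'hot'
    elif total >= 60:
        status = 'warm'
    elif total >= 40:
        status = 'cold'
    else:
        status = 'unqualified'
    return {'total_score': total, 'lead_status': status}


print(score_lead({'company_size': 'enterprise', 'industry': 'technology',
                  'budget_range': 'high', 'urgency': 'immediate'}))
# → {'total_score': 100, 'lead_status': 'hot'}
```

Keeping the scoring rules in one testable function (or one SQL view) avoids the thresholds drifting between the database and application layers.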
🚀 Installation and Deployment Guide
Local Development Environment
1. Quick Start with Docker
# Run the SpiceAI Docker image
docker run -p 3000:3000 -p 50051:50051 \
-v $(pwd)/spicepod.yml:/app/spicepod.yml \
spiceai/spiceai:latest
# Check server health
curl http://localhost:3000/health
# Test a SQL query
curl -X POST http://localhost:3000/v1/sql \
-H "Content-Type: application/json" \
-d '{"sql": "SELECT 1 as test"}'
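The curl calls above can be wrapped in a small Python client. This sketch mirrors the request shape shown in this guide (a JSON body with a `sql` key) and assumes a local instance on port 3000; check the SpiceAI HTTP API docs for your version before relying on it.

```python
import json
import urllib.request

# Local instance started by the docker run command above (assumption)
SPICE_URL = "http://localhost:3000"

def build_sql_request(sql: str):
    """Return (url, headers, body) matching the curl call above."""
    url = f"{SPICE_URL}/v1/sql"
    headers = {"Content-Type": "application/json"}
    body = json.dumps({"sql": sql}).encode("utf-8")
    return url, headers, body

def run_query(sql: str):
    """Execute a query against a running SpiceAI instance.
    Requires the container above to be up; raises on connection failure."""
    url, headers, body = build_sql_request(sql)
    req = urllib.request.Request(url, data=body, headers=headers, method="POST")
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```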
2. Local Binary Install (macOS)
# Install the SpiceAI CLI
curl https://install.spiceai.org | /bin/bash
# Initialize a project
mkdir my-spice-project
cd my-spice-project
spice init
# Start the development server
spice run
Private Cloud Kubernetes Deployment
1. Deploying with Helm
# Add the SpiceAI Helm repository
helm repo add spiceai https://helm.spiceai.org
helm repo update
# Create a namespace
kubectl create namespace spiceai-platform
# Create a custom values.yaml
cat > values.yaml << 'EOF'
# Production settings
replicaCount: 3
image:
repository: spiceai/spiceai
tag: "1.4.0"
pullPolicy: IfNotPresent
# Resource limits
resources:
limits:
cpu: "2"
memory: "4Gi"
requests:
cpu: "1"
memory: "2Gi"
# Persistent storage settings
stateful:
enabled: true
storageClass: "fast-ssd"
size: "100Gi"
mountPath: "/data"
# Service settings
service:
type: LoadBalancer
annotations:
service.beta.kubernetes.io/aws-load-balancer-internal: "true"
service.beta.kubernetes.io/aws-load-balancer-type: "nlb"
# Enable monitoring
monitoring:
podMonitor:
enabled: true
additionalLabels:
release: prometheus
# Environment variables
additionalEnv:
- name: SPICED_LOG
value: "INFO"
- name: SPICE_SECRET_DATABASE_URL
valueFrom:
secretKeyRef:
name: spice-secrets
key: database-url
- name: SPICE_SECRET_OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: spice-secrets
key: openai-api-key
# SpicePod configuration
spicepod:
name: enterprise-ai-platform
version: v1
kind: Spicepod
datasets:
- from: postgresql://postgres-primary:5432/enterprise_data
name: enterprise_data
description: "Main enterprise database"
acceleration:
enabled: true
engine: duckdb
mode: file
params:
duckdb_file: "/data/enterprise_data.db"
- from: s3://company-datalake/analytics/
name: analytics_data
description: "Analytics data lake"
params:
file_format: parquet
acceleration:
enabled: true
engine: arrow
models:
- from: openai://gpt-4
name: enterprise_llm
description: "Enterprise LLM for various AI tasks"
- from: huggingface://sentence-transformers/all-MiniLM-L6-v2
name: embedding_model
description: "Text embedding model"
EOF
# Create secrets
kubectl create secret generic spice-secrets \
--from-literal=database-url="postgresql://user:password@postgres:5432/db" \
--from-literal=openai-api-key="your-openai-api-key" \
--namespace spiceai-platform
# Deploy
helm upgrade --install spiceai-platform spiceai/spiceai \
--namespace spiceai-platform \
--values values.yaml
# Check deployment status
kubectl get pods -n spiceai-platform
kubectl get svc -n spiceai-platform
2. High Availability Configuration
# ha-spiceai-values.yaml
replicaCount: 5
# Pod Disruption Budget
podDisruptionBudget:
enabled: true
minAvailable: 2
# Anti-affinity for high availability
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- spiceai
topologyKey: kubernetes.io/hostname
# Horizontal Pod Autoscaler
autoscaling:
enabled: true
minReplicas: 3
maxReplicas: 10
targetCPUUtilizationPercentage: 70
targetMemoryUtilizationPercentage: 80
# Load balancer settings
service:
type: LoadBalancer
annotations:
service.beta.kubernetes.io/aws-load-balancer-backend-protocol: "tcp"
service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled: "true"
service.beta.kubernetes.io/aws-load-balancer-type: "nlb"
service.beta.kubernetes.io/aws-load-balancer-internal: "true"
# Node placement (dedicated instance types and tolerations)
nodeSelector:
node.kubernetes.io/instance-type: "c5.2xlarge"
tolerations:
- key: "dedicated"
operator: "Equal"
value: "spiceai"
effect: "NoSchedule"
3. Monitoring and Observability
# monitoring-setup.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: spiceai-grafana-dashboard
namespace: spiceai-platform
data:
dashboard.json: |
{
"dashboard": {
"id": null,
"title": "SpiceAI Enterprise Platform",
"panels": [
{
"title": "Query Performance",
"type": "graph",
"targets": [
{
"expr": "spiceai_query_duration_seconds",
"legendFormat": "Query Duration"
}
]
},
{
"title": "Memory Usage",
"type": "graph",
"targets": [
{
"expr": "spiceai_memory_usage_bytes",
"legendFormat": "Memory Usage"
}
]
},
{
"title": "Active Connections",
"type": "singlestat",
"targets": [
{
"expr": "spiceai_active_connections",
"legendFormat": "Connections"
}
]
}
]
}
}
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: spiceai-metrics
namespace: spiceai-platform
spec:
selector:
matchLabels:
app.kubernetes.io/name: spiceai
endpoints:
- port: metrics
interval: 30s
path: /metrics
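The dashboard above references metric names like `spiceai_query_duration_seconds`; treat those as examples, as exact names can differ by version. Either way, the `/metrics` endpoint serves the standard Prometheus text exposition format, which a quick script can parse. A minimal sketch (ignores histograms, labels, and timestamps):

```python
def parse_prometheus_text(text: str) -> dict:
    """Parse simple 'name value' lines from Prometheus text format.
    Minimal sketch: skips comments and lines it cannot parse."""
    metrics = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip HELP/TYPE comments and blank lines
        parts = line.split()
        if len(parts) < 2:
            continue
        try:
            metrics[parts[0]] = float(parts[1])
        except ValueError:
            continue
    return metrics

# Sample output in the shape /metrics returns (metric names assumed)
sample = """
# HELP spiceai_active_connections Current connections
# TYPE spiceai_active_connections gauge
spiceai_active_connections 12
spiceai_memory_usage_bytes 1.5e9
"""
```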
Security Configuration
1. Network Policies
# network-policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: spiceai-network-policy
namespace: spiceai-platform
spec:
podSelector:
matchLabels:
app.kubernetes.io/name: spiceai
policyTypes:
- Ingress
- Egress
ingress:
- from:
- namespaceSelector:
matchLabels:
name: ai-applications
- podSelector:
matchLabels:
app: allowed-client
ports:
- protocol: TCP
port: 3000
- protocol: TCP
port: 50051
egress:
- to:
- namespaceSelector:
matchLabels:
name: databases
ports:
- protocol: TCP
port: 5432
- to: []
ports:
- protocol: TCP
port: 443 # HTTPS
- protocol: TCP
port: 53 # DNS
- protocol: UDP
port: 53 # DNS
2. RBAC Configuration
# rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: spiceai-service-account
namespace: spiceai-platform
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: spiceai-cluster-role
rules:
- apiGroups: [""]
resources: ["pods", "services", "endpoints"]
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources: ["deployments", "replicasets"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: spiceai-cluster-role-binding
subjects:
- kind: ServiceAccount
name: spiceai-service-account
namespace: spiceai-platform
roleRef:
kind: ClusterRole
name: spiceai-cluster-role
apiGroup: rbac.authorization.k8s.io
📊 Performance Optimization and Operations
1. Data Acceleration Strategy
Arrow In-Memory Acceleration
# For high-performance real-time queries
datasets:
- from: postgresql://analytics-db:5432/real_time_data
name: real_time_metrics
acceleration:
enabled: true
engine: arrow
mode: memory
refresh_check_interval: 1m
refresh_mode: full
DuckDB File-Based Acceleration
# For large-scale analytical queries
datasets:
- from: s3://data-lake/historical-data/
name: historical_analytics
acceleration:
enabled: true
engine: duckdb
mode: file
params:
duckdb_file: "/data/historical.db"
refresh_check_interval: 1h
refresh_mode: incremental
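The two strategies above trade memory for capacity: Arrow in-memory for hot, latency-sensitive data; DuckDB on disk for large historical sets. A hypothetical rule-of-thumb helper that emits the matching acceleration settings (the 50%-of-memory threshold is an illustrative assumption, not a SpiceAI default):

```python
def choose_acceleration(dataset_size_gb: float, latency_sensitive: bool,
                        available_memory_gb: float = 4.0) -> dict:
    """Pick acceleration settings following the trade-off above.
    Illustrative heuristic: keep a dataset in memory only if it is
    latency-sensitive and fits comfortably (under half of available RAM)."""
    if latency_sensitive and dataset_size_gb < available_memory_gb * 0.5:
        return {"engine": "arrow", "mode": "memory",
                "refresh_check_interval": "1m", "refresh_mode": "full"}
    return {"engine": "duckdb", "mode": "file",
            "refresh_check_interval": "1h", "refresh_mode": "incremental"}
```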
2. Cluster Scaling Strategy
Horizontal Scaling Configuration
# CPU-based autoscaling
kubectl autoscale deployment spiceai-platform \
--cpu-percent=70 \
--min=3 \
--max=20 \
--namespace spiceai-platform
# Custom-metric-based autoscaling
cat > hpa-custom.yaml << 'EOF'
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: spiceai-hpa-custom
namespace: spiceai-platform
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: spiceai-platform
minReplicas: 3
maxReplicas: 50
metrics:
- type: Pods
pods:
metric:
name: spiceai_query_queue_length
target:
type: AverageValue
averageValue: "10"
- type: Pods
pods:
metric:
name: spiceai_memory_utilization
target:
type: AverageValue
averageValue: "80"
EOF
kubectl apply -f hpa-custom.yaml
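To reason about what the HPA above will do, recall the scaling algorithm from the Kubernetes documentation: desiredReplicas = ceil(currentReplicas × currentMetricValue / targetMetricValue), clamped to the min/max bounds. In Python:

```python
import math

def desired_replicas(current_replicas: int, current_value: float,
                     target_value: float, min_replicas: int = 3,
                     max_replicas: int = 50) -> int:
    """Kubernetes HPA scaling formula:
    desired = ceil(current * currentMetric / targetMetric), clamped."""
    desired = math.ceil(current_replicas * current_value / target_value)
    return max(min_replicas, min(max_replicas, desired))
```

With the queue-length metric above (target average 10), pods averaging 25 queued queries would scale a 3-replica deployment to 8.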
3. Backup and Disaster Recovery
Data Backup Strategy
#!/bin/bash
# spiceai-backup.sh
BACKUP_DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_PATH="/backups/spiceai-${BACKUP_DATE}"
# 1. Back up configuration
kubectl get configmap -n spiceai-platform -o yaml > "${BACKUP_PATH}/configmaps.yaml"
kubectl get secret -n spiceai-platform -o yaml > "${BACKUP_PATH}/secrets.yaml"
# 2. Back up data (PVC snapshots)
kubectl get pvc -n spiceai-platform -o json | \
jq -r '.items[] | select(.metadata.name | contains("spiceai")) | .metadata.name' | \
while read pvc_name; do
# Create a volume snapshot
cat > "${BACKUP_PATH}/${pvc_name}-snapshot.yaml" << EOF
apiVersion: snapshot.storage.k8s.io/v1
kind: VolumeSnapshot
metadata:
name: ${pvc_name}-snapshot-${BACKUP_DATE}
namespace: spiceai-platform
spec:
source:
persistentVolumeClaimName: ${pvc_name}
volumeSnapshotClassName: csi-aws-vsc
EOF
kubectl apply -f "${BACKUP_PATH}/${pvc_name}-snapshot.yaml"
done
# 3. Back up metadata
kubectl exec -n spiceai-platform deployment/spiceai-platform -- \
pg_dump "$DATABASE_URL" > "${BACKUP_PATH}/metadata-backup.sql"
echo "Backup completed: ${BACKUP_PATH}"
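The script names backups with a `%Y%m%d_%H%M%S` timestamp, so lexicographic order matches chronological order. A small hypothetical helper the restore step could use to locate the most recent backup (directory naming assumed from the script above):

```python
from typing import Optional

def latest_backup(backup_dirs: list) -> Optional[str]:
    """Return the most recent 'spiceai-<YYYYmmdd_HHMMSS>' backup name.
    Relies on the timestamp format sorting lexicographically."""
    candidates = [d for d in backup_dirs if d.startswith("spiceai-")]
    return max(candidates, default=None)
```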
Disaster Recovery Plan
#!/bin/bash
# spiceai-restore.sh
RESTORE_PATH="$1"
if [ -z "$RESTORE_PATH" ]; then
echo "Usage: $0 <backup-path>"
exit 1
fi
# 1. Recreate the namespace
kubectl create namespace spiceai-platform
# 2. Restore configuration
kubectl apply -f "${RESTORE_PATH}/configmaps.yaml"
kubectl apply -f "${RESTORE_PATH}/secrets.yaml"
# 3. Restore volumes
for snapshot_file in "${RESTORE_PATH}"/*-snapshot.yaml; do
kubectl apply -f "$snapshot_file"
done
# 4. Redeploy the application
helm upgrade --install spiceai-platform spiceai/spiceai \
--namespace spiceai-platform \
--values values.yaml
echo "Restore completed from: ${RESTORE_PATH}"
zshrc Alias Setup
# SpiceAI aliases and helpers to add to ~/.zshrc
# SpiceAI directories
export SPICEAI_HOME="$HOME/.spiceai"
export SPICEAI_PROJECTS_DIR="$HOME/spiceai-projects"
# Basic aliases
alias spice-init="spice init"
alias spice-run="spice run"
alias spice-build="spice build"
alias spice-status="spice status"
# Docker
alias spice-docker="docker run -p 3000:3000 -p 50051:50051 -v \$(pwd):/app spiceai/spiceai:latest"
alias spice-docker-dev="docker run -it -p 3000:3000 -p 50051:50051 -v \$(pwd):/app spiceai/spiceai:latest /bin/bash"
# Kubernetes
alias k-spice="kubectl -n spiceai-platform"
alias spice-logs="kubectl logs -n spiceai-platform -l app.kubernetes.io/name=spiceai"
alias spice-pods="kubectl get pods -n spiceai-platform"
alias spice-svc="kubectl get svc -n spiceai-platform"
# Test SQL queries
function spice-query() {
if [ -z "$1" ]; then
echo "Usage: spice-query 'SQL_QUERY'"
return 1
fi
curl -X POST http://localhost:3000/v1/sql \
-H "Content-Type: application/json" \
-d "{\"sql\": \"$1\"}" | jq .
}
# SpicePod validation
alias spice-validate="spice validate"
alias spice-fmt="spice fmt"
# Monitoring
alias spice-metrics="curl -s http://localhost:3000/metrics"
alias spice-health="curl -s http://localhost:3000/health | jq ."
# Project management
function spice-new() {
local project_name="$1"
if [ -z "$project_name" ]; then
echo "Usage: spice-new <project-name>"
return 1
fi
mkdir -p "$SPICEAI_PROJECTS_DIR/$project_name"
cd "$SPICEAI_PROJECTS_DIR/$project_name"
spice init
echo "✅ SpiceAI project '$project_name' created"
echo "📁 Path: $SPICEAI_PROJECTS_DIR/$project_name"
}
# Development environment setup
function spice-dev-setup() {
echo "🚀 Setting up the SpiceAI development environment..."
# Create required directories
mkdir -p "$SPICEAI_PROJECTS_DIR"
mkdir -p "$SPICEAI_HOME"
# Pull the Docker image
docker pull spiceai/spiceai:latest
# Create a sample project
spice-new "sample-project"
echo "✅ SpiceAI development environment setup complete"
}
# Backup helpers
function spice-backup() {
local backup_name="spiceai-backup-$(date +%Y%m%d-%H%M%S)"
mkdir -p "$HOME/spiceai-backups/$backup_name"
# Back up projects
cp -r "$SPICEAI_PROJECTS_DIR" "$HOME/spiceai-backups/$backup_name/projects"
# Back up configuration
if [ -d "$SPICEAI_HOME" ]; then
cp -r "$SPICEAI_HOME" "$HOME/spiceai-backups/$backup_name/config"
fi
echo "✅ Backup complete: $HOME/spiceai-backups/$backup_name"
}
# Helm
alias helm-spice="helm -n spiceai-platform"
alias spice-install="helm upgrade --install spiceai-platform spiceai/spiceai --namespace spiceai-platform"
alias spice-uninstall="helm uninstall spiceai-platform --namespace spiceai-platform"
# Logs and debugging
function spice-debug() {
echo "=== SpiceAI Debug Info ==="
echo "SpiceAI version:"
spice --version 2>/dev/null || echo "SpiceAI CLI not installed"
echo -e "\nDocker images:"
docker images spiceai/spiceai 2>/dev/null || echo "No SpiceAI Docker images found"
echo -e "\nKubernetes status:"
k-spice get pods 2>/dev/null || echo "Kubernetes not configured or no pods"
echo -e "\nLocal server status:"
spice-health 2>/dev/null || echo "Local SpiceAI server not running"
}
# Performance testing
function spice-benchmark() {
local query="${1:-SELECT 1 as test}"
local iterations="${2:-100}"
echo "🔥 Starting SpiceAI benchmark..."
echo "Query: $query"
echo "Iterations: $iterations"
# Note: %N requires GNU date; on macOS, install coreutils and use gdate
start_time=$(date +%s%3N)
for i in $(seq 1 $iterations); do
spice-query "$query" > /dev/null 2>&1
if [ $((i % 10)) -eq 0 ]; then
echo "Progress: $i/$iterations"
fi
done
end_time=$(date +%s%3N)
total_time=$((end_time - start_time))
avg_time=$((total_time / iterations))
echo "✅ Benchmark complete"
echo "Total time: ${total_time}ms"
echo "Average response time: ${avg_time}ms"
# Guard against division by zero when the average rounds down to 0ms
[ "$avg_time" -gt 0 ] && echo "Queries per second: $((1000 / avg_time))"
}
# Help
function spice-help() {
echo "🌶️ SpiceAI Alias Help"
echo ""
echo "Basic commands:"
echo "  spice-init       - Initialize a new SpiceAI project"
echo "  spice-run        - Run the SpiceAI server"
echo "  spice-query      - Execute a SQL query"
echo ""
echo "Project management:"
echo "  spice-new <name> - Create a new project"
echo "  spice-dev-setup  - Set up the development environment"
echo "  spice-backup     - Back up projects"
echo ""
echo "Docker:"
echo "  spice-docker     - Run SpiceAI with Docker"
echo "  spice-docker-dev - Docker development mode"
echo ""
echo "Kubernetes:"
echo "  k-spice          - kubectl scoped to the spiceai namespace"
echo "  spice-install    - Install via Helm"
echo "  spice-logs       - View pod logs"
echo ""
echo "Monitoring:"
echo "  spice-health     - Check server health"
echo "  spice-metrics    - View metrics"
echo "  spice-debug      - Show debug info"
echo "  spice-benchmark  - Run a performance test"
}
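The `spice-benchmark` function above computes its summary with integer shell arithmetic, which loses precision for fast queries. The same statistics in Python with float math, as a small sketch mirroring the shell function:

```python
def benchmark_stats(total_time_ms: float, iterations: int) -> dict:
    """Compute the stats spice-benchmark prints, using float math and
    guarding against zero durations."""
    if iterations <= 0:
        raise ValueError("iterations must be positive")
    avg_ms = total_time_ms / iterations
    # Avoid division by zero for sub-millisecond averages
    qps = 1000.0 / avg_ms if avg_ms > 0 else float("inf")
    return {"total_ms": total_time_ms, "avg_ms": avg_ms, "qps": qps}
```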
Development Environment Info
# Test environment info
echo "=== SpiceAI Development Environment ==="
echo "Date: $(date)"
echo "OS: $(uname -a)"
echo "Docker: $(docker --version 2>/dev/null || echo 'Docker not installed')"
echo "Kubernetes: $(kubectl version --client 2>/dev/null || echo 'kubectl not installed')"
echo "Helm: $(helm version --short 2>/dev/null || echo 'Helm not installed')"
echo "SpiceAI CLI: $(spice --version 2>/dev/null || echo 'SpiceAI CLI not installed')"
echo "Available memory: $(free -h 2>/dev/null | grep Mem || vm_stat | head -5)"
echo "Disk space: $(df -h . | tail -1)"
Verified Environments
This guide was tested in the following environments:
- macOS Sonoma (Apple M4 Pro, 48GB RAM)
- Ubuntu 22.04 LTS (Intel/AMD x86_64)
- Kubernetes 1.28+
- Docker 24.0+
- Helm 3.12+
- SpiceAI 1.4.0+
Conclusion
SpiceAI is an AI infrastructure platform whose Apache-2.0 license permits completely unrestricted commercial use. For AI platform teams in private cloud environments, it delivers the following value:
🎯 Core Value Proposition
- Unified data platform: query distributed data sources through a single SQL interface
- Real-time AI inference: high-performance ML model serving and inference
- Enterprise RAG: intelligent retrieval and generation systems
- AI agent backend: automation of complex business processes
- Scalable architecture: Kubernetes-native design
🚀 Recommended Adoption Strategy for Private Cloud Teams
- Start with a POC: prove the concept with a single data source integration
- Expand gradually: integrate additional data sources and AI models
- Optimize operations: set up monitoring, backups, and high availability
- Roll out company-wide: spread proven patterns to other teams
💡 Practical Use Scenarios
- Data teams: real-time data federation and analytics
- AI/ML teams: model serving and inference infrastructure
- Product teams: building RAG-based intelligent services
- Operations teams: AI-driven system monitoring and optimization
With SpiceAI, you can build a next-generation AI platform in a private cloud environment and efficiently deliver data-driven intelligent services. 🌶️