기업용 RAG 시스템을 위한 산업별 공개 데이터셋 완벽 가이드: 은행부터 증권까지
⏱️ 예상 읽기 시간: 20분
서론
기업용 RAG(Retrieval-Augmented Generation) 시스템 구축에서 가장 중요한 것은 품질 높은 도메인 특화 데이터입니다. 이 가이드에서는 7개 주요 산업 분야에서 실제 활용 가능한 검증된 공개 데이터셋과 실제 구현 방법을 제시합니다.
이 가이드의 특징
- ✅ 무료 접근: 모든 데이터셋이 무료로 이용 가능
- ✅ 구조화된 스키마: RAG 시스템 설계가 용이한 명확한 구조
- ✅ PoC 적합: 기업 고객 시연에 바로 활용 가능
- ✅ 실제 코드: 각 데이터셋별 구현 예시 제공
- ✅ 프로덕션 검증: 실제 운영 환경에서 검증된 데이터셋만 선별
산업별 데이터셋 요약
분야 | 추천 데이터셋 | 데이터 규모 | 주요 활용 사례 |
---|---|---|---|
은행 | FDIC Call Report | 5,000+ 기관, 분기별 | 재무제표 질의응답, 위험도 분석 |
보험 | NAIC InsData | 3,000+ 보험사 | 보험금 지급 분석, 민원 처리 |
회계 | SEC XBRL | 8,000+ 상장사 | 재무 항목 검색, 공시 분석 |
법률 | CourtListener | 400만+ 판례 | 판례 검색, 법률 자문 |
의료 | MIMIC-IV | 30만+ 환자 | 임상 질의응답, 진단 지원 |
자동차 | NHTSA API | 수십만 건 리콜 | 안전도 조회, 리콜 알림 |
증권 | NASDAQ Data | 15년+ 주가 | 투자 분석, 트렌드 예측 |
1. 은행 및 신용 분석
1-1. FDIC Call Report
개요: 미국 FDIC 감독 하의 모든 은행이 분기별로 제출하는 재무제표 데이터
데이터 구조:
{
"bank_name": "First National Bank",
"cert_id": "123456",
"quarter": "2024Q3",
"total_assets": 1234567890,
"tier1_capital_ratio": 12.5,
"net_income": 45678901,
"loan_loss_provisions": 2345678
}
RAG 활용 예시:
# FDIC 데이터 RAG 구성 예시
from langchain.document_loaders import CSVLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
class FDICRAGSystem:
def __init__(self):
self.loader = CSVLoader("fdic_call_reports.csv")
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50,
separators=["\n", ","] # CSV 데이터 최적화
)
def create_bank_profile_chunks(self, bank_data):
"""은행별 프로필 청크 생성"""
profile_text = f"""
은행명: {bank_data['bank_name']}
총자산: ${bank_data['total_assets']:,}
Tier1 자본비율: {bank_data['tier1_capital_ratio']}%
순이익: ${bank_data['net_income']:,}
대손충당금: ${bank_data['loan_loss_provisions']:,}
분기: {bank_data['quarter']}
"""
return self.text_splitter.split_text(profile_text)
# 질의 예시
def query_bank_performance(query: str):
"""
예시 질의:
- "2024년 3분기 Tier1 자본비율이 가장 높은 은행은?"
- "First National Bank의 최근 3개 분기 순이익 추이는?"
"""
pass
활용 시나리오:
- 📊 리스크 분석: “자본비율이 10% 미만인 은행들의 공통점은?”
- 📈 성과 비교: “유사 규모 은행들과 비교한 우리 은행의 위치는?”
- 🔍 규제 준수: “Basel III 기준을 충족하지 못하는 항목들은?”
데이터 접근: FDIC Call Reports
1-2. FRED API (Federal Reserve Economic Data)
개요: 미국 연방준비제도의 경제 지표 데이터베이스 (80만+ 시계열)
주요 지표:
- 금리 (Federal Funds Rate, 국채 수익률)
- 통화량 (M1, M2, M3)
- 고용 지표 (실업률, 비농업 취업자 수)
- 인플레이션 (CPI, PCE)
RAG 구현 예시:
import pandas as pd
from fredapi import Fred
from datetime import datetime, timedelta
class FREDEconomicRAG:
def __init__(self, api_key):
self.fred = Fred(api_key=api_key)
self.indicators = {
'FEDFUNDS': 'Federal Funds Rate',
'UNRATE': 'Unemployment Rate',
'CPIAUCSL': 'Consumer Price Index',
'GDP': 'Gross Domestic Product'
}
def create_economic_context(self, indicator_code, months_back=12):
"""경제 지표 기반 컨텍스트 생성"""
end_date = datetime.now()
start_date = end_date - timedelta(days=months_back*30)
data = self.fred.get_series(
indicator_code,
start_date,
end_date
)
context = f"""
지표명: {self.indicators.get(indicator_code, indicator_code)}
최신값: {data.iloc[-1]:.2f}
1년 전 대비: {((data.iloc[-1] / data.iloc[0]) - 1) * 100:.2f}%
최고값: {data.max():.2f} (날짜: {data.idxmax().strftime('%Y-%m-%d')})
최저값: {data.min():.2f} (날짜: {data.idxmin().strftime('%Y-%m-%d')})
"""
return context
# 활용 예시
economic_rag = FREDEconomicRAG(api_key="your_fred_api_key")
context = economic_rag.create_economic_context('FEDFUNDS')
활용 시나리오:
- 📊 신용 정책: “현재 금리 환경에서 대출 포트폴리오 조정 방향은?”
- 📉 경기 예측: “실업률과 GDP 성장률 기반 경기 사이클 분석”
- 💹 자산 배분: “인플레이션 상승기 최적 자산 배분 전략은?”
2. 보험
2-1. NAIC InsData
개요: 미국 보험규제협회(NAIC)가 제공하는 보험사별 시장 데이터
데이터 포함 항목:
- 시장 점유율 및 프리미엄 수입
- 손해율 (Loss Ratio) 및 사업비율
- 민원 건수 및 유형별 분석
- 재무 건전성 지표
RAG 구현 예시:
class InsuranceRAGSystem:
def __init__(self):
self.complaint_categories = {
'claim_handling': '보험금 지급 관련',
'policy_service': '보험계약 서비스',
'premium_billing': '보험료 청구',
'underwriting': '언더라이팅',
'sales_marketing': '판매 및 마케팅'
}
def create_insurer_profile(self, insurer_data):
"""보험사 프로필 생성"""
profile = f"""
보험사명: {insurer_data['company_name']}
시장점유율: {insurer_data['market_share']}%
손해율: {insurer_data['loss_ratio']}%
사업비율: {insurer_data['expense_ratio']}%
총 민원 건수: {insurer_data['total_complaints']}건
주요 민원 유형: {insurer_data['top_complaint_type']}
재무등급: {insurer_data['financial_rating']}
"""
return profile
def analyze_complaint_trends(self, company_name, time_period):
"""민원 트렌드 분석"""
# 실제 구현에서는 데이터베이스 쿼리
return {
'trend': 'decreasing',
'primary_issues': ['claim_delays', 'coverage_disputes'],
'resolution_rate': 0.87
}
# 질의 예시
def insurance_chatbot_query(query: str):
"""
예시 질의:
- "State Farm과 Allstate의 손해율 비교는?"
- "자동차 보험 민원이 가장 적은 보험사는?"
- "우리 보험사의 민원 해결률 개선 방안은?"
"""
pass
실제 활용 사례:
# 보험사 벤치마킹 시스템
class InsuranceBenchmarking:
def compare_competitors(self, my_company, competitors):
comparison_data = []
for competitor in competitors:
data = {
'company': competitor,
'loss_ratio': self.get_loss_ratio(competitor),
'complaint_rate': self.get_complaint_rate(competitor),
'market_share': self.get_market_share(competitor)
}
comparison_data.append(data)
return self.create_benchmark_report(my_company, comparison_data)
2-2. OpenFEMA NFIP Claims
개요: 미국 연방재해관리청(FEMA)의 홍수 보험 클레임 데이터
데이터 특징:
- 1978년부터 현재까지의 모든 홍수 보험 클레임
- 지역별, 재해 유형별 상세 분류
- 보험금 지급액 및 손해 평가 정보
활용 예시:
import geopandas as gpd
from shapely.geometry import Point
class FloodInsuranceRAG:
def __init__(self):
self.claims_data = self.load_nfip_data()
def create_risk_assessment_context(self, zip_code):
"""우편번호별 홍수 위험도 평가"""
historical_claims = self.claims_data[
self.claims_data['zip_code'] == zip_code
]
risk_context = f"""
지역: {zip_code}
과거 10년 클레임 건수: {len(historical_claims)}건
평균 지급액: ${historical_claims['amount_paid'].mean():,.2f}
최대 지급액: ${historical_claims['amount_paid'].max():,.2f}
주요 손해 유형: {historical_claims['damage_type'].mode()[0]}
위험도 등급: {self.calculate_risk_grade(historical_claims)}
"""
return risk_context
3. 회계 / 공시자료
3-1. SEC XBRL 데이터
개요: 미국 증권거래위원회(SEC)의 상장기업 재무제표 표준화 데이터
XBRL 구조 예시:
<xbrl>
<us-gaap:Assets contextRef="FY2024" unitRef="USD">1234567890</us-gaap:Assets>
<us-gaap:Liabilities contextRef="FY2024" unitRef="USD">987654321</us-gaap:Liabilities>
<us-gaap:Revenues contextRef="FY2024Q4" unitRef="USD">456789012</us-gaap:Revenues>
</xbrl>
RAG 구현 예시:
import xml.etree.ElementTree as ET
from langchain.schema import Document
class SECXBRLProcessor:
def __init__(self):
self.namespace = {
'us-gaap': 'http://fasb.org/us-gaap/2024',
'dei': 'http://xbrl.sec.gov/dei/2024'
}
def parse_financial_statement(self, xbrl_file):
"""XBRL 파일에서 재무제표 파싱"""
tree = ET.parse(xbrl_file)
root = tree.getroot()
financial_data = {}
# 주요 재무 항목 추출
key_items = [
'Assets', 'Liabilities', 'StockholdersEquity',
'Revenues', 'NetIncomeLoss', 'EarningsPerShare'
]
for item in key_items:
element = root.find(f'.//us-gaap:{item}', self.namespace)
if element is not None:
financial_data[item] = {
'value': float(element.text),
'context': element.get('contextRef'),
'unit': element.get('unitRef')
}
return financial_data
def create_financial_summary(self, company_data):
"""재무 요약 문서 생성"""
summary = f"""
회사명: {company_data['company_name']}
총자산: ${company_data['Assets']['value']:,.0f}
총부채: ${company_data['Liabilities']['value']:,.0f}
자본총계: ${company_data['StockholdersEquity']['value']:,.0f}
매출액: ${company_data['Revenues']['value']:,.0f}
순이익: ${company_data['NetIncomeLoss']['value']:,.0f}
주당순이익: ${company_data['EarningsPerShare']['value']:.2f}
"""
return Document(page_content=summary)
# 활용 예시
processor = SECXBRLProcessor()
tesla_data = processor.parse_financial_statement('tesla_10k_2024.xml')
summary_doc = processor.create_financial_summary(tesla_data)
고급 질의 처리:
class FinancialAnalysisRAG:
def calculate_financial_ratios(self, financial_data):
"""재무비율 계산"""
ratios = {}
# 유동비율
if 'CurrentAssets' in financial_data and 'CurrentLiabilities' in financial_data:
ratios['current_ratio'] = (
financial_data['CurrentAssets']['value'] /
financial_data['CurrentLiabilities']['value']
)
# 부채비율
if 'Liabilities' in financial_data and 'StockholdersEquity' in financial_data:
ratios['debt_ratio'] = (
financial_data['Liabilities']['value'] /
financial_data['StockholdersEquity']['value']
)
# ROE (자기자본이익률)
if 'NetIncomeLoss' in financial_data and 'StockholdersEquity' in financial_data:
ratios['roe'] = (
financial_data['NetIncomeLoss']['value'] /
financial_data['StockholdersEquity']['value'] * 100
)
return ratios
3-2. EDGAR 10-K 위험요소 분석
RAG 활용 예시:
import requests
from bs4 import BeautifulSoup
class EDGARRiskFactorRAG:
def extract_risk_factors(self, filing_url):
"""10-K 서류에서 위험요소 추출"""
response = requests.get(filing_url)
soup = BeautifulSoup(response.content, 'html.parser')
# Item 1A. Risk Factors 섹션 찾기
risk_section = soup.find(text=lambda text: text and 'risk factors' in text.lower())
if risk_section:
risk_content = self.extract_section_content(risk_section)
return self.chunk_risk_factors(risk_content)
return []
def chunk_risk_factors(self, risk_content):
"""위험요소를 의미 단위로 청킹"""
risk_chunks = []
# 위험요소별로 분할 (보통 번호나 항목으로 구분됨)
risk_items = risk_content.split('\n\n')
for item in risk_items:
if len(item.strip()) > 100: # 의미있는 길이의 위험요소만
risk_chunks.append({
'content': item.strip(),
'category': self.categorize_risk(item),
'severity': self.assess_risk_severity(item)
})
return risk_chunks
def categorize_risk(self, risk_text):
"""위험요소 카테고리 분류"""
categories = {
'market': ['market', 'competition', 'demand'],
'operational': ['operation', 'supply chain', 'manufacturing'],
'financial': ['credit', 'liquidity', 'debt'],
'regulatory': ['regulation', 'compliance', 'legal'],
'technology': ['technology', 'cyber', 'data']
}
for category, keywords in categories.items():
if any(keyword in risk_text.lower() for keyword in keywords):
return category
return 'other'
4. 법률
4-1. CourtListener
개요: 400만+ 미국 판례를 포함한 법률 데이터베이스
API 활용 예시:
import requests
from datetime import datetime
class CourtListenerRAG:
def __init__(self, api_token):
self.api_token = api_token
self.base_url = "https://www.courtlistener.com/api/rest/v3/"
self.headers = {'Authorization': f'Token {api_token}'}
def search_cases(self, query, court=None, date_range=None):
"""판례 검색"""
params = {
'q': query,
'order_by': 'score desc',
'format': 'json'
}
if court:
params['court'] = court
if date_range:
params['filed_after'] = date_range['start']
params['filed_before'] = date_range['end']
response = requests.get(
f"{self.base_url}search/",
params=params,
headers=self.headers
)
return response.json()
def create_case_summary(self, case_data):
"""판례 요약 생성"""
summary = f"""
사건명: {case_data['caseName']}
법원: {case_data['court']}
판결일: {case_data['dateFiled']}
재판부: {case_data['panel']}
주요 쟁점: {case_data.get('summary', 'N/A')}
판결 결과: {case_data.get('disposition', 'N/A')}
인용 횟수: {case_data.get('citeCount', 0)}
"""
return summary
def find_similar_cases(self, case_facts, jurisdiction=None):
"""유사 판례 검색"""
# 사실관계 기반 키워드 추출
keywords = self.extract_legal_keywords(case_facts)
search_query = ' AND '.join(keywords)
similar_cases = self.search_cases(
query=search_query,
court=jurisdiction
)
return similar_cases['results'][:10] # 상위 10개 유사 판례
법률 질의응답 시스템:
class LegalQASystem:
def __init__(self):
self.court_listener = CourtListenerRAG(api_token="your_token")
self.legal_categories = {
'contract': '계약법',
'tort': '불법행위법',
'property': '재산법',
'criminal': '형법',
'constitutional': '헌법'
}
def answer_legal_question(self, question, context=None):
"""법률 질문 답변"""
# 질문에서 법률 영역 식별
legal_area = self.identify_legal_area(question)
# 관련 판례 검색
relevant_cases = self.court_listener.search_cases(
query=question,
court=context.get('jurisdiction') if context else None
)
# 답변 구성
answer = self.compose_legal_answer(question, relevant_cases, legal_area)
return answer
def compose_legal_answer(self, question, cases, legal_area):
"""법률 답변 구성"""
answer = f"[{self.legal_categories.get(legal_area, '일반')}] 관련 답변:\n\n"
if cases['results']:
answer += "관련 판례:\n"
for i, case in enumerate(cases['results'][:3], 1):
answer += f"{i}. {case['caseName']} ({case['dateFiled']})\n"
answer += f" 요약: {case.get('summary', '요약 없음')}\n\n"
answer += "참고: 구체적인 법률 자문은 변호사와 상담하시기 바랍니다."
return answer
4-2. Caselaw Access Project
하버드 법대의 360년 판례 데이터 활용:
class CaselawHistoricalRAG:
def analyze_legal_evolution(self, legal_concept, time_periods):
"""법리 변화 추적"""
evolution_data = {}
for period in time_periods:
cases = self.search_cases_by_period(legal_concept, period)
evolution_data[period] = {
'case_count': len(cases),
'major_decisions': self.identify_landmark_cases(cases),
'legal_trends': self.extract_legal_trends(cases)
}
return self.create_evolution_narrative(evolution_data)
def create_evolution_narrative(self, evolution_data):
"""법리 발전 서사 생성"""
narrative = "법리 발전 과정:\n\n"
for period, data in evolution_data.items():
narrative += f"📅 {period}:\n"
narrative += f"- 관련 판례 수: {data['case_count']}건\n"
if data['major_decisions']:
narrative += f"- 주요 판결: {', '.join(data['major_decisions'])}\n"
narrative += f"- 주요 동향: {data['legal_trends']}\n\n"
return narrative
5. 의료
5-1. MIMIC-IV v3.1
개요: MIT의 중환자실 데이터셋 (30만+ 환자, 비식별화 완료)
데이터 구조:
class MIMICDataProcessor:
def __init__(self):
self.tables = {
'admissions': '입원 정보',
'patients': '환자 기본정보',
'icustays': 'ICU 재원 정보',
'chartevents': '차트 기록',
'labevents': '검사 결과',
'prescriptions': '처방 정보',
'noteevents': '임상 노트'
}
def create_patient_timeline(self, patient_id):
"""환자별 치료 과정 타임라인 생성"""
timeline = []
# 입원 정보
admission = self.get_admission_data(patient_id)
timeline.append({
'timestamp': admission['admittime'],
'event': f"입원 - {admission['admission_type']}",
'detail': f"진단: {admission['diagnosis']}"
})
# 검사 결과
lab_events = self.get_lab_events(patient_id)
for lab in lab_events:
timeline.append({
'timestamp': lab['charttime'],
'event': f"검사 - {lab['itemid']}",
'detail': f"결과: {lab['value']} {lab['valueuom']}"
})
# 처방 정보
prescriptions = self.get_prescriptions(patient_id)
for rx in prescriptions:
timeline.append({
'timestamp': rx['startdate'],
'event': f"처방 시작 - {rx['drug']}",
'detail': f"용량: {rx['dose_val_rx']} {rx['dose_unit_rx']}"
})
return sorted(timeline, key=lambda x: x['timestamp'])
의료 질의응답 시스템:
class MedicalQASystem:
def __init__(self):
self.mimic_processor = MIMICDataProcessor()
self.medical_concepts = self.load_medical_ontology()
def answer_clinical_question(self, question, patient_context=None):
"""임상 질문 답변"""
question_type = self.classify_medical_question(question)
if question_type == 'diagnostic':
return self.handle_diagnostic_query(question, patient_context)
elif question_type == 'treatment':
return self.handle_treatment_query(question, patient_context)
elif question_type == 'prognosis':
return self.handle_prognosis_query(question, patient_context)
else:
return self.handle_general_query(question)
def handle_diagnostic_query(self, question, patient_context):
"""진단 관련 질의 처리"""
# 유사한 증상을 가진 환자들의 진단 과정 검색
similar_cases = self.find_similar_cases(patient_context)
diagnostic_patterns = []
for case in similar_cases:
pattern = {
'symptoms': case['presenting_symptoms'],
'lab_results': case['key_lab_values'],
'final_diagnosis': case['diagnosis'],
'diagnostic_path': case['diagnostic_steps']
}
diagnostic_patterns.append(pattern)
return self.generate_diagnostic_insight(diagnostic_patterns)
약물 상호작용 분석:
class DrugInteractionAnalyzer:
def __init__(self):
self.drug_database = self.load_drug_interactions()
def analyze_prescription_safety(self, prescription_list):
"""처방 안전성 분석"""
interactions = []
for i, drug1 in enumerate(prescription_list):
for drug2 in prescription_list[i+1:]:
interaction = self.check_drug_interaction(drug1, drug2)
if interaction:
interactions.append({
'drug1': drug1['name'],
'drug2': drug2['name'],
'severity': interaction['severity'],
'description': interaction['description'],
'recommendation': interaction['recommendation']
})
return interactions
def generate_safety_report(self, interactions):
"""안전성 보고서 생성"""
if not interactions:
return "✅ 확인된 약물 상호작용이 없습니다."
report = "⚠️ 약물 상호작용 경고:\n\n"
for interaction in interactions:
severity_emoji = {
'major': '🔴',
'moderate': '🟡',
'minor': '🟢'
}
report += f"{severity_emoji[interaction['severity']]} "
report += f"{interaction['drug1']} ↔ {interaction['drug2']}\n"
report += f"설명: {interaction['description']}\n"
report += f"권장사항: {interaction['recommendation']}\n\n"
return report
5-2. PubMed Central OA
의학 논문 기반 증거 검색:
import xml.etree.ElementTree as ET
from Bio import Entrez
class PubMedRAGSystem:
def __init__(self, email):
Entrez.email = email
self.database = "pmc"
def search_evidence(self, medical_question, max_results=10):
"""의학적 질문에 대한 논문 증거 검색"""
# PubMed 검색 쿼리 생성
search_terms = self.extract_medical_terms(medical_question)
query = ' AND '.join(search_terms)
# PMC 검색 실행
handle = Entrez.esearch(
db=self.database,
term=query,
retmax=max_results,
sort="relevance"
)
search_results = Entrez.read(handle)
# 논문 상세 정보 추출
evidence_papers = []
for paper_id in search_results['IdList']:
paper_details = self.get_paper_details(paper_id)
evidence_papers.append(paper_details)
return evidence_papers
def get_paper_details(self, paper_id):
"""논문 상세 정보 추출"""
handle = Entrez.efetch(
db=self.database,
id=paper_id,
rettype="xml"
)
paper_xml = handle.read()
root = ET.fromstring(paper_xml)
# 논문 메타데이터 추출
title = root.find('.//article-title').text if root.find('.//article-title') is not None else "제목 없음"
authors = [author.text for author in root.findall('.//contrib[@contrib-type="author"]//name/surname')]
abstract = root.find('.//abstract/p').text if root.find('.//abstract/p') is not None else "초록 없음"
return {
'pmcid': paper_id,
'title': title,
'authors': authors,
'abstract': abstract,
'url': f"https://www.ncbi.nlm.nih.gov/pmc/articles/PMC{paper_id}/"
}
def generate_evidence_summary(self, question, evidence_papers):
"""증거 기반 요약 생성"""
summary = f"질문: {question}\n\n"
summary += "📚 관련 연구 증거:\n\n"
for i, paper in enumerate(evidence_papers, 1):
summary += f"{i}. {paper['title']}\n"
summary += f" 저자: {', '.join(paper['authors'][:3])}{'...' if len(paper['authors']) > 3 else ''}\n"
summary += f" 요약: {paper['abstract'][:200]}...\n"
summary += f" 링크: {paper['url']}\n\n"
summary += "⚠️ 주의: 이 정보는 참고용이며, 실제 진료 결정은 담당 의사와 상의하시기 바랍니다."
return summary
6. 자동차
6-1. NHTSA API
개요: 미국 도로교통안전청의 자동차 안전 데이터
API 활용 예시:
import requests
import pandas as pd
class NHTSAVehicleSafetyRAG:
def __init__(self):
self.base_url = "https://vpic.nhtsa.dot.gov/api/vehicles"
self.recall_url = "https://api.nhtsa.gov/recalls/recallsByVehicle"
def get_vehicle_info(self, vin):
"""VIN 기반 차량 정보 조회"""
url = f"{self.base_url}/DecodeVINValues/{vin}"
params = {'format': 'json'}
response = requests.get(url, params=params)
vehicle_data = response.json()
if vehicle_data['Results']:
return vehicle_data['Results'][0]
return None
def get_recall_info(self, make, model, year):
"""제조사/모델/연식별 리콜 정보 조회"""
params = {
'make': make,
'model': model,
'modelYear': year,
'format': 'json'
}
response = requests.get(self.recall_url, params=params)
recall_data = response.json()
return recall_data.get('results', [])
def create_safety_profile(self, vehicle_info, recalls):
"""차량 안전성 프로필 생성"""
safety_profile = f"""
=== 차량 안전성 정보 ===
제조사: {vehicle_info.get('Make', 'N/A')}
모델: {vehicle_info.get('Model', 'N/A')}
연식: {vehicle_info.get('ModelYear', 'N/A')}
🚨 리콜 정보:
총 리콜 건수: {len(recalls)}건
"""
if recalls:
safety_profile += "\n주요 리콜 사항:\n"
for i, recall in enumerate(recalls[:5], 1):
safety_profile += f"{i}. {recall.get('Component', 'N/A')}\n"
safety_profile += f" 위험도: {recall.get('PotentialUnitsAffected', 'N/A')}대 영향\n"
safety_profile += f" 조치: {recall.get('Remedy', 'N/A')}\n\n"
else:
safety_profile += "\n✅ 현재 활성 리콜이 없습니다.\n"
return safety_profile
사고 데이터 분석 (FARS):
class FARSAccidentAnalyzer:
def __init__(self):
self.accident_factors = {
'weather': ['clear', 'rain', 'snow', 'fog'],
'road_surface': ['dry', 'wet', 'icy', 'snowy'],
'light_condition': ['daylight', 'dark', 'dawn', 'dusk'],
'alcohol_involvement': ['yes', 'no', 'unknown']
}
def analyze_accident_patterns(self, vehicle_make_model):
"""특정 차량의 사고 패턴 분석"""
accident_data = self.get_accident_data(vehicle_make_model)
analysis = {
'total_accidents': len(accident_data),
'fatality_rate': self.calculate_fatality_rate(accident_data),
'common_factors': self.identify_common_factors(accident_data),
'seasonal_trends': self.analyze_seasonal_trends(accident_data)
}
return self.generate_safety_insights(analysis)
def generate_safety_insights(self, analysis):
"""안전성 인사이트 생성"""
insights = f"""
🚗 차량 안전성 분석 결과:
📊 기본 통계:
- 총 사고 건수: {analysis['total_accidents']:,}건
- 치명률: {analysis['fatality_rate']:.2%}
⚠️ 주요 위험 요인:
"""
for factor, frequency in analysis['common_factors'].items():
insights += f"- {factor}: {frequency:.1%}\n"
insights += f"\n📅 계절별 동향:\n"
for season, data in analysis['seasonal_trends'].items():
insights += f"- {season}: {data['accident_count']}건 ({data['severity_score']:.1f}점)\n"
return insights
7. 증권
7-1. NASDAQ 주식 데이터
포괄적 주식 데이터 RAG 시스템:
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class StockMarketRAG:
def __init__(self):
self.technical_indicators = {
'sma_20': lambda data: data['Close'].rolling(window=20).mean(),
'sma_50': lambda data: data['Close'].rolling(window=50).mean(),
'rsi': self.calculate_rsi,
'macd': self.calculate_macd
}
def get_stock_data(self, symbol, period="1y"):
"""주식 데이터 조회"""
stock = yf.Ticker(symbol)
hist_data = stock.history(period=period)
info = stock.info
return {
'price_data': hist_data,
'company_info': info,
'technical_analysis': self.calculate_technical_indicators(hist_data)
}
def calculate_technical_indicators(self, price_data):
"""기술적 지표 계산"""
indicators = {}
for name, calc_func in self.technical_indicators.items():
try:
indicators[name] = calc_func(price_data)
except Exception as e:
indicators[name] = None
return indicators
def create_stock_analysis_context(self, symbol):
"""주식 분석 컨텍스트 생성"""
stock_data = self.get_stock_data(symbol)
latest_price = stock_data['price_data']['Close'].iloc[-1]
prev_close = stock_data['price_data']['Close'].iloc[-2]
change_pct = ((latest_price - prev_close) / prev_close) * 100
context = f"""
=== {symbol} 주식 분석 ===
현재가: ${latest_price:.2f}
전일 대비: {change_pct:+.2f}%
52주 최고가: ${stock_data['price_data']['High'].max():.2f}
52주 최저가: ${stock_data['price_data']['Low'].min():.2f}
기술적 지표:
- 20일 이평선: ${stock_data['technical_analysis']['sma_20'].iloc[-1]:.2f}
- 50일 이평선: ${stock_data['technical_analysis']['sma_50'].iloc[-1]:.2f}
- RSI: {stock_data['technical_analysis']['rsi'].iloc[-1]:.1f}
회사 정보:
- 시가총액: ${stock_data['company_info'].get('marketCap', 0):,}
- P/E 비율: {stock_data['company_info'].get('trailingPE', 'N/A')}
- 배당수익률: {stock_data['company_info'].get('dividendYield', 0)*100:.2f}%
"""
return context
포트폴리오 분석 시스템:
class PortfolioAnalysisRAG:
def __init__(self):
self.risk_free_rate = 0.03 # 3% 무위험 수익률 가정
def analyze_portfolio(self, portfolio_symbols, weights=None):
"""포트폴리오 분석"""
if weights is None:
weights = [1/len(portfolio_symbols)] * len(portfolio_symbols)
portfolio_data = {}
for symbol in portfolio_symbols:
stock_data = yf.Ticker(symbol).history(period="2y")
portfolio_data[symbol] = stock_data['Close'].pct_change().dropna()
portfolio_df = pd.DataFrame(portfolio_data)
# 포트폴리오 수익률 계산
portfolio_returns = (portfolio_df * weights).sum(axis=1)
# 위험 지표 계산
portfolio_metrics = {
'annual_return': portfolio_returns.mean() * 252,
'annual_volatility': portfolio_returns.std() * np.sqrt(252),
'sharpe_ratio': self.calculate_sharpe_ratio(portfolio_returns),
'max_drawdown': self.calculate_max_drawdown(portfolio_returns),
'var_95': np.percentile(portfolio_returns, 5),
'correlation_matrix': portfolio_df.corr()
}
return self.generate_portfolio_report(portfolio_metrics, portfolio_symbols, weights)
def calculate_sharpe_ratio(self, returns):
"""샤프 비율 계산"""
excess_returns = returns.mean() - (self.risk_free_rate / 252)
return (excess_returns * 252) / (returns.std() * np.sqrt(252))
def calculate_max_drawdown(self, returns):
"""최대 낙폭 계산"""
cumulative_returns = (1 + returns).cumprod()
rolling_max = cumulative_returns.expanding().max()
drawdown = (cumulative_returns - rolling_max) / rolling_max
return drawdown.min()
def generate_portfolio_report(self, metrics, symbols, weights):
"""포트폴리오 리포트 생성"""
report = f"""
📊 포트폴리오 분석 리포트
구성 종목: {', '.join(symbols)}
비중: {', '.join([f'{w:.1%}' for w in weights])}
📈 수익성 지표:
- 연간 수익률: {metrics['annual_return']:.2%}
- 샤프 비율: {metrics['sharpe_ratio']:.2f}
⚠️ 위험 지표:
- 연간 변동성: {metrics['annual_volatility']:.2%}
- 최대 낙폭: {metrics['max_drawdown']:.2%}
- VaR (95%): {metrics['var_95']:.2%}
📊 상관관계 분석:
"""
# 상관관계 매트릭스를 텍스트로 변환
corr_matrix = metrics['correlation_matrix']
for i in range(len(symbols)):
for j in range(i+1, len(symbols)):
corr_value = corr_matrix.iloc[i, j]
report += f"- {symbols[i]} ↔ {symbols[j]}: {corr_value:.3f}\n"
return report
RAG vs 파인튜닝 전략 가이드
산업별 최적 접근법
class IndustryRAGStrategy:
def __init__(self):
self.industry_configs = {
'banking': {
'chunk_size': 512,
'overlap': 50,
'embedding_model': 'text-embedding-ada-002',
'vector_store': 'pinecone',
'rerank_model': 'cross-encoder/ms-marco-MiniLM-L-6-v2'
},
'legal': {
'chunk_size': 1024, # 법률 문서는 더 긴 컨텍스트 필요
'overlap': 100,
'embedding_model': 'sentence-transformers/all-MiniLM-L6-v2',
'vector_store': 'weaviate',
'rerank_model': 'cross-encoder/ms-marco-MiniLM-L-12-v2'
},
'medical': {
'chunk_size': 256, # 의료 데이터는 정확성을 위해 작은 청크
'overlap': 25,
'embedding_model': 'sentence-transformers/all-mpnet-base-v2',
'vector_store': 'qdrant',
'rerank_model': 'ms-marco-MiniLM-L-6-v2'
}
}
def get_optimal_config(self, industry, data_characteristics):
"""산업별 최적 설정 제안"""
base_config = self.industry_configs.get(industry, self.industry_configs['banking'])
# 데이터 특성에 따른 조정
if data_characteristics.get('avg_document_length', 0) > 2000:
base_config['chunk_size'] *= 1.5
base_config['overlap'] *= 1.5
if data_characteristics.get('technical_terminology', False):
base_config['embedding_model'] = 'sentence-transformers/all-mpnet-base-v2'
return base_config
하이브리드 접근법 구현
class HybridRAGFineTuning:
def __init__(self, base_model="gpt-3.5-turbo"):
self.base_model = base_model
self.rag_system = None
self.fine_tuned_model = None
def stage1_rag_deployment(self, datasets):
"""1단계: RAG 시스템 배포"""
print("🔄 1단계: RAG 시스템 구축 중...")
# 각 산업별 벡터 스토어 구축
for industry, dataset in datasets.items():
vector_store = self.create_vector_store(industry, dataset)
self.rag_system = self.setup_rag_pipeline(vector_store)
print("✅ RAG 시스템 배포 완료")
def stage2_collect_qa_logs(self, monitoring_period_days=30):
"""2단계: 사용자 Q&A 로그 수집"""
print(f"📊 2단계: {monitoring_period_days}일간 Q&A 로그 수집...")
# 실제 구현에서는 로깅 시스템에서 데이터 수집
qa_logs = self.collect_user_interactions()
quality_scores = self.evaluate_rag_responses(qa_logs)
# 품질이 낮은 질의들 식별
improvement_candidates = [
qa for qa, score in zip(qa_logs, quality_scores)
if score < 0.7
]
return improvement_candidates
def stage3_fine_tuning(self, improvement_data):
"""3단계: 선별된 데이터로 파인튜닝"""
print("🔧 3단계: 모델 파인튜닝 시작...")
# 파인튜닝 데이터 준비
training_data = self.prepare_fine_tuning_data(improvement_data)
# OpenAI Fine-tuning API 사용 예시
fine_tuning_job = self.create_fine_tuning_job(training_data)
print(f"✅ 파인튜닝 완료. 모델 ID: {fine_tuning_job.fine_tuned_model}")
return fine_tuning_job.fine_tuned_model
def prepare_fine_tuning_data(self, qa_data):
"""파인튜닝 데이터 준비"""
training_examples = []
for qa in qa_data:
# RAG에서 검색된 컨텍스트와 개선된 답변을 조합
training_example = {
"messages": [
{"role": "system", "content": "당신은 전문적인 도메인 지식을 가진 AI 어시스턴트입니다."},
{"role": "user", "content": qa['question']},
{"role": "assistant", "content": qa['improved_answer']}
]
}
training_examples.append(training_example)
return training_examples
보안 및 컴플라이언스 고려사항
데이터 보안 프레임워크
class SecureRAGFramework:
def __init__(self):
self.access_control = {}
self.audit_logger = AuditLogger()
self.data_classifier = DataClassifier()
def setup_row_level_security(self, user_role, organization):
"""행 수준 보안 설정"""
access_rules = {
'banking_analyst': {
'allowed_tables': ['public_filings', 'market_data'],
'restricted_fields': ['ssn', 'account_numbers'],
'data_masking': True
},
'legal_counsel': {
'allowed_tables': ['public_cases', 'regulations'],
'restricted_fields': ['client_names', 'case_details'],
'data_masking': True
},
'medical_researcher': {
'allowed_tables': ['anonymized_records', 'public_studies'],
'restricted_fields': ['patient_ids', 'personal_info'],
'data_masking': True
}
}
return access_rules.get(user_role, {})
def implement_data_governance(self, dataset_info):
"""데이터 거버넌스 구현"""
governance_policy = {
'data_classification': self.classify_data_sensitivity(dataset_info),
'retention_period': self.determine_retention_period(dataset_info),
'access_logging': True,
'encryption_at_rest': True,
'encryption_in_transit': True
}
return governance_policy
def audit_rag_queries(self, user_id, query, results):
"""RAG 쿼리 감사 로깅"""
audit_record = {
'timestamp': datetime.now().isoformat(),
'user_id': user_id,
'query': query,
'results_count': len(results),
'data_sources': self.extract_data_sources(results),
'sensitivity_level': self.assess_query_sensitivity(query)
}
self.audit_logger.log(audit_record)
GDPR/개인정보보호 준수
class PrivacyCompliantRAG:
def __init__(self):
self.pii_detector = PIIDetector()
self.anonymizer = DataAnonymizer()
def ensure_gdpr_compliance(self, raw_data):
"""GDPR 준수 데이터 처리"""
# 1. PII 식별
pii_detected = self.pii_detector.scan(raw_data)
# 2. 자동 익명화
if pii_detected:
anonymized_data = self.anonymizer.process(raw_data, pii_detected)
return {
'processed_data': anonymized_data,
'pii_removed': len(pii_detected),
'compliance_status': 'GDPR_COMPLIANT'
}
return {
'processed_data': raw_data,
'pii_removed': 0,
'compliance_status': 'NO_PII_DETECTED'
}
def implement_right_to_erasure(self, user_request):
"""삭제권 구현"""
affected_records = self.find_user_data(user_request['user_identifier'])
for record in affected_records:
self.mark_for_deletion(record)
self.update_vector_embeddings(record['embedding_id'])
return {
'deleted_records': len(affected_records),
'status': 'DELETION_COMPLETED'
}
성능 최적화 및 모니터링
RAG 성능 메트릭
class RAGPerformanceMonitor:
def __init__(self):
self.metrics_collector = MetricsCollector()
self.alerting_system = AlertingSystem()
def measure_rag_performance(self, query, retrieved_docs, final_answer):
"""RAG 성능 측정"""
metrics = {
# 검색 품질
'retrieval_precision': self.calculate_retrieval_precision(retrieved_docs),
'retrieval_recall': self.calculate_retrieval_recall(retrieved_docs),
'context_relevance': self.measure_context_relevance(query, retrieved_docs),
# 생성 품질
'answer_relevance': self.measure_answer_relevance(query, final_answer),
'answer_faithfulness': self.measure_faithfulness(retrieved_docs, final_answer),
'answer_completeness': self.measure_completeness(query, final_answer),
# 성능 지표
'response_time': self.measure_response_time(),
'token_usage': self.count_tokens(final_answer),
'cost_per_query': self.calculate_cost()
}
return metrics
def setup_continuous_monitoring(self):
"""지속적 모니터링 설정"""
monitoring_config = {
'response_time_threshold': 3.0, # 3초
'relevance_score_threshold': 0.7,
'cost_per_query_threshold': 0.05, # $0.05
'alert_channels': ['slack', 'email'],
'monitoring_interval': 300 # 5분
}
return monitoring_config
자동 성능 튜닝
class AutoRAGOptimizer:
def __init__(self):
self.hyperparameter_tuner = HyperparameterTuner()
self.a_b_tester = ABTester()
def optimize_chunk_size(self, dataset, test_queries):
"""청크 크기 최적화"""
chunk_sizes = [256, 512, 1024, 2048]
results = {}
for chunk_size in chunk_sizes:
# 각 청크 크기로 RAG 시스템 구성
rag_system = self.create_rag_system(dataset, chunk_size)
# 테스트 쿼리로 성능 평가
performance = self.evaluate_performance(rag_system, test_queries)
results[chunk_size] = performance
# 최적 청크 크기 선택
optimal_size = max(results.keys(), key=lambda k: results[k]['f1_score'])
return optimal_size, results
def optimize_retrieval_parameters(self, rag_system, validation_set):
"""검색 파라미터 최적화"""
parameter_grid = {
'top_k': [3, 5, 10, 15],
'similarity_threshold': [0.6, 0.7, 0.8, 0.9],
'rerank_top_n': [3, 5, 10]
}
best_params = self.hyperparameter_tuner.grid_search(
rag_system,
parameter_grid,
validation_set
)
return best_params
실제 구현 예시
통합 산업별 RAG 플랫폼
class MultiIndustryRAGPlatform:
def __init__(self):
self.industry_processors = {
'banking': BankingDataProcessor(),
'insurance': InsuranceDataProcessor(),
'legal': LegalDataProcessor(),
'medical': MedicalDataProcessor(),
'automotive': AutomotiveDataProcessor(),
'securities': SecuritiesDataProcessor()
}
self.rag_systems = {}
self.user_management = UserManagementSystem()
def initialize_industry_rag(self, industry, datasets):
"""산업별 RAG 시스템 초기화"""
processor = self.industry_processors[industry]
# 1. 데이터 전처리
processed_data = processor.process_datasets(datasets)
# 2. 벡터 스토어 구성
vector_store = self.create_vector_store(industry, processed_data)
# 3. RAG 파이프라인 설정
rag_pipeline = self.setup_rag_pipeline(industry, vector_store)
self.rag_systems[industry] = rag_pipeline
return rag_pipeline
def query_multi_industry(self, user_id, query, industries=None):
"""다중 산업 질의"""
user_permissions = self.user_management.get_permissions(user_id)
allowed_industries = industries or user_permissions['industries']
results = {}
for industry in allowed_industries:
if industry in self.rag_systems:
industry_result = self.rag_systems[industry].query(query)
results[industry] = industry_result
# 결과 통합 및 랭킹
integrated_response = self.integrate_multi_industry_results(results)
return integrated_response
def integrate_multi_industry_results(self, industry_results):
"""다중 산업 결과 통합"""
all_sources = []
confidence_scores = []
for industry, result in industry_results.items():
for source in result['sources']:
source['industry'] = industry
all_sources.append(source)
confidence_scores.append(result['confidence'])
# 신뢰도 기반 정렬
sorted_sources = [
source for _, source in sorted(
zip(confidence_scores, all_sources),
reverse=True
)
]
return {
'integrated_answer': self.generate_integrated_answer(sorted_sources),
'sources_by_industry': industry_results,
'confidence_score': max(confidence_scores) if confidence_scores else 0
}
결론
이 가이드에서 제시한 7개 산업 분야의 검증된 공개 데이터셋은 기업용 RAG 시스템 구축의 출발점이 될 수 있습니다.
🎯 핵심 성공 요소
- 점진적 구축: RAG → 모니터링 → 파인튜닝 순서로 단계별 발전
- 산업별 최적화: 각 도메인의 특성에 맞는 청킹 및 임베딩 전략
- 보안 우선: 처음부터 데이터 거버넌스와 접근 제어 고려
- 지속적 개선: 성능 모니터링과 자동 최적화 체계 구축
🚀 다음 단계
- PoC 단계: 1-2개 산업으로 시작하여 핵심 기능 검증
- 확장 단계: 성공 사례를 바탕으로 다른 산업 영역 추가
- 고도화 단계: 멀티모달 데이터와 실시간 업데이트 지원
각 산업별 상세한 구현 가이드나 특정 데이터셋 활용 방법에 대한 추가 문의가 있으시면 언제든 말씀해 주세요! 🤝