헤지펀드 수준의 시계열 모델링: 수천 개 모델 학습 인프라 구축 완전 가이드
⏱️ 예상 읽기 시간: 18분
서론
헤지펀드들은 어떻게 하루에 수천 개의 시계열 모델을 학습시킬까요? Citadel, Renaissance Technologies, Two Sigma 같은 탑티어 헤지펀드들이 사용하는 시계열 모델링 전략과 인프라는 일반적인 머신러닝 프로젝트와는 완전히 다른 접근 방식을 취합니다.
본 가이드는 실제 헤지펀드들이 운영하는 시계열 모델 유형부터 대규모 GPU 클러스터 기반 학습 인프라 설계까지, 실전 트레이딩 시스템 구축의 모든 과정을 다룹니다. DeepSeek을 운영하는 High-Flyer Capital의 GPU 1만대 규모 인프라 사례도 함께 살펴보겠습니다.
핵심 학습 목표
- 헤지펀드들이 실제 사용하는 6가지 시계열 모델 유형 이해
- 성능이 검증된 최신 시계열 모델 벤치마크 분석
- Kubernetes + Ray 기반 수천 개 모델 학습 인프라 설계
- RTX 4090 vs H100 vs AMD MI300X 비용 효율성 분석
- 실제 운영 가능한 배포 파이프라인 구성
헤지펀드들이 실제 사용하는 시계열 모델 분석
1. GARCH 계열 모델 (리스크/변동성 예측)
헤지펀드에서 가장 핵심적으로 사용되는 모델 계열입니다. 단순해 보이지만 실제로는 매우 정교한 운영이 필요합니다.
주요 사용 사례
- Intraday VaR: 장중 포지션 리스크 실시간 모니터링
- Overnight VaR: 익일 시장 개장 전 리스크 평가
- 옵션 포트폴리오 관리: 델타/감마 헷징 비율 동적 조정
- 동적 헷징: 시장 변동성 변화에 따른 헷징 전략 자동 조정
왜 수천 개의 GARCH 모델이 필요한가?
자산 종류 (500개) × 기간 (10개) × 변동성 유형 (5개) = 25,000개 모델
예시:
- 자산: S&P500 개별 종목, 섹터 ETF, 통화, 원자재, 채권
- 기간: 1분, 5분, 15분, 1시간, 1일, 1주, 1개월
- 변동성 유형: 실현변동성, 내재변동성, 조건부변동성 등
성능 우수 GARCH 모델
모델 유형 | 특징 | 적용 사례 | 연산 요구사항 |
---|---|---|---|
EGARCH | 비대칭 효과 모델링 | 주식 시장 변동성 예측 | CPU 0.1초 학습 |
TGARCH | 임계값 기반 변동성 | 고빈도 거래 리스크 관리 | CPU 0.2초 학습 |
DCC-GARCH | 동적 조건부 상관관계 | 포트폴리오 최적화 | CPU 1-5초 학습 |
GJR-GARCH | 레버리지 효과 반영 | 옵션 포트폴리오 헷징 | CPU 0.3초 학습 |
2. 선형/정규화 회귀 기반 알파 모델
헤지펀드의 수익 창출 핵심인 알파 발굴 모델입니다. 단순함 속에 정교함이 숨어있습니다.
실제 운영 규모
- Renaissance Technologies: 추정 10만+ 알파 팩터 모델 운영
- Citadel: 섹터별/지역별 수만 개 회귀 모델 동시 운영
주요 모델 유형
# ElasticNet 기반 알파 팩터 모델 예시
from sklearn.linear_model import ElasticNet
import numpy as np
class AlphaFactorModel:
def __init__(self, alpha=0.1, l1_ratio=0.5):
self.model = ElasticNet(alpha=alpha, l1_ratio=l1_ratio)
self.lookback_period = 252 # 1년
def generate_features(self, price_data):
"""기술적 지표 기반 피처 생성"""
features = {}
# 모멘텀 팩터
features['momentum_5d'] = price_data.pct_change(5)
features['momentum_20d'] = price_data.pct_change(20)
# 평균회귀 팩터
features['rsi'] = self.calculate_rsi(price_data)
features['bollinger_position'] = self.calculate_bollinger_position(price_data)
# 거래량 팩터
features['volume_ratio'] = self.calculate_volume_ratio(price_data)
return features
운영 복잡성의 이유
시장 (10개) × 신호 유형 (100개) × 예측 기간 (10개) × 리밸런싱 빈도 (5개) = 50,000개 모델
3. 그래디언트 부스팅 트리 모델
테이블형 데이터에서 가장 강력한 성능을 보이는 모델로, 대체 데이터 활용에 필수적입니다.
주요 활용 데이터
- 호가창 데이터: 매수/매도 호가 변화 패턴
- 위성 이미지: 원자재/부동산 수급 예측
- 소셜 미디어: 시장 센티멘트 분석
- 신용카드 거래: 소비 트렌드 예측
- 뉴스 피드: 이벤트 드리븐 거래
성능 우수 모델 비교
모델 | 특징 | 메모리 사용량 | GPU 가속 | 추천 사용 사례 |
---|---|---|---|---|
XGBoost | 범용성 최고 | 중간 | CUDA 지원 | 일반적인 피처 기반 예측 |
LightGBM | 속도 최적화 | 낮음 | GPU 부분 지원 | 대용량 데이터 고속 처리 |
CatBoost | 범주형 데이터 특화 | 높음 | GPU 완전 지원 | 혼합 데이터 타입 처리 |
4. 시계열 신경망 모델
전통적인 시계열 분석의 한계를 뛰어넘는 차세대 모델들입니다.
M4 Competition 우승 모델: Dilated LSTM + Holt-Winters
import torch
import torch.nn as nn
class DilatedLSTMHybrid(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, dilation_factors):
super().__init__()
self.lstm_layers = nn.ModuleList()
for i, dilation in enumerate(dilation_factors):
if i == 0:
lstm = DilatedLSTM(input_size, hidden_size, dilation)
else:
lstm = DilatedLSTM(hidden_size, hidden_size, dilation)
self.lstm_layers.append(lstm)
self.output_layer = nn.Linear(hidden_size, 1)
def forward(self, x):
for lstm in self.lstm_layers:
x = lstm(x)
return self.output_layer(x)
class DilatedLSTM(nn.Module):
def __init__(self, input_size, hidden_size, dilation):
super().__init__()
self.dilation = dilation
self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
def forward(self, x):
# Dilated convolution 개념을 LSTM에 적용
x_dilated = x[:, ::self.dilation, :]
output, (h_n, c_n) = self.lstm(x_dilated)
return output
최신 고성능 모델들
N-BEATS (Neural Basis Expansion Analysis)
- 특징: 해석 가능한 예측 분해 (트렌드 + 계절성)
- 성능: M3/M4 대회에서 기존 방법 대비 11-20% 개선
- 메모리: ~50,000 파라미터, 2GB VRAM으로 충분
PatchTST (Patched Time Series Transformer)
- 특징: 시계열을 패치로 분할하여 Transformer 적용
- 장점: 메모리 효율성 + 멀티변량 시계열 우수 성능
- 성능: ETTh1 데이터셋에서 기존 Transformer 대비 30% 개선
5. Transformer 기반 파운데이션 모델
Google TimesFM (200M 파라미터)
헤지펀드 업계에서 가장 주목받는 시계열 파운데이션 모델입니다.
# TimesFM 활용 예시 (개념적 구현)
import torch
from transformers import AutoModel, AutoTokenizer
class TimesFMForecaster:
def __init__(self, model_name="google/timesfm-1.0-200m"):
self.model = AutoModel.from_pretrained(model_name)
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
def forecast(self, historical_data, horizon=30):
"""
Args:
historical_data: 과거 시계열 데이터
horizon: 예측할 미래 기간
"""
# 시계열 데이터를 토큰으로 변환
inputs = self.tokenizer(historical_data, return_tensors="pt")
# 모델을 통한 예측
with torch.no_grad():
outputs = self.model(**inputs)
# 예측 결과 후처리
predictions = self.decode_predictions(outputs, horizon)
return predictions
def zero_shot_forecast(self, new_asset_data, horizon=30):
"""새로운 자산에 대한 제로샷 예측"""
return self.forecast(new_asset_data, horizon)
TimesFM의 헤지펀드 활용 사례
- S&P 100 VaR 예측: 0.01-0.1 분위수에서 GARCH 모델과 동등 이상 성능
- 크로스 자산 예측: 단일 모델로 주식, 채권, 원자재 동시 예측
- 제로샷 예측: 신규 상장 종목 즉시 예측 가능
6. 멀티모달/그래프 기반 모델
Cross-Modal Temporal Fusion (CMTF)
import torch
import torch.nn as nn
from torch_geometric.nn import GCNConv
class CrossModalTemporalFusion(nn.Module):
def __init__(self, price_dim, text_dim, graph_dim, hidden_dim):
super().__init__()
# 가격 데이터 인코더
self.price_encoder = nn.LSTM(price_dim, hidden_dim, batch_first=True)
# 텍스트 데이터 인코더 (뉴스, 소셜미디어)
self.text_encoder = nn.TransformerEncoder(
nn.TransformerEncoderLayer(text_dim, nhead=8),
num_layers=6
)
# 그래프 네트워크 인코더 (자산 간 관계)
self.graph_encoder = GCNConv(graph_dim, hidden_dim)
# 융합 레이어
self.fusion_layer = nn.MultiheadAttention(hidden_dim, num_heads=8)
# 예측 헤드
self.predictor = nn.Sequential(
nn.Linear(hidden_dim * 3, hidden_dim),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(hidden_dim, 1)
)
def forward(self, price_data, text_data, graph_data, edge_index):
# 각 모달리티 인코딩
price_features, _ = self.price_encoder(price_data)
text_features = self.text_encoder(text_data)
graph_features = self.graph_encoder(graph_data, edge_index)
# 멀티모달 융합
fused_features = torch.cat([
price_features[:, -1, :], # 마지막 시점 가격 특징
text_features.mean(dim=1), # 텍스트 특징 평균
graph_features # 그래프 특징
], dim=1)
# 최종 예측
predictions = self.predictor(fused_features)
return predictions
DeepSeek과 헤지펀드의 실제 관계
DeepSeek는 중국의 대형 헤지펀드 High-Flyer Capital의 리서치 랩입니다. 몇 가지 중요한 사실들을 정리하면:
공개된 정보
- GPU 인프라: 1만대 규모의 GPU 클러스터 운영
- 연구 성과: DeepSeek-V2, V3, R1 등 오픈소스 언어모델 개발
- 기술력: 추론 비용 최적화에서 세계 최고 수준
비공개 정보
- 실제 트레이딩 모델: 어떤 모델을 거래에 사용하는지 미공개
- 데이터 소스: 실제 거래에 활용하는 데이터 파이프라인 비공개
- 수익률: 펀드 실제 성과는 기관투자자만 접근 가능
시사점
DeepSeek의 사례는 AI 연구 인프라와 실제 트레이딩 시스템이 별도로 운영될 수 있음을 보여줍니다. 오픈소스 언어모델 연구를 통해 얻은 기술력이 실제 거래 시스템에도 적용될 가능성이 높습니다.
성능 우수 시계열 모델 벤치마크 분석
M-Series Competition 결과 분석
시계열 예측 분야의 올림픽이라 할 수 있는 M-Series 대회 결과를 분석해보겠습니다.
M4 Competition (2018) 우승 전략
Slawek Smyl의 하이브리드 모델
class M4WinningModel:
def __init__(self):
# 1. Dilated LSTM으로 복잡한 패턴 학습
self.lstm_component = DilatedLSTMEnsemble()
# 2. Holt-Winters로 계절성 처리
self.hw_component = HoltWintersEnsemble()
# 3. 스태킹 메타 러너
self.meta_learner = XGBRegressor()
def train(self, train_data):
# 각 컴포넌트 개별 학습
lstm_preds = self.lstm_component.fit_predict(train_data)
hw_preds = self.hw_component.fit_predict(train_data)
# 메타 러너로 최적 조합 학습
meta_features = np.column_stack([lstm_preds, hw_preds])
self.meta_learner.fit(meta_features, train_data.target)
def predict(self, test_data):
lstm_preds = self.lstm_component.predict(test_data)
hw_preds = self.hw_component.predict(test_data)
meta_features = np.column_stack([lstm_preds, hw_preds])
final_prediction = self.meta_learner.predict(meta_features)
return final_prediction
핵심 성공 요인
- 앙상블의 다양성: Neural + Statistical 조합
- 계층적 예측: 메타 러너로 최적 가중치 자동 학습
- 견고성: 단일 모델 실패 시에도 안정적 성능
최신 벤치마크 결과 (2024)
변동성 예측 벤치마크
모델 | S&P 500 VaR (1%) | RMSE | 학습 시간 | 메모리 사용량 |
---|---|---|---|---|
EGARCH | 0.0234 | 0.0156 | 0.1초 | 50MB |
DCC-GARCH | 0.0228 | 0.0152 | 2.3초 | 120MB |
TimesFM | 0.0225 | 0.0149 | 45초 | 2.1GB |
LSTM-GARCH | 0.0231 | 0.0154 | 12초 | 512MB |
가격 예측 벤치마크
모델 | MAPE (%) | SMAPE (%) | 학습 시간 | 추론 시간 |
---|---|---|---|---|
N-BEATS | 12.3 | 8.7 | 5분 | 10ms |
PatchTST | 11.8 | 8.2 | 8분 | 15ms |
TimesFM | 11.5 | 8.0 | 30분 | 25ms |
Prophet | 15.2 | 11.3 | 30초 | 5ms |
현업 활용 모델 조합 전략
계층적 앙상블 구조
class HedgeFundModelEnsemble:
def __init__(self):
# 레벨 1: 기본 모델들
self.classical_models = {
'garch': GARCHModel(),
'arima': ARIMAModel(),
'prophet': ProphetModel()
}
self.ml_models = {
'xgboost': XGBoostModel(),
'lightgbm': LightGBMModel(),
'nbeats': NBeatsModel()
}
self.foundation_models = {
'timesfm': TimesFMModel()
}
# 레벨 2: 메타 러너
self.meta_learner = StackingRegressor([
('classical_stack', VotingRegressor(list(self.classical_models.values()))),
('ml_stack', VotingRegressor(list(self.ml_models.values()))),
('foundation_stack', list(self.foundation_models.values())[0])
])
def get_prediction_confidence(self, predictions):
"""예측 신뢰도 계산"""
pred_std = np.std(predictions, axis=0)
confidence = 1 / (1 + pred_std) # 표준편차 역수로 신뢰도 계산
return confidence
def adaptive_weighting(self, recent_performance):
"""최근 성과 기반 가중치 조정"""
weights = {}
for model_name, performance in recent_performance.items():
# 최근 30일 성과 기반 가중치
weights[model_name] = np.exp(-performance['mse'])
# 정규화
total_weight = sum(weights.values())
for model_name in weights:
weights[model_name] /= total_weight
return weights
대규모 모델 학습 인프라 설계
전체 아키텍처 개요
수천 개의 시계열 모델을 매일 학습시키기 위한 인프라는 전통적인 머신러닝 파이프라인과는 완전히 다른 접근이 필요합니다.
핵심 설계 원칙
- 수평 확장성: 모델 수 증가에 따른 선형적 확장
- 장애 격리: 단일 모델 실패가 전체 시스템에 영향 없음
- 리소스 효율성: GPU/CPU 리소스 최적 활용
- 실시간 모니터링: 수천 개 모델의 상태 실시간 추적
Kubernetes 기반 오케스트레이션
클러스터 구성
# cluster-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: cluster-config
data:
# 노드 유형별 설정
head-node-specs: |
cpu: 2x AMD EPYC 9654 (96코어)
memory: 512GB DDR5
storage: 8TB NVMe SSD
network: 25GbE
worker-node-specs: |
# GPU 워커 노드
gpu-rtx4090:
cpu: 2x Intel Xeon Gold 6348
memory: 384GB DDR4
gpu: 8x RTX 4090 24GB
storage: 2TB NVMe SSD
gpu-h100:
cpu: 2x Intel Xeon Platinum 8480+
memory: 1TB DDR5
gpu: 4x H100 80GB (MIG 가능)
storage: 4TB NVMe SSD
cpu-intensive:
cpu: 4x AMD EPYC 9654
memory: 2TB DDR5
storage: 16TB NVMe SSD
KubeRay Operator 설정
# kuberay-operator.yaml
apiVersion: ray.io/v1alpha1
kind: RayJob
metadata:
name: hedge-fund-training-job
spec:
entrypoint: python /app/training/parallel_model_training.py
runtimeEnvYAML: |
pip:
- torch==2.1.0
- ray[tune]==2.8.0
- xgboost-ray==0.1.18
- arch==6.2.0 # GARCH 모델링
- pytorch-forecasting==1.0.0
rayClusterSpec:
headGroupSpec:
rayStartParams:
dashboard-host: '0.0.0.0'
num-cpus: '0' # 헤드 노드는 스케줄링만
template:
spec:
containers:
- name: ray-head
image: thakicloud/hedge-fund-trainer:latest
resources:
limits:
cpu: 8
memory: 32Gi
requests:
cpu: 4
memory: 16Gi
workerGroupSpecs:
- replicas: 10
minReplicas: 5
maxReplicas: 50 # 자동 스케일링
groupName: gpu-workers
rayStartParams:
num-cpus: '32'
num-gpus: '8'
template:
spec:
containers:
- name: ray-worker
image: thakicloud/hedge-fund-trainer:latest
resources:
limits:
nvidia.com/gpu: 8
cpu: 32
memory: 256Gi
requests:
nvidia.com/gpu: 8
cpu: 16
memory: 128Gi
Ray Tune 기반 하이퍼파라미터 최적화
ASHA (Asynchronous Successive Halving Algorithm) 구현
# training/parallel_model_training.py
import ray
from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch
import optuna
@ray.remote(num_gpus=0.125) # MIG 1/8 slice 사용
class ModelTrainer:
def __init__(self, model_type):
self.model_type = model_type
def train_single_model(self, config, data_path):
"""단일 모델 학습"""
if self.model_type == "garch":
return self.train_garch_model(config, data_path)
elif self.model_type == "xgboost":
return self.train_xgboost_model(config, data_path)
elif self.model_type == "nbeats":
return self.train_nbeats_model(config, data_path)
def train_garch_model(self, config, data_path):
from arch import arch_model
import pandas as pd
# 데이터 로드
data = pd.read_parquet(data_path)
returns = data['returns'].dropna()
# GARCH 모델 정의
model = arch_model(
returns,
vol='GARCH',
p=config['p'],
q=config['q'],
dist=config['dist']
)
# 모델 학습
result = model.fit(disp='off')
# 검증 성능 계산
forecasts = result.forecast(horizon=5)
validation_score = self.calculate_validation_score(forecasts, data)
return {
'validation_score': validation_score,
'aic': result.aic,
'bic': result.bic,
'model_path': self.save_model(result, config)
}
def run_parallel_training():
"""수천 개 모델 병렬 학습"""
# Ray 클러스터 초기화
ray.init(address="ray://head-node:10001")
# 스케줄러 설정 (조기 중단으로 효율성 극대화)
scheduler = ASHAScheduler(
time_attr='training_iteration',
metric='validation_score',
mode='min',
max_t=100, # 최대 100 에포크
grace_period=10, # 최소 10 에포크는 학습
reduction_factor=3 # 성능 하위 2/3 모델 조기 중단
)
# 검색 공간 정의
search_spaces = {
'garch': {
'p': tune.randint(1, 5),
'q': tune.randint(1, 5),
'dist': tune.choice(['normal', 't', 'ged'])
},
'xgboost': {
'max_depth': tune.randint(3, 10),
'learning_rate': tune.loguniform(0.01, 0.3),
'n_estimators': tune.randint(100, 1000),
'subsample': tune.uniform(0.6, 1.0)
},
'nbeats': {
'stack_types': tune.choice([
['trend', 'seasonality'],
['trend', 'seasonality', 'generic'],
['generic', 'generic']
]),
'nb_blocks_per_stack': tune.randint(2, 5),
'forecast_length': tune.randint(5, 30),
'backcast_length': tune.randint(50, 200)
}
}
# 각 모델 유형별 병렬 실행
results = {}
for model_type, search_space in search_spaces.items():
# Optuna 기반 베이지안 최적화
search_algo = OptunaSearch(
sampler=optuna.samplers.TPESampler(),
seed=42
)
# 학습 실행
analysis = tune.run(
tune.with_parameters(
train_model_wrapper,
model_type=model_type
),
config=search_space,
scheduler=scheduler,
search_alg=search_algo,
num_samples=1000, # 모델 유형당 1000개 트라이얼
resources_per_trial={
"cpu": 4,
"gpu": 0.125 if model_type == 'nbeats' else 0
},
local_dir="./ray_results",
name=f"hedge_fund_training_{model_type}"
)
results[model_type] = analysis
# 최적 모델 저장
best_config = analysis.best_config
save_best_model(model_type, best_config, analysis.best_trial)
return results
def train_model_wrapper(config, model_type):
"""Ray Tune 래퍼 함수"""
trainer = ModelTrainer.remote(model_type)
# 데이터 경로 설정 (각 자산별로 다른 데이터)
data_paths = get_data_paths_for_assets()
results = []
for data_path in data_paths:
result = ray.get(trainer.train_single_model.remote(config, data_path))
results.append(result)
# Ray Tune에 중간 결과 리포트
tune.report(
validation_score=result['validation_score'],
training_iteration=len(results)
)
# 전체 자산에 대한 평균 성능 리포트
avg_score = np.mean([r['validation_score'] for r in results])
tune.report(validation_score=avg_score, done=True)
if __name__ == "__main__":
results = run_parallel_training()
print("모든 모델 학습 완료!")
GPU 리소스 전략 및 비용 분석
MIG (Multi-Instance GPU) 활용 전략
# gpu_management/mig_controller.py
import subprocess
import json
class MIGController:
def __init__(self):
self.supported_profiles = {
'H100': [
{'profile': '1g.10gb', 'instances': 7, 'memory': '10GB'},
{'profile': '2g.20gb', 'instances': 3, 'memory': '20GB'},
{'profile': '3g.40gb', 'instances': 2, 'memory': '40GB'},
{'profile': '7g.80gb', 'instances': 1, 'memory': '80GB'}
],
'A100': [
{'profile': '1g.5gb', 'instances': 7, 'memory': '5GB'},
{'profile': '2g.10gb', 'instances': 3, 'memory': '10GB'},
{'profile': '3g.20gb', 'instances': 2, 'memory': '20GB'},
{'profile': '7g.40gb', 'instances': 1, 'memory': '40GB'}
]
}
def setup_mig_for_hedge_fund(self, gpu_type='H100', profile='1g.10gb'):
"""헤지펀드 워크로드에 최적화된 MIG 설정"""
if gpu_type not in self.supported_profiles:
raise ValueError(f"지원하지 않는 GPU 타입: {gpu_type}")
# MIG 모드 활성화
subprocess.run([
'nvidia-smi', '-mig', '1'
], check=True)
# GPU 인스턴스 생성
profile_info = next(
p for p in self.supported_profiles[gpu_type]
if p['profile'] == profile
)
for i in range(profile_info['instances']):
subprocess.run([
'nvidia-smi', 'mig', '-cgi',
f"{profile_info['profile']}"
], check=True)
# 컴퓨트 인스턴스 생성
for i in range(profile_info['instances']):
subprocess.run([
'nvidia-smi', 'mig', '-cci'
], check=True)
return {
'gpu_type': gpu_type,
'profile': profile,
'instances_created': profile_info['instances'],
'memory_per_instance': profile_info['memory']
}
def get_optimal_mig_config(self, model_types, concurrent_models):
"""모델 유형과 동시 실행 수에 따른 최적 MIG 설정 추천"""
memory_requirements = {
'garch': '1GB',
'xgboost': '2GB',
'nbeats': '5GB',
'patchtst': '8GB',
'timesfm': '20GB'
}
max_memory_needed = max([
int(memory_requirements[model_type].replace('GB', ''))
for model_type in model_types
])
# H100 기준 최적 프로필 선택
if max_memory_needed <= 5:
recommended_profile = '1g.10gb' # 7개 인스턴스
elif max_memory_needed <= 10:
recommended_profile = '2g.20gb' # 3개 인스턴스
elif max_memory_needed <= 20:
recommended_profile = '3g.40gb' # 2개 인스턴스
else:
recommended_profile = '7g.80gb' # 1개 인스턴스
return {
'recommended_profile': recommended_profile,
'max_concurrent_models': self.supported_profiles['H100'][
next(i for i, p in enumerate(self.supported_profiles['H100'])
if p['profile'] == recommended_profile)
]['instances'],
'memory_per_model': max_memory_needed
}
비용 효율성 분석
# cost_analysis/gpu_cost_calculator.py
class GPUCostAnalyzer:
def __init__(self):
# 2024년 기준 GPU 가격 (USD)
self.gpu_prices = {
'RTX_4090': 1600,
'RTX_4080': 1200,
'A100_40GB': 10000,
'A100_80GB': 15000,
'H100_80GB': 30000,
'MI300X_192GB': 7500 # AMD 대안
}
# 성능 지표 (상대적, RTX 4090 = 1.0 기준)
self.performance_scores = {
'RTX_4090': 1.0,
'RTX_4080': 0.8,
'A100_40GB': 1.8,
'A100_80GB': 2.0,
'H100_80GB': 3.5,
'MI300X_192GB': 3.0
}
# 메모리 용량 (GB)
self.memory_capacity = {
'RTX_4090': 24,
'RTX_4080': 16,
'A100_40GB': 40,
'A100_80GB': 80,
'H100_80GB': 80,
'MI300X_192GB': 192
}
def calculate_cost_efficiency(self, target_models_per_day=1000):
"""일일 모델 학습 목표 기준 비용 효율성 계산"""
results = {}
for gpu_type in self.gpu_prices:
# 동시 학습 가능 모델 수 (메모리 기준)
models_per_gpu = self.memory_capacity[gpu_type] // 5 # 모델당 5GB 가정
# 필요한 GPU 수
required_gpus = max(1, target_models_per_day // models_per_gpu)
# 총 비용
total_cost = required_gpus * self.gpu_prices[gpu_type]
# 성능 대비 비용
performance_per_dollar = (
self.performance_scores[gpu_type] * models_per_gpu /
self.gpu_prices[gpu_type]
)
results[gpu_type] = {
'models_per_gpu': models_per_gpu,
'required_gpus': required_gpus,
'total_cost': total_cost,
'cost_per_model': total_cost / target_models_per_day,
'performance_per_dollar': performance_per_dollar,
'mig_support': gpu_type in ['A100_40GB', 'A100_80GB', 'H100_80GB']
}
# 비용 효율성 순으로 정렬
sorted_results = sorted(
results.items(),
key=lambda x: x[1]['performance_per_dollar'],
reverse=True
)
return dict(sorted_results)
def recommend_gpu_configuration(self, budget_usd, target_models_per_day):
"""예산과 목표에 따른 GPU 구성 추천"""
cost_analysis = self.calculate_cost_efficiency(target_models_per_day)
recommendations = []
for gpu_type, metrics in cost_analysis.items():
if metrics['total_cost'] <= budget_usd:
recommendations.append({
'gpu_type': gpu_type,
'configuration': f"{metrics['required_gpus']}x {gpu_type}",
'total_cost': metrics['total_cost'],
'models_per_day': metrics['required_gpus'] * metrics['models_per_gpu'],
'cost_efficiency': metrics['performance_per_dollar'],
'budget_utilization': metrics['total_cost'] / budget_usd * 100
})
return sorted(recommendations, key=lambda x: x['cost_efficiency'], reverse=True)
# 사용 예시
analyzer = GPUCostAnalyzer()
# 1일 1000개 모델 학습 목표
cost_analysis = analyzer.calculate_cost_efficiency(1000)
print("GPU별 비용 효율성 분석:")
for gpu_type, metrics in cost_analysis.items():
print(f"{gpu_type}: ${metrics['cost_per_model']:.2f}/모델, "
f"효율성: {metrics['performance_per_dollar']:.3f}")
# 50만 달러 예산으로 최적 구성 추천
budget_recommendations = analyzer.recommend_gpu_configuration(500000, 1000)
print("\n예산 내 추천 구성:")
for rec in budget_recommendations[:3]:
print(f"{rec['configuration']}: ${rec['total_cost']:,} "
f"({rec['budget_utilization']:.1f}% 예산 사용)")
실습: 기본 환경 구성 및 모델 테스트
이제 실제로 헤지펀드 수준의 시계열 모델링 환경을 macOS에서 구축해보겠습니다.
1. 개발 환경 설정
#!/bin/bash
# setup_hedge_fund_env.sh
# 기본 정보 출력
echo "🏦 헤지펀드 시계열 모델링 환경 설정"
echo "📍 작업 디렉토리: $(pwd)"
echo "🖥️ 시스템: $(uname -s) $(uname -r)"
echo "🐍 Python 버전: $(python3 --version)"
# 가상환경 생성
python3 -m venv hedge_fund_env
source hedge_fund_env/bin/activate
# 필수 패키지 설치
pip install --upgrade pip
# 시계열 모델링 라이브러리
pip install numpy==1.24.3
pip install pandas==2.0.3
pip install scipy==1.11.1
# 시계열 특화 라이브러리
pip install arch==6.2.0 # GARCH 모델
pip install statsmodels==0.14.0
pip install prophet==1.1.4
# 머신러닝 라이브러리
pip install scikit-learn==1.3.0
pip install xgboost==1.7.6
pip install lightgbm==4.0.0
# 딥러닝 라이브러리 (PyTorch)
pip install torch==2.0.1
pip install pytorch-forecasting==1.0.0
# Ray 분산 컴퓨팅
pip install ray[tune]==2.8.0
pip install ray[data]==2.8.0
# 데이터 처리 및 시각화
pip install matplotlib==3.7.2
pip install seaborn==0.12.2
pip install plotly==5.15.0
# 백테스팅 라이브러리
pip install vectorbt==0.25.2
pip install zipline-reloaded==3.0.2
# 패키지 설치 확인
echo "✅ 설치 완료된 주요 패키지:"
pip list | grep -E "(arch|xgboost|torch|ray|pandas)"
# 디렉토리 구조 생성
mkdir -p {data,models,notebooks,scripts,results}
mkdir -p models/{garch,xgboost,neural,ensemble}
mkdir -p data/{raw,processed,features}
echo "✅ 헤지펀드 모델링 환경 설정 완료!"
echo "📁 프로젝트 구조:"
tree -L 2 || ls -la
2. 샘플 데이터 생성 및 전처리
# scripts/data_generator.py
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
class MarketDataGenerator:
"""헤지펀드 스타일 시장 데이터 시뮬레이터"""
def __init__(self, start_date='2020-01-01', end_date='2024-12-31'):
self.start_date = pd.to_datetime(start_date)
self.end_date = pd.to_datetime(end_date)
self.trading_days = pd.bdate_range(start=self.start_date, end=self.end_date)
def generate_price_series(self, initial_price=100, volatility=0.02):
"""GARCH 효과가 있는 가격 시계열 생성"""
n_days = len(self.trading_days)
# GARCH(1,1) 스타일 변동성 모델링
omega = 0.00001 # 장기 변동성
alpha = 0.05 # ARCH 효과
beta = 0.90 # GARCH 효과
# 변동성 시계열
volatilities = np.zeros(n_days)
volatilities[0] = volatility
# 수익률 및 가격 생성
returns = np.zeros(n_days)
prices = np.zeros(n_days)
prices[0] = initial_price
for t in range(1, n_days):
# 변동성 업데이트 (GARCH 과정)
volatilities[t] = np.sqrt(
omega + alpha * returns[t-1]**2 + beta * volatilities[t-1]**2
)
# 수익률 생성 (정규분포 + 팻테일 효과)
if np.random.random() < 0.05: # 5% 확률로 극단적 움직임
returns[t] = np.random.normal(0, volatilities[t] * 3)
else:
returns[t] = np.random.normal(0, volatilities[t])
# 가격 업데이트
prices[t] = prices[t-1] * (1 + returns[t])
return pd.DataFrame({
'date': self.trading_days,
'price': prices,
'returns': returns,
'volatility': volatilities
})
def add_intraday_patterns(self, df):
"""장중 패턴 추가 (개장/마감 효과 등)"""
df = df.copy()
# 요일 효과
df['weekday'] = df['date'].dt.dayofweek
monday_effect = (df['weekday'] == 0) * np.random.normal(-0.001, 0.002, len(df))
friday_effect = (df['weekday'] == 4) * np.random.normal(0.0005, 0.001, len(df))
df['returns'] += monday_effect + friday_effect
# 월말 효과
df['month_end'] = df['date'].dt.is_month_end.astype(int)
month_end_effect = df['month_end'] * np.random.normal(0.002, 0.001, len(df))
df['returns'] += month_end_effect
# 가격 재계산
df['price'] = df['price'].iloc[0] * (1 + df['returns']).cumprod()
return df
def add_alternative_data(self, df):
"""대체 데이터 추가 (뉴스 센티멘트, 거래량 등)"""
df = df.copy()
# 뉴스 센티멘트 (가상)
df['news_sentiment'] = np.random.normal(0, 1, len(df))
# 소셜미디어 멘션 수 (가상)
df['social_mentions'] = np.random.poisson(100, len(df))
# 거래량 (가격 변화와 상관관계 있음)
base_volume = 1000000
volume_multiplier = 1 + 2 * np.abs(df['returns']) # 변동성과 거래량 정비례
df['volume'] = (base_volume * volume_multiplier).astype(int)
# 옵션 내재 변동성 (가상)
df['implied_volatility'] = df['volatility'] * (1 + np.random.normal(0, 0.1, len(df)))
return df
def generate_multi_asset_data(self, assets=['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'SPY']):
"""다중 자산 데이터 생성"""
all_data = {}
# 시장 공통 팩터 (시스템적 리스크)
market_factor = np.random.normal(0, 0.01, len(self.trading_days))
for asset in assets:
# 자산별 고유 변동성
asset_volatility = np.random.uniform(0.015, 0.035)
# 시장 베타 (시장과의 상관관계)
beta = np.random.uniform(0.5, 1.5)
# 기본 가격 시계열 생성
df = self.generate_price_series(
initial_price=np.random.uniform(50, 300),
volatility=asset_volatility
)
# 시장 팩터 적용
df['returns'] += beta * market_factor
# 장중 패턴 추가
df = self.add_intraday_patterns(df)
# 대체 데이터 추가
df = self.add_alternative_data(df)
# 자산 정보 추가
df['asset'] = asset
df['beta'] = beta
all_data[asset] = df
return all_data
# 데이터 생성 실행
if __name__ == "__main__":
generator = MarketDataGenerator()
print("📊 시장 데이터 생성 중...")
multi_asset_data = generator.generate_multi_asset_data()
# 데이터 저장
for asset, df in multi_asset_data.items():
df.to_parquet(f'data/processed/{asset}_daily_data.parquet')
print(f"✅ {asset} 데이터 저장 완료: {len(df)}일")
# 전체 데이터 통합
combined_df = pd.concat([
df.assign(asset=asset) for asset, df in multi_asset_data.items()
], ignore_index=True)
combined_df.to_parquet('data/processed/multi_asset_data.parquet')
print(f"✅ 전체 데이터 저장 완료")
print(f"📈 총 {len(combined_df):,}개 데이터 포인트")
print(f"🗓️ 기간: {combined_df['date'].min()} ~ {combined_df['date'].max()}")
3. GARCH 모델 구현 및 테스트
# models/garch/garch_ensemble.py
import numpy as np
import pandas as pd
from arch import arch_model
from sklearn.metrics import mean_squared_error
import warnings
warnings.filterwarnings('ignore')
class HedgeFundGARCHEnsemble:
"""헤지펀드 스타일 GARCH 앙상블"""
def __init__(self):
self.models = {}
self.fitted_models = {}
self.performance_metrics = {}
def create_garch_variants(self):
"""다양한 GARCH 모델 변형 생성"""
variants = {
'GARCH_11': {'vol': 'GARCH', 'p': 1, 'q': 1, 'dist': 'normal'},
'EGARCH_11': {'vol': 'EGARCH', 'p': 1, 'q': 1, 'dist': 'normal'},
'TGARCH_11': {'vol': 'GARCH', 'p': 1, 'q': 1, 'dist': 't'},
'GARCH_22': {'vol': 'GARCH', 'p': 2, 'q': 2, 'dist': 'normal'},
'EGARCH_12': {'vol': 'EGARCH', 'p': 1, 'q': 2, 'dist': 'skewt'}
}
return variants
def fit_single_garch(self, returns, variant_config):
"""단일 GARCH 모델 학습"""
try:
model = arch_model(
returns.dropna() * 100, # 백분율 변환
vol=variant_config['vol'],
p=variant_config['p'],
q=variant_config['q'],
dist=variant_config['dist']
)
fitted_model = model.fit(disp='off', show_warning=False)
return {
'model': fitted_model,
'aic': fitted_model.aic,
'bic': fitted_model.bic,
'log_likelihood': fitted_model.loglikelihood,
'status': 'success'
}
except Exception as e:
return {
'model': None,
'error': str(e),
'status': 'failed'
}
def fit_ensemble(self, asset_data):
"""전체 앙상블 모델 학습"""
results = {}
variants = self.create_garch_variants()
for asset in asset_data.keys():
print(f"🔄 {asset} GARCH 앙상블 학습 중...")
returns = asset_data[asset]['returns']
asset_results = {}
for variant_name, config in variants.items():
print(f" 📊 {variant_name} 학습...")
result = self.fit_single_garch(returns, config)
asset_results[variant_name] = result
if result['status'] == 'success':
print(f" ✅ AIC: {result['aic']:.2f}, BIC: {result['bic']:.2f}")
else:
print(f" ❌ 실패: {result['error']}")
results[asset] = asset_results
self.fitted_models = results
return results
def forecast_volatility(self, asset, horizon=5):
"""변동성 예측"""
if asset not in self.fitted_models:
raise ValueError(f"자산 {asset}에 대한 학습된 모델이 없습니다.")
forecasts = {}
weights = {}
# 각 모델의 가중치 계산 (AIC 기반)
aics = []
for variant_name, result in self.fitted_models[asset].items():
if result['status'] == 'success':
aics.append(result['aic'])
else:
aics.append(float('inf'))
# AIC 기반 가중치 (낮을수록 좋음)
aic_weights = np.exp(-np.array(aics) / 2)
aic_weights = aic_weights / aic_weights.sum()
# 각 모델의 예측
ensemble_forecast = np.zeros(horizon)
for i, (variant_name, result) in enumerate(self.fitted_models[asset].items()):
if result['status'] == 'success':
model_forecast = result['model'].forecast(horizon=horizon)
vol_forecast = np.sqrt(model_forecast.variance.iloc[-1].values)
forecasts[variant_name] = vol_forecast
weights[variant_name] = aic_weights[i]
# 가중 평균에 기여
ensemble_forecast += aic_weights[i] * vol_forecast
return {
'ensemble_forecast': ensemble_forecast / 100, # 백분율에서 소수점으로 변환
'individual_forecasts': forecasts,
'weights': weights
}
def calculate_var(self, asset, confidence_level=0.01, horizon=1):
"""VaR (Value at Risk) 계산"""
vol_forecast = self.forecast_volatility(asset, horizon=horizon)
# 정규분포 가정하에 VaR 계산
from scipy.stats import norm
z_score = norm.ppf(confidence_level)
var_estimate = z_score * vol_forecast['ensemble_forecast'][0]
return {
'var_1_percent': var_estimate,
'volatility_forecast': vol_forecast['ensemble_forecast'][0],
'confidence_level': confidence_level,
'horizon_days': horizon
}
def backtest_models(self, asset_data, test_period=252):
"""모델 백테스팅"""
backtest_results = {}
for asset in asset_data.keys():
print(f"🧪 {asset} 백테스팅 중...")
returns = asset_data[asset]['returns']
# 훈련/테스트 분할
train_returns = returns[:-test_period]
test_returns = returns[-test_period:]
# 훈련 데이터로 모델 학습
temp_data = {asset: {'returns': train_returns}}
self.fit_ensemble(temp_data)
# 테스트 기간 동안 예측
predictions = []
actuals = []
for i in range(len(test_returns) - 5):
# 5일 변동성 예측
vol_pred = self.forecast_volatility(asset, horizon=5)
# 실제 변동성 계산
actual_vol = test_returns.iloc[i:i+5].std()
predictions.append(vol_pred['ensemble_forecast'][0])
actuals.append(actual_vol)
# 성능 지표 계산
mse = mean_squared_error(actuals, predictions)
mae = np.mean(np.abs(np.array(actuals) - np.array(predictions)))
backtest_results[asset] = {
'mse': mse,
'mae': mae,
'predictions': predictions,
'actuals': actuals,
'correlation': np.corrcoef(predictions, actuals)[0, 1]
}
print(f" 📊 MSE: {mse:.6f}, MAE: {mae:.6f}, 상관관계: {backtest_results[asset]['correlation']:.3f}")
return backtest_results
# 테스트 실행
if __name__ == "__main__":
# 데이터 로드
assets = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'SPY']
asset_data = {}
for asset in assets:
try:
df = pd.read_parquet(f'data/processed/{asset}_daily_data.parquet')
asset_data[asset] = df
print(f"✅ {asset} 데이터 로드 완료: {len(df)}일")
except FileNotFoundError:
print(f"❌ {asset} 데이터 파일을 찾을 수 없습니다.")
if asset_data:
# GARCH 앙상블 학습
garch_ensemble = HedgeFundGARCHEnsemble()
print("\n🏦 헤지펀드 GARCH 앙상블 학습 시작")
ensemble_results = garch_ensemble.fit_ensemble(asset_data)
# VaR 계산
print("\n📊 VaR 계산")
for asset in assets[:3]: # 처음 3개 자산만
if asset in asset_data:
var_result = garch_ensemble.calculate_var(asset)
print(f"{asset} 1% VaR (1일): {var_result['var_1_percent']:.4f}")
print(f" 예상 변동성: {var_result['volatility_forecast']:.4f}")
# 백테스팅
print("\n🧪 모델 백테스팅")
backtest_results = garch_ensemble.backtest_models(asset_data, test_period=60)
print("\n✅ GARCH 앙상블 테스트 완료!")
4. Ray 분산 학습 테스트
# scripts/test_ray_distributed.py
import ray
import numpy as np
import pandas as pd
from ray import tune
import time
import os
@ray.remote
class ModelTrainingActor:
"""분산 모델 학습용 Ray Actor"""
def __init__(self):
self.model_count = 0
def train_lightweight_model(self, config):
"""가벼운 모델 학습 시뮬레이션"""
# 시뮬레이션된 학습 과정
import time
import random
# 모델 복잡도에 따른 학습 시간
complexity = config.get('complexity', 1.0)
sleep_time = complexity * random.uniform(0.1, 0.5)
time.sleep(sleep_time)
# 가상의 성능 점수 계산
performance = random.uniform(0.7, 0.95) * (1 + config.get('learning_rate', 0.01))
self.model_count += 1
return {
'model_id': f"model_{self.model_count}",
'performance': performance,
'config': config,
'training_time': sleep_time,
'actor_id': ray.get_runtime_context().get_actor_id()
}
def get_stats(self):
"""Actor 통계 반환"""
return {
'models_trained': self.model_count,
'actor_id': ray.get_runtime_context().get_actor_id()
}
def test_ray_parallel_training():
"""Ray 병렬 학습 테스트"""
# Ray 초기화 (로컬 모드)
if not ray.is_initialized():
ray.init()
print("🚀 Ray 분산 학습 테스트 시작")
print(f"💻 사용 가능한 CPU: {ray.available_resources().get('CPU', 0)}")
# 여러 Actor 생성 (워커 시뮬레이션)
num_workers = min(4, int(ray.available_resources().get('CPU', 1)))
workers = [ModelTrainingActor.remote() for _ in range(num_workers)]
print(f"👥 {num_workers}개 워커 생성 완료")
# 학습할 모델 설정 생성
model_configs = []
for i in range(50): # 50개 모델 설정
config = {
'learning_rate': np.random.uniform(0.001, 0.1),
'complexity': np.random.uniform(0.5, 2.0),
'model_type': np.random.choice(['garch', 'xgboost', 'lstm']),
'asset': np.random.choice(['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'SPY'])
}
model_configs.append(config)
# 병렬 학습 실행
start_time = time.time()
# 작업을 워커들에게 분산
futures = []
for i, config in enumerate(model_configs):
worker = workers[i % num_workers]
future = worker.train_lightweight_model.remote(config)
futures.append(future)
# 결과 수집
results = ray.get(futures)
end_time = time.time()
# 결과 분석
total_time = end_time - start_time
models_per_second = len(model_configs) / total_time
print(f"\n📊 병렬 학습 결과:")
print(f" 🔢 총 모델 수: {len(model_configs)}")
print(f" ⏱️ 총 학습 시간: {total_time:.2f}초")
print(f" 🚀 초당 모델 학습 수: {models_per_second:.2f}")
# 각 워커별 통계
print(f"\n👥 워커별 통계:")
worker_stats = ray.get([worker.get_stats.remote() for worker in workers])
for i, stats in enumerate(worker_stats):
print(f" 워커 {i+1}: {stats['models_trained']}개 모델 학습")
# 성능 분포 분석
performances = [result['performance'] for result in results]
print(f"\n📈 성능 분석:")
print(f" 평균 성능: {np.mean(performances):.3f}")
print(f" 최고 성능: {np.max(performances):.3f}")
print(f" 성능 표준편차: {np.std(performances):.3f}")
# 모델 유형별 성능
model_type_performance = {}
for result in results:
model_type = result['config']['model_type']
if model_type not in model_type_performance:
model_type_performance[model_type] = []
model_type_performance[model_type].append(result['performance'])
print(f"\n🔍 모델 유형별 평균 성능:")
for model_type, perfs in model_type_performance.items():
print(f" {model_type}: {np.mean(perfs):.3f} (n={len(perfs)})")
# Ray 종료
ray.shutdown()
return {
'total_models': len(model_configs),
'total_time': total_time,
'models_per_second': models_per_second,
'results': results
}
def test_ray_tune_hyperparameter_optimization():
"""Ray Tune을 이용한 하이퍼파라미터 최적화 테스트"""
if not ray.is_initialized():
ray.init()
print("🔍 Ray Tune 하이퍼파라미터 최적화 테스트")
def objective_function(config):
"""최적화할 목적 함수"""
import time
import random
# 시뮬레이션된 모델 학습
time.sleep(0.1) # 학습 시간 시뮬레이션
# 하이퍼파라미터에 따른 성능 계산
lr_penalty = abs(config['learning_rate'] - 0.01) * 10
complexity_bonus = config['complexity'] * 0.1
score = 0.85 + complexity_bonus - lr_penalty + random.uniform(-0.05, 0.05)
score = max(0.1, min(1.0, score)) # 0.1-1.0 범위로 제한
tune.report(score=score)
# 검색 공간 정의
search_space = {
'learning_rate': tune.loguniform(0.001, 0.1),
'complexity': tune.uniform(0.5, 2.0),
'batch_size': tune.choice([16, 32, 64, 128])
}
# Ray Tune 실행
analysis = tune.run(
objective_function,
config=search_space,
num_samples=20, # 20번의 트라이얼
verbose=1
)
# 최적 결과 출력
best_config = analysis.best_config
best_score = analysis.best_result['score']
print(f"\n🏆 최적화 결과:")
print(f" 최고 점수: {best_score:.4f}")
print(f" 최적 설정:")
for key, value in best_config.items():
print(f" {key}: {value}")
ray.shutdown()
return analysis
if __name__ == "__main__":
print("🧪 Ray 분산 컴퓨팅 테스트 시작\n")
# 병렬 학습 테스트
parallel_results = test_ray_parallel_training()
print("\n" + "="*50 + "\n")
# 하이퍼파라미터 최적화 테스트
tune_results = test_ray_tune_hyperparameter_optimization()
print("\n✅ 모든 Ray 테스트 완료!")
# 실제 헤지펀드 환경에서의 예상 성능
cpus_available = parallel_results['models_per_second']
print(f"\n🏦 헤지펀드 환경 성능 예측:")
print(f" 현재 시스템: {cpus_available:.1f} 모델/초")
# 스케일링 예측
gpu_cluster_speedup = 100 # GPU 클러스터 가정
estimated_daily_capacity = cpus_available * gpu_cluster_speedup * 3600 * 8 # 8시간 작업
print(f" GPU 클러스터 환경: {estimated_daily_capacity:,.0f} 모델/일")
print(f" 목표 달성률: {min(100, estimated_daily_capacity/1000):.1f}% (목표: 1000 모델/일)")
5. 환경 설정 자동화 스크립트
#!/bin/bash
# scripts/hedge_fund_setup_test.sh
echo "🏦 헤지펀드 시계열 모델링 환경 테스트"
echo "=================================================="
# 현재 환경 정보
echo "📍 시스템 정보:"
echo " OS: $(uname -s) $(uname -r)"
echo " 아키텍처: $(uname -m)"
echo " CPU 코어: $(sysctl -n hw.ncpu 2>/dev/null || nproc 2>/dev/null || echo 'N/A')"
echo " 메모리: $(sysctl -n hw.memsize 2>/dev/null | awk '{print $1/1024/1024/1024 " GB"}' || free -h 2>/dev/null | awk '/^Mem:/ {print $2}' || echo 'N/A')"
# Python 환경 확인
echo ""
echo "🐍 Python 환경:"
if command -v python3 &> /dev/null; then
echo " Python 버전: $(python3 --version)"
echo " Python 경로: $(which python3)"
else
echo " ❌ Python3이 설치되지 않았습니다."
exit 1
fi
# 가상환경 활성화 및 패키지 설치
if [ ! -d "hedge_fund_env" ]; then
echo ""
echo "🔧 가상환경 생성 중..."
python3 -m venv hedge_fund_env
fi
source hedge_fund_env/bin/activate
echo "✅ 가상환경 활성화 완료"
# 필수 패키지 설치 확인
echo ""
echo "📦 필수 패키지 설치 상태 확인:"
packages=(
"numpy"
"pandas"
"arch"
"xgboost"
"torch"
"ray"
"scikit-learn"
)
for package in "${packages[@]}"; do
if python3 -c "import $package" 2>/dev/null; then
version=$(python3 -c "import $package; print($package.__version__)" 2>/dev/null || echo "버전 확인 불가")
echo " ✅ $package: $version"
else
echo " ❌ $package: 설치되지 않음"
echo " 설치 명령: pip install $package"
fi
done
# 데이터 디렉토리 확인
echo ""
echo "📁 프로젝트 구조 확인:"
directories=("data" "models" "scripts" "results")
for dir in "${directories[@]}"; do
if [ -d "$dir" ]; then
file_count=$(find "$dir" -type f | wc -l | tr -d ' ')
echo " ✅ $dir/: $file_count 개 파일"
else
echo " ❌ $dir/: 디렉토리 없음"
mkdir -p "$dir"
echo " ✅ $dir/ 디렉토리 생성 완료"
fi
done
# 테스트 스크립트 실행
echo ""
echo "🧪 기본 기능 테스트:"
# 1. 데이터 생성 테스트
if python3 scripts/data_generator.py > /dev/null 2>&1; then
echo " ✅ 데이터 생성: 성공"
else
echo " ❌ 데이터 생성: 실패"
fi
# 2. GARCH 모델 테스트
if python3 models/garch/garch_ensemble.py > /dev/null 2>&1; then
echo " ✅ GARCH 모델: 성공"
else
echo " ❌ GARCH 모델: 실패"
fi
# 3. Ray 분산 컴퓨팅 테스트
if python3 scripts/test_ray_distributed.py > /dev/null 2>&1; then
echo " ✅ Ray 분산 컴퓨팅: 성공"
else
echo " ❌ Ray 분산 컴퓨팅: 실패"
fi
# 성능 벤치마크
echo ""
echo "⚡ 성능 벤치마크:"
# CPU 성능 테스트
python3 -c "
import time
import numpy as np
# 행렬 연산 성능 테스트
start = time.time()
a = np.random.rand(1000, 1000)
b = np.random.rand(1000, 1000)
c = np.dot(a, b)
end = time.time()
print(f' 행렬 연산 (1000x1000): {end-start:.3f}초')
# 시계열 계산 성능 테스트
start = time.time()
data = np.random.randn(10000)
for i in range(100):
rolling_mean = np.convolve(data, np.ones(20)/20, mode='valid')
end = time.time()
print(f' 이동평균 (10K 데이터, 100회): {end-start:.3f}초')
"
# zshrc 알리아스 제안
echo ""
echo "🔧 zshrc 알리아스 제안:"
echo "다음 명령어를 ~/.zshrc에 추가하세요:"
echo ""
echo "# 헤지펀드 모델링 환경"
echo "alias hedge_env='cd $(pwd) && source hedge_fund_env/bin/activate'"
echo "alias run_data_gen='python3 scripts/data_generator.py'"
echo "alias run_garch='python3 models/garch/garch_ensemble.py'"
echo "alias run_ray_test='python3 scripts/test_ray_distributed.py'"
echo "alias hedge_test='bash scripts/hedge_fund_setup_test.sh'"
echo ""
echo "✅ 헤지펀드 모델링 환경 테스트 완료!"
echo "🚀 이제 대규모 시계열 모델 학습을 시작할 수 있습니다."
실습 실행 및 결과
위의 스크립트들을 실행하여 실제 환경에서 테스트해보겠습니다.