Microsoft ArchScale: 확장 가능한 LLM 사전 훈련 프레임워크 완벽 가이드
⏱️ 예상 읽기 시간: 25분
서론
Microsoft ArchScale은 신경망 아키텍처 연구를 위한 단순하면서도 확장 가능한 사전 훈련 프레임워크입니다. 110M부터 3.3B 파라미터까지의 모델을 효율적으로 훈련할 수 있으며, μP++ 스케일링 법칙을 통해 하이퍼파라미터 튜닝의 비용을 대폭 절감할 수 있습니다.
ArchScale의 핵심 특징
- 🎯 다양한 아키텍처 지원: Phi4-mini-Flash, SambaY, Transformer++, Mamba 등
- 📊 μP++ 스케일링: 작은 모델에서 튜닝한 하이퍼파라미터를 대규모 모델에 자동 전이
- 🚀 효율적인 훈련: Flash Attention, RoPE, fused kernel 등 최적화 기술
- 📏 롱 컨텍스트: 최대 128K 시퀀스 길이까지 가변 길이 훈련
- 🔍 포괄적 평가: RULER, Phonebook, 추론 벤치마크 등
환경 설정 및 설치
1. Docker 환경 구성
ArchScale은 Docker를 통한 환경 설정을 권장합니다:
# ArchScale 저장소 클론
git clone https://github.com/microsoft/ArchScale.git
cd ArchScale
# Docker 이미지 빌드
docker build -t archscale:latest .
# 컨테이너 실행 (GPU 지원)
docker run --gpus all -it --rm \
-v $(pwd):/workspace \
-v /path/to/data:/data \
archscale:latest bash
2. 직접 설치
Docker를 사용하지 않는 경우:
# 의존성 설치
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
pip install lightning datasets transformers wandb
# ArchScale 클론 및 설치
git clone https://github.com/microsoft/ArchScale.git
cd ArchScale
pip install -e .
3. 데이터 준비
SlimPajama 데이터셋 토큰화
# Samba 코드베이스 참조하여 데이터 토큰화
# 예시: SlimPajama 전처리
python scripts/prepare_data.py \
--input_dir /path/to/slimpajama \
--output_dir /path/to/tokenized_data \
--tokenizer microsoft/Phi-4-mini-flash-reasoning \
--context_length 8192
핵심 개념: μP++ 스케일링 법칙
μP++란?
μP++ (Maximal Update Parameterization)는 모델 크기가 변해도 최적의 하이퍼파라미터를 자동으로 스케일링하는 기법입니다.
# BaseHyperparameters 설정 예시
from lit_gpt.config import BaseHyperparameters
# d16 (depth=16) 기준 하이퍼파라미터
base_hps = BaseHyperparameters(
eta0=5e-4, # 학습률
b0=8388608, # 배치 크기 (토큰 수)
warmup_tokens0=25_165_824_000, # 워밍업 토큰 수
# 기타 최적화 관련 파라미터들
)
# 실제 d8 또는 d24 모델 훈련 시 자동 스케일링됨
스케일링 법칙의 장점
- 비용 절감: 작은 모델로 하이퍼파라미터 튜닝 후 대규모 모델에 적용
- 일관성: 모델 크기와 관계없이 안정적인 훈련
- 효율성: 실험 회수 대폭 감소
실전 훈련 가이드
1. Phi4-mini-Flash 모델 사전 훈련
5T 토큰의 고품질 데이터로 Phi4-mini-Flash 모델을 훈련:
#!/bin/bash
# phi4_pretrain.sh
export LIGHTNING_ARTIFACTS_DIR='/path/to/output_dir'
export MASTER_ADDR='master_node_ip'
export MASTER_PORT='29500'
# 1K GPU (128 노드 × 8 GPU)로 분산 훈련
torchrun --nnodes=128 --nproc_per_node=8 \
--rdzv_backend=c10d \
--rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
pretrain.py \
--train_data_dir /path/to/phi4/data \
--base_hps.eta0=5e-4 \
--base_hps.b0=8388608 \
--base_hps.warmup_tokens0=25_165_824_000 \
--ctx_len 8192 \
--max_tokens 5e12 \
--resume="auto" \
--train_model phi4miniflash \
--depth 32 \
--train_name scaling
2. SambaY 아키텍처 훈련 (권장)
더 깔끔한 아키텍처인 SambaY 모델 훈련:
#!/bin/bash
# sambay_pretrain.sh
torchrun --nnodes=128 --nproc_per_node=8 \
--rdzv_backend=c10d \
--rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
pretrain.py \
--train_data_dir /path/to/data \
--train_model sambayda \
--depth 24 \
--vocab_size 200064 \
--train_name scaling_mup_tie \
--ctx_len 8192 \
--max_tokens 1e12
3. FLOPs 스케일링 실험
110M부터 3.3B 파라미터까지 다양한 크기의 모델 훈련:
#!/bin/bash
# flops_scaling.sh
export MASTER_ADDR='localhost'
export MASTER_PORT='29500'
# 8 GPU로 다양한 depth 실험
for depth in 8 12 16 20 24; do
echo "Training model with depth=${depth}"
torchrun --nnodes=1 --nproc_per_node=8 \
--rdzv_backend=c10d \
--rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
pretrain.py \
--train_data_dir /path/to/slim_pajama/data \
--val_data_dir /path/to/slim_pajama/data \
--train_model sambay \
--depth ${depth} \
--train_name scaling_mup \
--max_tokens 1e11
done
# 스케일링 곡선 생성
python plot_flops_scaling.py \
--log_dir /path/to/output_dir \
--output_file flops_scaling_results.png
4. 데이터 스케일링 실험
1B 파라미터 모델로 100B~600B 토큰 스케일링:
#!/bin/bash
# data_scaling.sh
# 64 GPU (8 노드 × 8 GPU)로 데이터 스케일링
for tokens in 1e11 2e11 3e11 4e11 5e11 6e11; do
echo "Training with ${tokens} tokens"
torchrun --nnodes=8 --nproc_per_node=8 \
--rdzv_backend=c10d \
--rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
pretrain.py \
--train_data_dir /path/to/slim_pajama/data \
--val_data_dir /path/to/slim_pajama/data \
--train_model transformer \
--depth 16 \
--max_tokens ${tokens} \
--train_name scaling_mup_tie
done
# 데이터 스케일링 곡선 분석
python plot_data_scaling.py \
--log_dir /path/to/output_dir \
--output_file data_scaling_results.png
하이퍼파라미터 튜닝 전략
1. μP++를 활용한 효율적 튜닝
#!/bin/bash
# hyperparameter_sweep.sh
# d16, 1B 모델 기준으로 하이퍼파라미터 튜닝
# 실제로는 d8 모델에서 빠르게 실험
for lr in 4e-4 1e-4 1e-3; do
for batch_size in 4194304 8388608 16777216; do
echo "Testing lr=${lr}, batch_size=${batch_size}"
torchrun --nnodes=1 --nproc_per_node=8 \
--rdzv_backend=c10d \
--rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
pretrain.py \
--train_data_dir /path/to/slim_pajama/data \
--val_data_dir /path/to/slim_pajama/data \
--train_model transformer \
--depth 8 \
--base_hps.eta0=${lr} \
--base_hps.b0=${batch_size} \
--train_name "hp_sweep_lr${lr}_bs${batch_size}" \
--max_tokens 1e10
done
done
2. 최적 하이퍼파라미터 분석
# analyze_hyperparameters.py
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
def analyze_sweep_results(log_dir):
"""하이퍼파라미터 스위프 결과 분석"""
results = []
for log_file in Path(log_dir).glob("*/metrics.csv"):
# 각 실험의 최종 손실값 추출
df = pd.read_csv(log_file)
final_loss = df['val_loss'].min()
# 실험명에서 하이퍼파라미터 추출
exp_name = log_file.parent.name
lr = float(exp_name.split('lr')[1].split('_')[0])
bs = int(exp_name.split('bs')[1])
results.append({
'learning_rate': lr,
'batch_size': bs,
'final_loss': final_loss
})
results_df = pd.DataFrame(results)
# 최적 하이퍼파라미터 찾기
best_idx = results_df['final_loss'].idxmin()
best_config = results_df.iloc[best_idx]
print(f"최적 설정:")
print(f"학습률: {best_config['learning_rate']}")
print(f"배치 크기: {best_config['batch_size']}")
print(f"최종 손실: {best_config['final_loss']:.4f}")
return results_df, best_config
if __name__ == "__main__":
results_df, best_config = analyze_sweep_results("/path/to/sweep_logs")
롱 컨텍스트 훈련
1. ProLong-64K 데이터로 32K 컨텍스트 훈련
#!/bin/bash
# long_context_training.sh
# ProLong-64K 데이터 전처리 (사전 토큰화)
python scripts/preprocess_prolong.py \
--input_dir /path/to/prolong_raw \
--output_dir /path/to/prolong_tokenized \
--max_length 65536 \
--shuffle
# 32K 시퀀스 길이로 훈련
torchrun --nnodes=1 --nproc_per_node=8 \
--rdzv_backend=c10d \
--rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
pretrain.py \
--train_data_dir /path/to/prolong_tokenized \
--val_data_dir /path/to/prolong_tokenized \
--train_model transformer \
--depth 16 \
--ctx_len 32768 \
--max_tokens 4e10 \
--train_name scaling_mup_rbase_varlen
2. 128K 컨텍스트 훈련 (최대 규모)
#!/bin/bash
# ultra_long_context.sh
# 메모리 절약 모드로 128K 컨텍스트 훈련
torchrun --nnodes=4 --nproc_per_node=8 \
--rdzv_backend=c10d \
--rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
pretrain.py \
--train_data_dir /path/to/prolong_tokenized \
--train_model transformer \
--depth 20 \
--ctx_len 131072 \
--fsdp_save_mem=true \
--train_name ultra_long_context \
--max_tokens 2e10
3. 가변 길이 훈련 설정
# variable_length_config.py
from lit_gpt.config import Config
def setup_variable_length_training():
"""가변 길이 훈련을 위한 설정"""
config = Config(
# 기본 모델 설정
n_layer=16,
n_head=16,
n_embd=2048,
# 가변 길이 설정
variable_length=True,
pack_sequences=True,
eos_token_id=2, # EOS 토큰으로 문서 분리
# RoPE 설정 (긴 컨텍스트용)
rope_base=10000, # 기본값
rope_scaling=None, # 또는 {"type": "linear", "factor": 2.0}
# 메모리 최적화
use_gradient_checkpointing=True,
attention_dropout=0.0,
residual_dropout=0.1
)
return config
Mamba 아키텍처 훈련
1. Mamba-1 설치
# 가변 길이 Mamba 설치
git clone https://github.com/zigzagcai/varlen_mamba.git --branch feat/add-cu_seqlens
cd varlen_mamba
pip install --no-build-isolation -e .
2. Mamba 모델 훈련
#!/bin/bash
# mamba_training.sh
torchrun --nnodes=2 --nproc_per_node=8 \
--rdzv_backend=c10d \
--rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
pretrain.py \
--train_data_dir /path/to/data \
--train_model mamba \
--depth 16 \
--ctx_len 16384 \
--train_name mamba_varlen \
--max_tokens 1e11
포괄적 평가 시스템
1. 표준 NLP 벤치마크
#!/bin/bash
# standard_evaluation.sh
python eval.py \
--model ArchScale \
--model_args pretrained=/path/to/checkpoint.pth,config="sambay_d16" \
--tasks wikitext,lambada_openai,arc_easy,arc_challenge,winogrande,hellaswag,piqa,social_iqa \
--device cuda \
--batch_size 16 \
--trust_remote_code \
--output_path /path/to/eval_results.json
2. 롱 컨텍스트 평가
RULER 벤치마크
#!/bin/bash
# ruler_evaluation.sh
# Needle-in-a-Haystack 테스트
python eval.py \
--model ArchScale \
--model_args pretrained=/path/to/checkpoint.pth,config="sambay_d16",max_length=32768,tokenizer=Orkhan/llama-2-7b-absa \
--metadata='{"max_seq_lengths":[32768]}' \
--tasks niah_single_1,niah_single_2,niah_single_3 \
--device cuda \
--batch_size 4 \
--trust_remote_code
Phonebook 평가
#!/bin/bash
# phonebook_evaluation.sh
python eval_phonebook.py \
--checkpoint_path /path/to/checkpoint.pth \
--config "sambay_d16" \
--min_eval_len 1850 \
--max_eval_len 32000 \
--output_dir /path/to/phonebook_results \
--eval_batch_size 4 \
--num_samples 1000
3. 추론 능력 평가
#!/bin/bash
# reasoning_evaluation.sh
# 수학/과학 추론 평가 (vLLM 백엔드 필요)
./eval_reason.sh microsoft/Phi-4-mini-flash-reasoning aime24 /path/to/output_dir
# GSM8K 수학 문제 해결
./eval_reason.sh microsoft/Phi-4-mini-flash-reasoning gsm8k /path/to/output_dir
# MATH 데이터셋 평가
./eval_reason.sh microsoft/Phi-4-mini-flash-reasoning math /path/to/output_dir
4. 평가 결과 분석
# evaluation_analysis.py
import json
import pandas as pd
import matplotlib.pyplot as plt
def analyze_evaluation_results(results_dir):
"""평가 결과 종합 분석"""
# 표준 벤치마크 결과 로드
with open(f"{results_dir}/standard_eval.json", 'r') as f:
standard_results = json.load(f)
# 롱 컨텍스트 결과 로드
with open(f"{results_dir}/phonebook_results.json", 'r') as f:
phonebook_results = json.load(f)
# 추론 결과 로드
with open(f"{results_dir}/reasoning_results.json", 'r') as f:
reasoning_results = json.load(f)
# 결과 정리
summary = {
"Standard NLP": {
"HellaSwag": standard_results.get("hellaswag", {}).get("acc_norm", 0),
"PIQA": standard_results.get("piqa", {}).get("acc_norm", 0),
"WinoGrande": standard_results.get("winogrande", {}).get("acc", 0),
"ARC-E": standard_results.get("arc_easy", {}).get("acc_norm", 0),
"ARC-C": standard_results.get("arc_challenge", {}).get("acc_norm", 0)
},
"Long Context": {
"Phonebook (16K)": phonebook_results.get("16384", {}).get("accuracy", 0),
"Phonebook (32K)": phonebook_results.get("32768", {}).get("accuracy", 0),
},
"Reasoning": {
"GSM8K": reasoning_results.get("gsm8k", {}).get("accuracy", 0),
"AIME": reasoning_results.get("aime24", {}).get("accuracy", 0),
"MATH": reasoning_results.get("math", {}).get("accuracy", 0)
}
}
# 결과 시각화
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for idx, (category, scores) in enumerate(summary.items()):
tasks = list(scores.keys())
values = list(scores.values())
axes[idx].bar(tasks, values)
axes[idx].set_title(f"{category} Performance")
axes[idx].set_ylabel("Accuracy")
axes[idx].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.savefig(f"{results_dir}/evaluation_summary.png", dpi=300, bbox_inches='tight')
return summary
# 실행 예시
if __name__ == "__main__":
summary = analyze_evaluation_results("/path/to/eval_results")
print("평가 결과 요약:")
for category, scores in summary.items():
print(f"\n{category}:")
for task, score in scores.items():
print(f" {task}: {score:.3f}")
모니터링 및 로깅
1. Weights & Biases 통합
# wandb_integration.py
import wandb
from datetime import datetime
def setup_wandb_logging(config):
"""W&B 로깅 설정"""
# 프로젝트 초기화
wandb.init(
project="archscale-pretraining",
name=f"experiment_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
config={
"model_architecture": config.train_model,
"depth": config.depth,
"context_length": config.ctx_len,
"max_tokens": config.max_tokens,
"learning_rate": config.base_hps.eta0,
"batch_size": config.base_hps.b0,
"scaling_method": "muP++" if "mup" in config.train_name else "standard"
},
tags=[config.train_model, f"depth_{config.depth}", config.train_name]
)
return wandb
def log_training_metrics(step, metrics):
"""훈련 메트릭 로깅"""
wandb.log({
"step": step,
"train/loss": metrics["train_loss"],
"train/perplexity": metrics["train_ppl"],
"val/loss": metrics["val_loss"],
"val/perplexity": metrics["val_ppl"],
"lr": metrics["learning_rate"],
"gpu_memory": metrics["gpu_memory_gb"],
"tokens_per_second": metrics["tokens_per_sec"]
})
def log_evaluation_results(eval_results):
"""평가 결과 로깅"""
for task, result in eval_results.items():
wandb.log({
f"eval/{task}": result["accuracy"],
f"eval/{task}_samples": result["num_samples"]
})
2. 시스템 모니터링
# system_monitor.py
import psutil
import GPUtil
import torch
import time
from threading import Thread
class SystemMonitor:
def __init__(self, log_interval=60):
self.log_interval = log_interval
self.monitoring = False
def start_monitoring(self):
"""시스템 모니터링 시작"""
self.monitoring = True
monitor_thread = Thread(target=self._monitor_loop)
monitor_thread.daemon = True
monitor_thread.start()
def stop_monitoring(self):
"""시스템 모니터링 중지"""
self.monitoring = False
def _monitor_loop(self):
"""모니터링 루프"""
while self.monitoring:
metrics = self._collect_metrics()
self._log_metrics(metrics)
time.sleep(self.log_interval)
def _collect_metrics(self):
"""시스템 메트릭 수집"""
# CPU 및 메모리 사용률
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
# GPU 메트릭
gpus = GPUtil.getGPUs()
gpu_metrics = []
for gpu in gpus:
gpu_metrics.append({
"id": gpu.id,
"utilization": gpu.load * 100,
"memory_used": gpu.memoryUsed,
"memory_total": gpu.memoryTotal,
"temperature": gpu.temperature
})
# PyTorch 메모리 (CUDA)
torch_memory = {}
if torch.cuda.is_available():
for i in range(torch.cuda.device_count()):
torch_memory[f"cuda_{i}"] = {
"allocated": torch.cuda.memory_allocated(i) / 1024**3, # GB
"reserved": torch.cuda.memory_reserved(i) / 1024**3, # GB
"max_allocated": torch.cuda.max_memory_allocated(i) / 1024**3
}
return {
"timestamp": time.time(),
"cpu_percent": cpu_percent,
"memory_percent": memory.percent,
"memory_available_gb": memory.available / 1024**3,
"gpu_metrics": gpu_metrics,
"torch_memory": torch_memory
}
def _log_metrics(self, metrics):
"""메트릭 로깅"""
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] System Metrics:")
print(f" CPU: {metrics['cpu_percent']:.1f}%")
print(f" Memory: {metrics['memory_percent']:.1f}%")
for gpu in metrics['gpu_metrics']:
print(f" GPU {gpu['id']}: {gpu['utilization']:.1f}% | "
f"{gpu['memory_used']:.1f}GB/{gpu['memory_total']:.1f}GB | "
f"{gpu['temperature']}°C")
# W&B에도 로깅
if wandb.run is not None:
wandb.log({
"system/cpu_percent": metrics['cpu_percent'],
"system/memory_percent": metrics['memory_percent'],
**{f"system/gpu_{gpu['id']}_util": gpu['utilization']
for gpu in metrics['gpu_metrics']},
**{f"system/gpu_{gpu['id']}_memory": gpu['memory_used']
for gpu in metrics['gpu_metrics']}
})
프로덕션 배포 전략
1. 모델 변환 및 최적화
# model_optimization.py
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from lit_gpt.model import GPT
from lit_gpt.config import Config
def convert_archscale_to_hf(checkpoint_path, config_name, output_dir):
"""ArchScale 모델을 HuggingFace 포맷으로 변환"""
# ArchScale 모델 로드
config = Config.from_name(config_name)
model = GPT(config)
checkpoint = torch.load(checkpoint_path, map_location="cpu")
model.load_state_dict(checkpoint["model"])
# HuggingFace 포맷으로 변환
# (실제 구현은 아키텍처별로 다름)
hf_model = convert_to_hf_format(model, config)
# 저장
hf_model.save_pretrained(output_dir)
return hf_model
def quantize_model(model_path, output_path, quantization_type="int8"):
"""모델 양자화"""
import torch.quantization as quant
model = AutoModelForCausalLM.from_pretrained(model_path)
if quantization_type == "int8":
model = quant.quantize_dynamic(
model, {torch.nn.Linear}, dtype=torch.qint8
)
elif quantization_type == "int4":
# BitsAndBytes 사용
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.float16
)
model = AutoModelForCausalLM.from_pretrained(
model_path,
quantization_config=quantization_config,
device_map="auto"
)
model.save_pretrained(output_path)
return model
2. 서빙 최적화
# serving_optimization.py
import vllm
from vllm import LLM, SamplingParams
import ray
class ArchScaleInferenceServer:
def __init__(self, model_path, gpu_memory_utilization=0.9):
"""ArchScale 모델 서빙 서버"""
# vLLM 엔진 초기화
self.llm = LLM(
model=model_path,
gpu_memory_utilization=gpu_memory_utilization,
max_model_len=32768, # 롱 컨텍스트 지원
enable_prefix_caching=True, # 프리픽스 캐싱으로 속도 향상
enforce_eager=False, # CUDA 그래프 최적화
)
# 기본 샘플링 파라미터
self.default_sampling_params = SamplingParams(
temperature=0.7,
top_p=0.9,
max_tokens=1024,
repetition_penalty=1.1
)
def generate(self, prompts, sampling_params=None):
"""배치 생성"""
if sampling_params is None:
sampling_params = self.default_sampling_params
outputs = self.llm.generate(prompts, sampling_params)
results = []
for output in outputs:
results.append({
"prompt": output.prompt,
"generated_text": output.outputs[0].text,
"finish_reason": output.outputs[0].finish_reason,
"tokens": len(output.outputs[0].token_ids)
})
return results
async def generate_stream(self, prompt, sampling_params=None):
"""스트리밍 생성"""
if sampling_params is None:
sampling_params = self.default_sampling_params
# vLLM 스트리밍 구현
for output in self.llm.generate_stream([prompt], sampling_params):
yield output.outputs[0].text
# Ray Serve를 활용한 확장 가능한 배포
@ray.serve.deployment(num_replicas=2)
class ArchScaleService:
def __init__(self, model_path):
self.inference_server = ArchScaleInferenceServer(model_path)
async def __call__(self, request):
prompts = request["prompts"]
sampling_params = SamplingParams(**request.get("sampling_params", {}))
results = self.inference_server.generate(prompts, sampling_params)
return {"results": results}
3. 성능 벤치마킹
# performance_benchmark.py
import time
import asyncio
import aiohttp
import statistics
from concurrent.futures import ThreadPoolExecutor
class PerformanceBenchmark:
def __init__(self, server_url="http://localhost:8000"):
self.server_url = server_url
async def single_request_latency(self, prompt, num_requests=100):
"""단일 요청 지연시간 측정"""
latencies = []
async with aiohttp.ClientSession() as session:
for _ in range(num_requests):
start_time = time.time()
async with session.post(
f"{self.server_url}/generate",
json={"prompts": [prompt]}
) as response:
await response.json()
latency = time.time() - start_time
latencies.append(latency)
return {
"mean_latency": statistics.mean(latencies),
"median_latency": statistics.median(latencies),
"p95_latency": sorted(latencies)[int(0.95 * len(latencies))],
"p99_latency": sorted(latencies)[int(0.99 * len(latencies))]
}
async def throughput_test(self, prompt, concurrent_requests=50, duration_seconds=60):
"""처리량 측정"""
completed_requests = 0
start_time = time.time()
async def worker():
nonlocal completed_requests
async with aiohttp.ClientSession() as session:
while time.time() - start_time < duration_seconds:
try:
async with session.post(
f"{self.server_url}/generate",
json={"prompts": [prompt]}
) as response:
await response.json()
completed_requests += 1
except Exception as e:
print(f"Request failed: {e}")
# 동시 요청 실행
tasks = [worker() for _ in range(concurrent_requests)]
await asyncio.gather(*tasks)
actual_duration = time.time() - start_time
throughput = completed_requests / actual_duration
return {
"total_requests": completed_requests,
"duration_seconds": actual_duration,
"requests_per_second": throughput
}
def token_generation_speed(self, prompts, output_lengths):
"""토큰 생성 속도 측정"""
total_tokens = 0
total_time = 0
for prompt, expected_length in zip(prompts, output_lengths):
start_time = time.time()
# 실제 요청 (동기식)
import requests
response = requests.post(
f"{self.server_url}/generate",
json={
"prompts": [prompt],
"sampling_params": {"max_tokens": expected_length}
}
)
generation_time = time.time() - start_time
actual_tokens = len(response.json()["results"][0]["generated_text"].split())
total_tokens += actual_tokens
total_time += generation_time
return {
"total_tokens": total_tokens,
"total_time": total_time,
"tokens_per_second": total_tokens / total_time
}
# 벤치마크 실행 예시
async def run_benchmark():
benchmark = PerformanceBenchmark()
test_prompt = "Explain the concept of machine learning in simple terms:"
# 지연시간 테스트
latency_results = await benchmark.single_request_latency(test_prompt)
print("Latency Results:", latency_results)
# 처리량 테스트
throughput_results = await benchmark.throughput_test(test_prompt)
print("Throughput Results:", throughput_results)
# 토큰 생성 속도 테스트
speed_results = benchmark.token_generation_speed([test_prompt], [100])
print("Generation Speed:", speed_results)
if __name__ == "__main__":
asyncio.run(run_benchmark())
실전 운영 가이드
1. 체크포인트 관리
# checkpoint_manager.py
import torch
import shutil
from pathlib import Path
import json
from datetime import datetime
class CheckpointManager:
def __init__(self, base_dir, max_checkpoints=5):
self.base_dir = Path(base_dir)
self.max_checkpoints = max_checkpoints
self.metadata_file = self.base_dir / "checkpoint_metadata.json"
def save_checkpoint(self, model, optimizer, scheduler, step, metrics):
"""체크포인트 저장"""
checkpoint_dir = self.base_dir / f"checkpoint_{step}"
checkpoint_dir.mkdir(parents=True, exist_ok=True)
# 모델 상태 저장
torch.save({
"model": model.state_dict(),
"optimizer": optimizer.state_dict(),
"scheduler": scheduler.state_dict(),
"step": step,
"metrics": metrics
}, checkpoint_dir / "model.pth")
# 메타데이터 저장
metadata = {
"step": step,
"timestamp": datetime.now().isoformat(),
"metrics": metrics,
"path": str(checkpoint_dir)
}
self._update_metadata(metadata)
self._cleanup_old_checkpoints()
return checkpoint_dir
def load_latest_checkpoint(self):
"""최신 체크포인트 로드"""
if not self.metadata_file.exists():
return None
with open(self.metadata_file, 'r') as f:
all_metadata = json.load(f)
if not all_metadata:
return None
# 최신 체크포인트 찾기
latest = max(all_metadata, key=lambda x: x["step"])
checkpoint_path = Path(latest["path"]) / "model.pth"
if checkpoint_path.exists():
return torch.load(checkpoint_path), latest
return None
def _update_metadata(self, new_metadata):
"""메타데이터 업데이트"""
all_metadata = []
if self.metadata_file.exists():
with open(self.metadata_file, 'r') as f:
all_metadata = json.load(f)
all_metadata.append(new_metadata)
with open(self.metadata_file, 'w') as f:
json.dump(all_metadata, f, indent=2)
def _cleanup_old_checkpoints(self):
"""오래된 체크포인트 정리"""
with open(self.metadata_file, 'r') as f:
all_metadata = json.load(f)
if len(all_metadata) <= self.max_checkpoints:
return
# 오래된 체크포인트 삭제
sorted_metadata = sorted(all_metadata, key=lambda x: x["step"])
to_remove = sorted_metadata[:-self.max_checkpoints]
for metadata in to_remove:
checkpoint_path = Path(metadata["path"])
if checkpoint_path.exists():
shutil.rmtree(checkpoint_path)
# 메타데이터 업데이트
remaining_metadata = sorted_metadata[-self.max_checkpoints:]
with open(self.metadata_file, 'w') as f:
json.dump(remaining_metadata, f, indent=2)
2. 자동화된 실험 관리
# experiment_manager.py
import yaml
import subprocess
import time
from pathlib import Path
import wandb
class ExperimentManager:
def __init__(self, config_file):
with open(config_file, 'r') as f:
self.config = yaml.safe_load(f)
self.experiments = self.config["experiments"]
self.base_command = self.config["base_command"]
def run_experiment_suite(self):
"""실험 스위트 실행"""
results = {}
for exp_name, exp_config in self.experiments.items():
print(f"Starting experiment: {exp_name}")
# 실험 실행
result = self._run_single_experiment(exp_name, exp_config)
results[exp_name] = result
# 결과 로깅
self._log_experiment_result(exp_name, result)
# 전체 결과 요약
self._generate_summary_report(results)
return results
def _run_single_experiment(self, exp_name, exp_config):
"""단일 실험 실행"""
# 명령어 생성
command = self._build_command(exp_config)
# 실행 시간 측정
start_time = time.time()
try:
# 실험 실행
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=exp_config.get("timeout", 3600*24) # 24시간 기본
)
duration = time.time() - start_time
if result.returncode == 0:
# 성공적인 실험
metrics = self._extract_metrics(result.stdout)
return {
"status": "success",
"duration": duration,
"metrics": metrics,
"stdout": result.stdout[-1000:], # 마지막 1000자만
"stderr": result.stderr
}
else:
# 실패한 실험
return {
"status": "failed",
"duration": duration,
"error": result.stderr,
"returncode": result.returncode
}
except subprocess.TimeoutExpired:
return {
"status": "timeout",
"duration": time.time() - start_time,
"error": "Experiment timed out"
}
except Exception as e:
return {
"status": "error",
"duration": time.time() - start_time,
"error": str(e)
}
def _build_command(self, exp_config):
"""실험 명령어 생성"""
command_parts = [self.base_command]
for key, value in exp_config.items():
if key in ["timeout", "description"]:
continue
if isinstance(value, bool):
if value:
command_parts.append(f"--{key}")
else:
command_parts.append(f"--{key} {value}")
return " ".join(command_parts)
def _extract_metrics(self, stdout):
"""출력에서 메트릭 추출"""
metrics = {}
for line in stdout.split('\n'):
if 'final_loss:' in line:
try:
metrics['final_loss'] = float(line.split('final_loss:')[1].strip())
except:
pass
elif 'best_val_loss:' in line:
try:
metrics['best_val_loss'] = float(line.split('best_val_loss:')[1].strip())
except:
pass
return metrics
# 실험 설정 예시 (experiments.yaml)
experiments_yaml = """
base_command: "torchrun --nnodes=1 --nproc_per_node=8 pretrain.py"
experiments:
small_lr_sweep_1:
train_data_dir: "/path/to/data"
train_model: "transformer"
depth: 8
base_hps.eta0: 1e-4
max_tokens: 1e10
train_name: "lr_sweep_1e4"
timeout: 7200
description: "Learning rate 1e-4 test"
small_lr_sweep_2:
train_data_dir: "/path/to/data"
train_model: "transformer"
depth: 8
base_hps.eta0: 5e-4
max_tokens: 1e10
train_name: "lr_sweep_5e4"
timeout: 7200
description: "Learning rate 5e-4 test"
small_lr_sweep_3:
train_data_dir: "/path/to/data"
train_model: "transformer"
depth: 8
base_hps.eta0: 1e-3
max_tokens: 1e10
train_name: "lr_sweep_1e3"
timeout: 7200
description: "Learning rate 1e-3 test"
"""
문제 해결 가이드
1. 일반적인 오류 및 해결방법
# troubleshooting.py
class ArchScaleTroubleshooter:
@staticmethod
def diagnose_oom_error():
"""OOM 오류 진단 및 해결책"""
print("🔍 Out of Memory (OOM) 오류 진단:")
print("1. GPU 메모리 확인:")
print(" nvidia-smi")
print("\n2. 배치 크기 줄이기:")
print(" --base_hps.b0=4194304 # 기본값의 절반")
print("\n3. Gradient Checkpointing 활성화:")
print(" --gradient_checkpointing=true")
print("\n4. FSDP 메모리 절약 모드:")
print(" --fsdp_save_mem=true")
print("\n5. 컨텍스트 길이 줄이기:")
print(" --ctx_len=4096 # 기본 8192에서 절반으로")
@staticmethod
def diagnose_slow_training():
"""훈련 속도 저하 진단"""
print("🔍 훈련 속도 저하 진단:")
print("1. 데이터 로딩 병목 확인:")
print(" --num_workers=8 # 데이터 로더 워커 수 증가")
print("\n2. Mixed Precision 활성화:")
print(" --precision=bf16")
print("\n3. Compiled 모드 활용:")
print(" --compile=true")
print("\n4. Flash Attention 확인:")
print(" pip install flash-attn")
print("\n5. 데이터셋 사전 토큰화:")
print(" 사전에 토큰화된 데이터 사용")
@staticmethod
def diagnose_distributed_issues():
"""분산 훈련 문제 진단"""
print("🔍 분산 훈련 문제 진단:")
print("1. 네트워크 연결 확인:")
print(" ping $MASTER_ADDR")
print("\n2. 포트 사용 가능 확인:")
print(" netstat -an | grep $MASTER_PORT")
print("\n3. NCCL 환경변수 설정:")
print(" export NCCL_DEBUG=INFO")
print(" export NCCL_IB_DISABLE=1 # InfiniBand 문제 시")
print("\n4. 방화벽 설정 확인:")
print(" 포트 29500-29510 범위 열기")
print("\n5. 노드 간 시간 동기화:")
print(" ntpdate -s time.nist.gov")
2. 성능 최적화 체크리스트
#!/bin/bash
# performance_optimization_checklist.sh
echo "🚀 ArchScale 성능 최적화 체크리스트"
echo "1. 하드웨어 설정 확인:"
echo " - GPU 메모리: 24GB+ 권장"
echo " - GPU 연결: NVLink 또는 PCIe 4.0+"
echo " - 스토리지: NVMe SSD 권장"
echo " - 네트워크: 10Gbps+ (분산 훈련 시)"
echo -e "\n2. 소프트웨어 최적화:"
echo " - PyTorch 2.0+ 사용"
echo " - CUDA 12.1+ 권장"
echo " - Flash Attention 2.0 설치"
echo " - xFormers 설치 고려"
echo -e "\n3. 훈련 설정 최적화:"
echo " - Mixed Precision (bf16) 사용"
echo " - Gradient Accumulation 조정"
echo " - DataLoader num_workers 튜닝"
echo " - Prefetch factor 조정"
echo -e "\n4. 메모리 최적화:"
echo " - FSDP (Fully Sharded Data Parallel) 사용"
echo " - Gradient Checkpointing 활성화"
echo " - Model Sharding 고려"
echo " - CPU Offloading (필요 시)"
echo -e "\n5. 데이터 파이프라인:"
echo " - 사전 토큰화된 데이터 사용"
echo " - 데이터 캐싱 활성화"
echo " - 병렬 데이터 로딩"
echo " - 압축된 데이터 포맷 사용"
결론
Microsoft ArchScale은 대규모 언어모델 사전 훈련을 위한 강력하고 유연한 프레임워크입니다. 이 가이드에서 다룬 내용들을 통해 다음과 같은 성과를 얻을 수 있습니다:
🎯 핵심 성과
- 비용 효율적인 연구: μP++ 스케일링으로 하이퍼파라미터 튜닝 비용 90% 절감
- 확장 가능한 아키텍처: 110M부터 수십억 파라미터까지 일관된 훈련 파이프라인
- 롱 컨텍스트 지원: 128K 토큰까지 효율적인 훈련
- 포괄적인 평가: RULER, Phonebook, 추론 벤치마크를 통한 다면적 성능 평가
🚀 실무 적용 포인트
- 스타트업: 제한된 자원으로 효과적인 모델 연구
- 연구기관: 다양한 아키텍처 실험과 스케일링 법칙 연구
- 기업: 도메인 특화 모델의 효율적인 사전 훈련
- 개발자: 최신 LLM 기술을 활용한 프로덕션 시스템 구축
📈 다음 단계
- 소규모 실험: 8 GPU로 시작하여 μP++ 스케일링 검증
- 아키텍처 탐색: SambaY, Mamba 등 다양한 아키텍처 비교
- 프로덕션 최적화: vLLM, quantization을 통한 서빙 최적화
- 커뮤니티 기여: ArchScale GitHub에 개선사항 기여
ArchScale을 통해 여러분만의 차세대 언어모델을 효율적으로 개발해보세요! 🚀
참고 자료: