⏱️ 예상 읽기 시간: 25분

서론

Microsoft ArchScale은 신경망 아키텍처 연구를 위한 단순하면서도 확장 가능한 사전 훈련 프레임워크입니다. 110M부터 3.3B 파라미터까지의 모델을 효율적으로 훈련할 수 있으며, μP++ 스케일링 법칙을 통해 하이퍼파라미터 튜닝의 비용을 대폭 절감할 수 있습니다.

ArchScale의 핵심 특징

  • 🎯 다양한 아키텍처 지원: Phi4-mini-Flash, SambaY, Transformer++, Mamba 등
  • 📊 μP++ 스케일링: 작은 모델에서 튜닝한 하이퍼파라미터를 대규모 모델에 자동 전이
  • 🚀 효율적인 훈련: Flash Attention, RoPE, fused kernel 등 최적화 기술
  • 📏 롱 컨텍스트: 최대 128K 시퀀스 길이까지 가변 길이 훈련
  • 🔍 포괄적 평가: RULER, Phonebook, 추론 벤치마크 등

환경 설정 및 설치

1. Docker 환경 구성

ArchScale은 Docker를 통한 환경 설정을 권장합니다:

# ArchScale 저장소 클론
git clone https://github.com/microsoft/ArchScale.git
cd ArchScale

# Docker 이미지 빌드
docker build -t archscale:latest .

# 컨테이너 실행 (GPU 지원)
docker run --gpus all -it --rm \
    -v $(pwd):/workspace \
    -v /path/to/data:/data \
    archscale:latest bash

2. 직접 설치

Docker를 사용하지 않는 경우:

# 의존성 설치
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
pip install lightning datasets transformers wandb

# ArchScale 클론 및 설치
git clone https://github.com/microsoft/ArchScale.git
cd ArchScale
pip install -e .

3. 데이터 준비

SlimPajama 데이터셋 토큰화

# Samba 코드베이스 참조하여 데이터 토큰화
# 예시: SlimPajama 전처리
python scripts/prepare_data.py \
    --input_dir /path/to/slimpajama \
    --output_dir /path/to/tokenized_data \
    --tokenizer microsoft/Phi-4-mini-flash-reasoning \
    --context_length 8192

핵심 개념: μP++ 스케일링 법칙

μP++란?

μP++ (Maximal Update Parameterization)는 모델 크기가 변해도 최적의 하이퍼파라미터를 자동으로 스케일링하는 기법입니다.

# BaseHyperparameters 설정 예시
from lit_gpt.config import BaseHyperparameters

# d16 (depth=16) 기준 하이퍼파라미터
base_hps = BaseHyperparameters(
    eta0=5e-4,          # 학습률
    b0=8388608,         # 배치 크기 (토큰 수)
    warmup_tokens0=25_165_824_000,  # 워밍업 토큰 수
    # 기타 최적화 관련 파라미터들
)

# 실제 d8 또는 d24 모델 훈련 시 자동 스케일링됨

스케일링 법칙의 장점

  1. 비용 절감: 작은 모델로 하이퍼파라미터 튜닝 후 대규모 모델에 적용
  2. 일관성: 모델 크기와 관계없이 안정적인 훈련
  3. 효율성: 실험 회수 대폭 감소

실전 훈련 가이드

1. Phi4-mini-Flash 모델 사전 훈련

5T 토큰의 고품질 데이터로 Phi4-mini-Flash 모델을 훈련:

#!/bin/bash
# phi4_pretrain.sh

export LIGHTNING_ARTIFACTS_DIR='/path/to/output_dir'
export MASTER_ADDR='master_node_ip'
export MASTER_PORT='29500'

# 1K GPU (128 노드 × 8 GPU)로 분산 훈련
torchrun --nnodes=128 --nproc_per_node=8 \
    --rdzv_backend=c10d \
    --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
    pretrain.py \
    --train_data_dir /path/to/phi4/data \
    --base_hps.eta0=5e-4 \
    --base_hps.b0=8388608 \
    --base_hps.warmup_tokens0=25_165_824_000 \
    --ctx_len 8192 \
    --max_tokens 5e12 \
    --resume="auto" \
    --train_model phi4miniflash \
    --depth 32 \
    --train_name scaling

2. SambaY 아키텍처 훈련 (권장)

더 깔끔한 아키텍처인 SambaY 모델 훈련:

#!/bin/bash
# sambay_pretrain.sh

torchrun --nnodes=128 --nproc_per_node=8 \
    --rdzv_backend=c10d \
    --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
    pretrain.py \
    --train_data_dir /path/to/data \
    --train_model sambayda \
    --depth 24 \
    --vocab_size 200064 \
    --train_name scaling_mup_tie \
    --ctx_len 8192 \
    --max_tokens 1e12

3. FLOPs 스케일링 실험

110M부터 3.3B 파라미터까지 다양한 크기의 모델 훈련:

#!/bin/bash
# flops_scaling.sh

export MASTER_ADDR='localhost'
export MASTER_PORT='29500'

# 8 GPU로 다양한 depth 실험
for depth in 8 12 16 20 24; do
    echo "Training model with depth=${depth}"
    
    torchrun --nnodes=1 --nproc_per_node=8 \
        --rdzv_backend=c10d \
        --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
        pretrain.py \
        --train_data_dir /path/to/slim_pajama/data \
        --val_data_dir /path/to/slim_pajama/data \
        --train_model sambay \
        --depth ${depth} \
        --train_name scaling_mup \
        --max_tokens 1e11
done

# 스케일링 곡선 생성
python plot_flops_scaling.py \
    --log_dir /path/to/output_dir \
    --output_file flops_scaling_results.png

4. 데이터 스케일링 실험

1B 파라미터 모델로 100B~600B 토큰 스케일링:

#!/bin/bash
# data_scaling.sh

# 64 GPU (8 노드 × 8 GPU)로 데이터 스케일링
for tokens in 1e11 2e11 3e11 4e11 5e11 6e11; do
    echo "Training with ${tokens} tokens"
    
    torchrun --nnodes=8 --nproc_per_node=8 \
        --rdzv_backend=c10d \
        --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
        pretrain.py \
        --train_data_dir /path/to/slim_pajama/data \
        --val_data_dir /path/to/slim_pajama/data \
        --train_model transformer \
        --depth 16 \
        --max_tokens ${tokens} \
        --train_name scaling_mup_tie
done

# 데이터 스케일링 곡선 분석
python plot_data_scaling.py \
    --log_dir /path/to/output_dir \
    --output_file data_scaling_results.png

하이퍼파라미터 튜닝 전략

1. μP++를 활용한 효율적 튜닝

#!/bin/bash
# hyperparameter_sweep.sh

# d16, 1B 모델 기준으로 하이퍼파라미터 튜닝
# 실제로는 d8 모델에서 빠르게 실험
for lr in 4e-4 1e-4 1e-3; do
    for batch_size in 4194304 8388608 16777216; do
        echo "Testing lr=${lr}, batch_size=${batch_size}"
        
        torchrun --nnodes=1 --nproc_per_node=8 \
            --rdzv_backend=c10d \
            --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
            pretrain.py \
            --train_data_dir /path/to/slim_pajama/data \
            --val_data_dir /path/to/slim_pajama/data \
            --train_model transformer \
            --depth 8 \
            --base_hps.eta0=${lr} \
            --base_hps.b0=${batch_size} \
            --train_name "hp_sweep_lr${lr}_bs${batch_size}" \
            --max_tokens 1e10
    done
done

2. 최적 하이퍼파라미터 분석

# analyze_hyperparameters.py
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path

def analyze_sweep_results(log_dir):
    """하이퍼파라미터 스위프 결과 분석"""
    results = []
    
    for log_file in Path(log_dir).glob("*/metrics.csv"):
        # 각 실험의 최종 손실값 추출
        df = pd.read_csv(log_file)
        final_loss = df['val_loss'].min()
        
        # 실험명에서 하이퍼파라미터 추출
        exp_name = log_file.parent.name
        lr = float(exp_name.split('lr')[1].split('_')[0])
        bs = int(exp_name.split('bs')[1])
        
        results.append({
            'learning_rate': lr,
            'batch_size': bs,
            'final_loss': final_loss
        })
    
    results_df = pd.DataFrame(results)
    
    # 최적 하이퍼파라미터 찾기
    best_idx = results_df['final_loss'].idxmin()
    best_config = results_df.iloc[best_idx]
    
    print(f"최적 설정:")
    print(f"학습률: {best_config['learning_rate']}")
    print(f"배치 크기: {best_config['batch_size']}")
    print(f"최종 손실: {best_config['final_loss']:.4f}")
    
    return results_df, best_config

if __name__ == "__main__":
    results_df, best_config = analyze_sweep_results("/path/to/sweep_logs")

롱 컨텍스트 훈련

1. ProLong-64K 데이터로 32K 컨텍스트 훈련

#!/bin/bash
# long_context_training.sh

# ProLong-64K 데이터 전처리 (사전 토큰화)
python scripts/preprocess_prolong.py \
    --input_dir /path/to/prolong_raw \
    --output_dir /path/to/prolong_tokenized \
    --max_length 65536 \
    --shuffle

# 32K 시퀀스 길이로 훈련
torchrun --nnodes=1 --nproc_per_node=8 \
    --rdzv_backend=c10d \
    --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
    pretrain.py \
    --train_data_dir /path/to/prolong_tokenized \
    --val_data_dir /path/to/prolong_tokenized \
    --train_model transformer \
    --depth 16 \
    --ctx_len 32768 \
    --max_tokens 4e10 \
    --train_name scaling_mup_rbase_varlen

2. 128K 컨텍스트 훈련 (최대 규모)

#!/bin/bash
# ultra_long_context.sh

# 메모리 절약 모드로 128K 컨텍스트 훈련
torchrun --nnodes=4 --nproc_per_node=8 \
    --rdzv_backend=c10d \
    --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
    pretrain.py \
    --train_data_dir /path/to/prolong_tokenized \
    --train_model transformer \
    --depth 20 \
    --ctx_len 131072 \
    --fsdp_save_mem=true \
    --train_name ultra_long_context \
    --max_tokens 2e10

3. 가변 길이 훈련 설정

# variable_length_config.py
from lit_gpt.config import Config

def setup_variable_length_training():
    """가변 길이 훈련을 위한 설정"""
    config = Config(
        # 기본 모델 설정
        n_layer=16,
        n_head=16,
        n_embd=2048,
        
        # 가변 길이 설정
        variable_length=True,
        pack_sequences=True,
        eos_token_id=2,  # EOS 토큰으로 문서 분리
        
        # RoPE 설정 (긴 컨텍스트용)
        rope_base=10000,  # 기본값
        rope_scaling=None,  # 또는 {"type": "linear", "factor": 2.0}
        
        # 메모리 최적화
        use_gradient_checkpointing=True,
        attention_dropout=0.0,
        residual_dropout=0.1
    )
    
    return config

Mamba 아키텍처 훈련

1. Mamba-1 설치

# 가변 길이 Mamba 설치
git clone https://github.com/zigzagcai/varlen_mamba.git --branch feat/add-cu_seqlens
cd varlen_mamba
pip install --no-build-isolation -e .

2. Mamba 모델 훈련

#!/bin/bash
# mamba_training.sh

torchrun --nnodes=2 --nproc_per_node=8 \
    --rdzv_backend=c10d \
    --rdzv_endpoint=${MASTER_ADDR}:${MASTER_PORT} \
    pretrain.py \
    --train_data_dir /path/to/data \
    --train_model mamba \
    --depth 16 \
    --ctx_len 16384 \
    --train_name mamba_varlen \
    --max_tokens 1e11

포괄적 평가 시스템

1. 표준 NLP 벤치마크

#!/bin/bash
# standard_evaluation.sh

python eval.py \
    --model ArchScale \
    --model_args pretrained=/path/to/checkpoint.pth,config="sambay_d16" \
    --tasks wikitext,lambada_openai,arc_easy,arc_challenge,winogrande,hellaswag,piqa,social_iqa \
    --device cuda \
    --batch_size 16 \
    --trust_remote_code \
    --output_path /path/to/eval_results.json

2. 롱 컨텍스트 평가

RULER 벤치마크

#!/bin/bash
# ruler_evaluation.sh

# Needle-in-a-Haystack 테스트
python eval.py \
    --model ArchScale \
    --model_args pretrained=/path/to/checkpoint.pth,config="sambay_d16",max_length=32768,tokenizer=Orkhan/llama-2-7b-absa \
    --metadata='{"max_seq_lengths":[32768]}' \
    --tasks niah_single_1,niah_single_2,niah_single_3 \
    --device cuda \
    --batch_size 4 \
    --trust_remote_code

Phonebook 평가

#!/bin/bash
# phonebook_evaluation.sh

python eval_phonebook.py \
    --checkpoint_path /path/to/checkpoint.pth \
    --config "sambay_d16" \
    --min_eval_len 1850 \
    --max_eval_len 32000 \
    --output_dir /path/to/phonebook_results \
    --eval_batch_size 4 \
    --num_samples 1000

3. 추론 능력 평가

#!/bin/bash
# reasoning_evaluation.sh

# 수학/과학 추론 평가 (vLLM 백엔드 필요)
./eval_reason.sh microsoft/Phi-4-mini-flash-reasoning aime24 /path/to/output_dir

# GSM8K 수학 문제 해결
./eval_reason.sh microsoft/Phi-4-mini-flash-reasoning gsm8k /path/to/output_dir

# MATH 데이터셋 평가
./eval_reason.sh microsoft/Phi-4-mini-flash-reasoning math /path/to/output_dir

4. 평가 결과 분석

# evaluation_analysis.py
import json
import pandas as pd
import matplotlib.pyplot as plt

def analyze_evaluation_results(results_dir):
    """평가 결과 종합 분석"""
    
    # 표준 벤치마크 결과 로드
    with open(f"{results_dir}/standard_eval.json", 'r') as f:
        standard_results = json.load(f)
    
    # 롱 컨텍스트 결과 로드
    with open(f"{results_dir}/phonebook_results.json", 'r') as f:
        phonebook_results = json.load(f)
    
    # 추론 결과 로드
    with open(f"{results_dir}/reasoning_results.json", 'r') as f:
        reasoning_results = json.load(f)
    
    # 결과 정리
    summary = {
        "Standard NLP": {
            "HellaSwag": standard_results.get("hellaswag", {}).get("acc_norm", 0),
            "PIQA": standard_results.get("piqa", {}).get("acc_norm", 0),
            "WinoGrande": standard_results.get("winogrande", {}).get("acc", 0),
            "ARC-E": standard_results.get("arc_easy", {}).get("acc_norm", 0),
            "ARC-C": standard_results.get("arc_challenge", {}).get("acc_norm", 0)
        },
        "Long Context": {
            "Phonebook (16K)": phonebook_results.get("16384", {}).get("accuracy", 0),
            "Phonebook (32K)": phonebook_results.get("32768", {}).get("accuracy", 0),
        },
        "Reasoning": {
            "GSM8K": reasoning_results.get("gsm8k", {}).get("accuracy", 0),
            "AIME": reasoning_results.get("aime24", {}).get("accuracy", 0),
            "MATH": reasoning_results.get("math", {}).get("accuracy", 0)
        }
    }
    
    # 결과 시각화
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    for idx, (category, scores) in enumerate(summary.items()):
        tasks = list(scores.keys())
        values = list(scores.values())
        
        axes[idx].bar(tasks, values)
        axes[idx].set_title(f"{category} Performance")
        axes[idx].set_ylabel("Accuracy")
        axes[idx].tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    plt.savefig(f"{results_dir}/evaluation_summary.png", dpi=300, bbox_inches='tight')
    
    return summary

# 실행 예시
if __name__ == "__main__":
    summary = analyze_evaluation_results("/path/to/eval_results")
    print("평가 결과 요약:")
    for category, scores in summary.items():
        print(f"\n{category}:")
        for task, score in scores.items():
            print(f"  {task}: {score:.3f}")

모니터링 및 로깅

1. Weights & Biases 통합

# wandb_integration.py
import wandb
from datetime import datetime

def setup_wandb_logging(config):
    """W&B 로깅 설정"""
    
    # 프로젝트 초기화
    wandb.init(
        project="archscale-pretraining",
        name=f"experiment_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
        config={
            "model_architecture": config.train_model,
            "depth": config.depth,
            "context_length": config.ctx_len,
            "max_tokens": config.max_tokens,
            "learning_rate": config.base_hps.eta0,
            "batch_size": config.base_hps.b0,
            "scaling_method": "muP++" if "mup" in config.train_name else "standard"
        },
        tags=[config.train_model, f"depth_{config.depth}", config.train_name]
    )
    
    return wandb

def log_training_metrics(step, metrics):
    """훈련 메트릭 로깅"""
    wandb.log({
        "step": step,
        "train/loss": metrics["train_loss"],
        "train/perplexity": metrics["train_ppl"],
        "val/loss": metrics["val_loss"],
        "val/perplexity": metrics["val_ppl"],
        "lr": metrics["learning_rate"],
        "gpu_memory": metrics["gpu_memory_gb"],
        "tokens_per_second": metrics["tokens_per_sec"]
    })

def log_evaluation_results(eval_results):
    """평가 결과 로깅"""
    for task, result in eval_results.items():
        wandb.log({
            f"eval/{task}": result["accuracy"],
            f"eval/{task}_samples": result["num_samples"]
        })

2. 시스템 모니터링

# system_monitor.py
import psutil
import GPUtil
import torch
import time
from threading import Thread

class SystemMonitor:
    def __init__(self, log_interval=60):
        self.log_interval = log_interval
        self.monitoring = False
        
    def start_monitoring(self):
        """시스템 모니터링 시작"""
        self.monitoring = True
        monitor_thread = Thread(target=self._monitor_loop)
        monitor_thread.daemon = True
        monitor_thread.start()
        
    def stop_monitoring(self):
        """시스템 모니터링 중지"""
        self.monitoring = False
        
    def _monitor_loop(self):
        """모니터링 루프"""
        while self.monitoring:
            metrics = self._collect_metrics()
            self._log_metrics(metrics)
            time.sleep(self.log_interval)
    
    def _collect_metrics(self):
        """시스템 메트릭 수집"""
        # CPU 및 메모리 사용률
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        
        # GPU 메트릭
        gpus = GPUtil.getGPUs()
        gpu_metrics = []
        for gpu in gpus:
            gpu_metrics.append({
                "id": gpu.id,
                "utilization": gpu.load * 100,
                "memory_used": gpu.memoryUsed,
                "memory_total": gpu.memoryTotal,
                "temperature": gpu.temperature
            })
        
        # PyTorch 메모리 (CUDA)
        torch_memory = {}
        if torch.cuda.is_available():
            for i in range(torch.cuda.device_count()):
                torch_memory[f"cuda_{i}"] = {
                    "allocated": torch.cuda.memory_allocated(i) / 1024**3,  # GB
                    "reserved": torch.cuda.memory_reserved(i) / 1024**3,    # GB
                    "max_allocated": torch.cuda.max_memory_allocated(i) / 1024**3
                }
        
        return {
            "timestamp": time.time(),
            "cpu_percent": cpu_percent,
            "memory_percent": memory.percent,
            "memory_available_gb": memory.available / 1024**3,
            "gpu_metrics": gpu_metrics,
            "torch_memory": torch_memory
        }
    
    def _log_metrics(self, metrics):
        """메트릭 로깅"""
        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] System Metrics:")
        print(f"  CPU: {metrics['cpu_percent']:.1f}%")
        print(f"  Memory: {metrics['memory_percent']:.1f}%")
        
        for gpu in metrics['gpu_metrics']:
            print(f"  GPU {gpu['id']}: {gpu['utilization']:.1f}% | "
                  f"{gpu['memory_used']:.1f}GB/{gpu['memory_total']:.1f}GB | "
                  f"{gpu['temperature']}°C")
        
        # W&B에도 로깅
        if wandb.run is not None:
            wandb.log({
                "system/cpu_percent": metrics['cpu_percent'],
                "system/memory_percent": metrics['memory_percent'],
                **{f"system/gpu_{gpu['id']}_util": gpu['utilization'] 
                   for gpu in metrics['gpu_metrics']},
                **{f"system/gpu_{gpu['id']}_memory": gpu['memory_used'] 
                   for gpu in metrics['gpu_metrics']}
            })

프로덕션 배포 전략

1. 모델 변환 및 최적화

# model_optimization.py
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from lit_gpt.model import GPT
from lit_gpt.config import Config

def convert_archscale_to_hf(checkpoint_path, config_name, output_dir):
    """ArchScale 모델을 HuggingFace 포맷으로 변환"""
    
    # ArchScale 모델 로드
    config = Config.from_name(config_name)
    model = GPT(config)
    
    checkpoint = torch.load(checkpoint_path, map_location="cpu")
    model.load_state_dict(checkpoint["model"])
    
    # HuggingFace 포맷으로 변환
    # (실제 구현은 아키텍처별로 다름)
    hf_model = convert_to_hf_format(model, config)
    
    # 저장
    hf_model.save_pretrained(output_dir)
    
    return hf_model

def quantize_model(model_path, output_path, quantization_type="int8"):
    """모델 양자화"""
    import torch.quantization as quant
    
    model = AutoModelForCausalLM.from_pretrained(model_path)
    
    if quantization_type == "int8":
        model = quant.quantize_dynamic(
            model, {torch.nn.Linear}, dtype=torch.qint8
        )
    elif quantization_type == "int4":
        # BitsAndBytes 사용
        from transformers import BitsAndBytesConfig
        
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.float16
        )
        
        model = AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=quantization_config,
            device_map="auto"
        )
    
    model.save_pretrained(output_path)
    return model

2. 서빙 최적화

# serving_optimization.py
import vllm
from vllm import LLM, SamplingParams
import ray

class ArchScaleInferenceServer:
    def __init__(self, model_path, gpu_memory_utilization=0.9):
        """ArchScale 모델 서빙 서버"""
        
        # vLLM 엔진 초기화
        self.llm = LLM(
            model=model_path,
            gpu_memory_utilization=gpu_memory_utilization,
            max_model_len=32768,  # 롱 컨텍스트 지원
            enable_prefix_caching=True,  # 프리픽스 캐싱으로 속도 향상
            enforce_eager=False,  # CUDA 그래프 최적화
        )
        
        # 기본 샘플링 파라미터
        self.default_sampling_params = SamplingParams(
            temperature=0.7,
            top_p=0.9,
            max_tokens=1024,
            repetition_penalty=1.1
        )
    
    def generate(self, prompts, sampling_params=None):
        """배치 생성"""
        if sampling_params is None:
            sampling_params = self.default_sampling_params
            
        outputs = self.llm.generate(prompts, sampling_params)
        
        results = []
        for output in outputs:
            results.append({
                "prompt": output.prompt,
                "generated_text": output.outputs[0].text,
                "finish_reason": output.outputs[0].finish_reason,
                "tokens": len(output.outputs[0].token_ids)
            })
        
        return results
    
    async def generate_stream(self, prompt, sampling_params=None):
        """스트리밍 생성"""
        if sampling_params is None:
            sampling_params = self.default_sampling_params
            
        # vLLM 스트리밍 구현
        for output in self.llm.generate_stream([prompt], sampling_params):
            yield output.outputs[0].text

# Ray Serve를 활용한 확장 가능한 배포
@ray.serve.deployment(num_replicas=2)
class ArchScaleService:
    def __init__(self, model_path):
        self.inference_server = ArchScaleInferenceServer(model_path)
    
    async def __call__(self, request):
        prompts = request["prompts"]
        sampling_params = SamplingParams(**request.get("sampling_params", {}))
        
        results = self.inference_server.generate(prompts, sampling_params)
        return {"results": results}

3. 성능 벤치마킹

# performance_benchmark.py
import time
import asyncio
import aiohttp
import statistics
from concurrent.futures import ThreadPoolExecutor

class PerformanceBenchmark:
    def __init__(self, server_url="http://localhost:8000"):
        self.server_url = server_url
        
    async def single_request_latency(self, prompt, num_requests=100):
        """단일 요청 지연시간 측정"""
        latencies = []
        
        async with aiohttp.ClientSession() as session:
            for _ in range(num_requests):
                start_time = time.time()
                
                async with session.post(
                    f"{self.server_url}/generate",
                    json={"prompts": [prompt]}
                ) as response:
                    await response.json()
                
                latency = time.time() - start_time
                latencies.append(latency)
        
        return {
            "mean_latency": statistics.mean(latencies),
            "median_latency": statistics.median(latencies),
            "p95_latency": sorted(latencies)[int(0.95 * len(latencies))],
            "p99_latency": sorted(latencies)[int(0.99 * len(latencies))]
        }
    
    async def throughput_test(self, prompt, concurrent_requests=50, duration_seconds=60):
        """처리량 측정"""
        completed_requests = 0
        start_time = time.time()
        
        async def worker():
            nonlocal completed_requests
            async with aiohttp.ClientSession() as session:
                while time.time() - start_time < duration_seconds:
                    try:
                        async with session.post(
                            f"{self.server_url}/generate",
                            json={"prompts": [prompt]}
                        ) as response:
                            await response.json()
                            completed_requests += 1
                    except Exception as e:
                        print(f"Request failed: {e}")
        
        # 동시 요청 실행
        tasks = [worker() for _ in range(concurrent_requests)]
        await asyncio.gather(*tasks)
        
        actual_duration = time.time() - start_time
        throughput = completed_requests / actual_duration
        
        return {
            "total_requests": completed_requests,
            "duration_seconds": actual_duration,
            "requests_per_second": throughput
        }
    
    def token_generation_speed(self, prompts, output_lengths):
        """토큰 생성 속도 측정"""
        total_tokens = 0
        total_time = 0
        
        for prompt, expected_length in zip(prompts, output_lengths):
            start_time = time.time()
            
            # 실제 요청 (동기식)
            import requests
            response = requests.post(
                f"{self.server_url}/generate",
                json={
                    "prompts": [prompt],
                    "sampling_params": {"max_tokens": expected_length}
                }
            )
            
            generation_time = time.time() - start_time
            actual_tokens = len(response.json()["results"][0]["generated_text"].split())
            
            total_tokens += actual_tokens
            total_time += generation_time
        
        return {
            "total_tokens": total_tokens,
            "total_time": total_time,
            "tokens_per_second": total_tokens / total_time
        }

# 벤치마크 실행 예시
async def run_benchmark():
    benchmark = PerformanceBenchmark()
    
    test_prompt = "Explain the concept of machine learning in simple terms:"
    
    # 지연시간 테스트
    latency_results = await benchmark.single_request_latency(test_prompt)
    print("Latency Results:", latency_results)
    
    # 처리량 테스트
    throughput_results = await benchmark.throughput_test(test_prompt)
    print("Throughput Results:", throughput_results)
    
    # 토큰 생성 속도 테스트
    speed_results = benchmark.token_generation_speed([test_prompt], [100])
    print("Generation Speed:", speed_results)

if __name__ == "__main__":
    asyncio.run(run_benchmark())

실전 운영 가이드

1. 체크포인트 관리

# checkpoint_manager.py
import torch
import shutil
from pathlib import Path
import json
from datetime import datetime

class CheckpointManager:
    def __init__(self, base_dir, max_checkpoints=5):
        self.base_dir = Path(base_dir)
        self.max_checkpoints = max_checkpoints
        self.metadata_file = self.base_dir / "checkpoint_metadata.json"
        
    def save_checkpoint(self, model, optimizer, scheduler, step, metrics):
        """체크포인트 저장"""
        checkpoint_dir = self.base_dir / f"checkpoint_{step}"
        checkpoint_dir.mkdir(parents=True, exist_ok=True)
        
        # 모델 상태 저장
        torch.save({
            "model": model.state_dict(),
            "optimizer": optimizer.state_dict(),
            "scheduler": scheduler.state_dict(),
            "step": step,
            "metrics": metrics
        }, checkpoint_dir / "model.pth")
        
        # 메타데이터 저장
        metadata = {
            "step": step,
            "timestamp": datetime.now().isoformat(),
            "metrics": metrics,
            "path": str(checkpoint_dir)
        }
        
        self._update_metadata(metadata)
        self._cleanup_old_checkpoints()
        
        return checkpoint_dir
    
    def load_latest_checkpoint(self):
        """최신 체크포인트 로드"""
        if not self.metadata_file.exists():
            return None
            
        with open(self.metadata_file, 'r') as f:
            all_metadata = json.load(f)
        
        if not all_metadata:
            return None
            
        # 최신 체크포인트 찾기
        latest = max(all_metadata, key=lambda x: x["step"])
        checkpoint_path = Path(latest["path"]) / "model.pth"
        
        if checkpoint_path.exists():
            return torch.load(checkpoint_path), latest
        
        return None
    
    def _update_metadata(self, new_metadata):
        """메타데이터 업데이트"""
        all_metadata = []
        
        if self.metadata_file.exists():
            with open(self.metadata_file, 'r') as f:
                all_metadata = json.load(f)
        
        all_metadata.append(new_metadata)
        
        with open(self.metadata_file, 'w') as f:
            json.dump(all_metadata, f, indent=2)
    
    def _cleanup_old_checkpoints(self):
        """오래된 체크포인트 정리"""
        with open(self.metadata_file, 'r') as f:
            all_metadata = json.load(f)
        
        if len(all_metadata) <= self.max_checkpoints:
            return
        
        # 오래된 체크포인트 삭제
        sorted_metadata = sorted(all_metadata, key=lambda x: x["step"])
        to_remove = sorted_metadata[:-self.max_checkpoints]
        
        for metadata in to_remove:
            checkpoint_path = Path(metadata["path"])
            if checkpoint_path.exists():
                shutil.rmtree(checkpoint_path)
        
        # 메타데이터 업데이트
        remaining_metadata = sorted_metadata[-self.max_checkpoints:]
        with open(self.metadata_file, 'w') as f:
            json.dump(remaining_metadata, f, indent=2)

2. 자동화된 실험 관리

# experiment_manager.py
import yaml
import subprocess
import time
from pathlib import Path
import wandb

class ExperimentManager:
    def __init__(self, config_file):
        with open(config_file, 'r') as f:
            self.config = yaml.safe_load(f)
        
        self.experiments = self.config["experiments"]
        self.base_command = self.config["base_command"]
        
    def run_experiment_suite(self):
        """실험 스위트 실행"""
        results = {}
        
        for exp_name, exp_config in self.experiments.items():
            print(f"Starting experiment: {exp_name}")
            
            # 실험 실행
            result = self._run_single_experiment(exp_name, exp_config)
            results[exp_name] = result
            
            # 결과 로깅
            self._log_experiment_result(exp_name, result)
        
        # 전체 결과 요약
        self._generate_summary_report(results)
        
        return results
    
    def _run_single_experiment(self, exp_name, exp_config):
        """단일 실험 실행"""
        
        # 명령어 생성
        command = self._build_command(exp_config)
        
        # 실행 시간 측정
        start_time = time.time()
        
        try:
            # 실험 실행
            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=exp_config.get("timeout", 3600*24)  # 24시간 기본
            )
            
            duration = time.time() - start_time
            
            if result.returncode == 0:
                # 성공적인 실험
                metrics = self._extract_metrics(result.stdout)
                return {
                    "status": "success",
                    "duration": duration,
                    "metrics": metrics,
                    "stdout": result.stdout[-1000:],  # 마지막 1000자만
                    "stderr": result.stderr
                }
            else:
                # 실패한 실험
                return {
                    "status": "failed",
                    "duration": duration,
                    "error": result.stderr,
                    "returncode": result.returncode
                }
                
        except subprocess.TimeoutExpired:
            return {
                "status": "timeout",
                "duration": time.time() - start_time,
                "error": "Experiment timed out"
            }
        except Exception as e:
            return {
                "status": "error",
                "duration": time.time() - start_time,
                "error": str(e)
            }
    
    def _build_command(self, exp_config):
        """실험 명령어 생성"""
        command_parts = [self.base_command]
        
        for key, value in exp_config.items():
            if key in ["timeout", "description"]:
                continue
                
            if isinstance(value, bool):
                if value:
                    command_parts.append(f"--{key}")
            else:
                command_parts.append(f"--{key} {value}")
        
        return " ".join(command_parts)
    
    def _extract_metrics(self, stdout):
        """출력에서 메트릭 추출"""
        metrics = {}
        
        for line in stdout.split('\n'):
            if 'final_loss:' in line:
                try:
                    metrics['final_loss'] = float(line.split('final_loss:')[1].strip())
                except:
                    pass
            elif 'best_val_loss:' in line:
                try:
                    metrics['best_val_loss'] = float(line.split('best_val_loss:')[1].strip())
                except:
                    pass
        
        return metrics

# 실험 설정 예시 (experiments.yaml)
experiments_yaml = """
base_command: "torchrun --nnodes=1 --nproc_per_node=8 pretrain.py"

experiments:
  small_lr_sweep_1:
    train_data_dir: "/path/to/data"
    train_model: "transformer"
    depth: 8
    base_hps.eta0: 1e-4
    max_tokens: 1e10
    train_name: "lr_sweep_1e4"
    timeout: 7200
    description: "Learning rate 1e-4 test"
    
  small_lr_sweep_2:
    train_data_dir: "/path/to/data"
    train_model: "transformer"
    depth: 8
    base_hps.eta0: 5e-4
    max_tokens: 1e10
    train_name: "lr_sweep_5e4"
    timeout: 7200
    description: "Learning rate 5e-4 test"
    
  small_lr_sweep_3:
    train_data_dir: "/path/to/data"
    train_model: "transformer"
    depth: 8
    base_hps.eta0: 1e-3
    max_tokens: 1e10
    train_name: "lr_sweep_1e3"
    timeout: 7200
    description: "Learning rate 1e-3 test"
"""

문제 해결 가이드

1. 일반적인 오류 및 해결방법

# troubleshooting.py

class ArchScaleTroubleshooter:
    @staticmethod
    def diagnose_oom_error():
        """OOM 오류 진단 및 해결책"""
        print("🔍 Out of Memory (OOM) 오류 진단:")
        print("1. GPU 메모리 확인:")
        print("   nvidia-smi")
        print("\n2. 배치 크기 줄이기:")
        print("   --base_hps.b0=4194304  # 기본값의 절반")
        print("\n3. Gradient Checkpointing 활성화:")
        print("   --gradient_checkpointing=true")
        print("\n4. FSDP 메모리 절약 모드:")
        print("   --fsdp_save_mem=true")
        print("\n5. 컨텍스트 길이 줄이기:")
        print("   --ctx_len=4096  # 기본 8192에서 절반으로")
    
    @staticmethod
    def diagnose_slow_training():
        """훈련 속도 저하 진단"""
        print("🔍 훈련 속도 저하 진단:")
        print("1. 데이터 로딩 병목 확인:")
        print("   --num_workers=8  # 데이터 로더 워커 수 증가")
        print("\n2. Mixed Precision 활성화:")
        print("   --precision=bf16")
        print("\n3. Compiled 모드 활용:")
        print("   --compile=true")
        print("\n4. Flash Attention 확인:")
        print("   pip install flash-attn")
        print("\n5. 데이터셋 사전 토큰화:")
        print("   사전에 토큰화된 데이터 사용")
    
    @staticmethod
    def diagnose_distributed_issues():
        """분산 훈련 문제 진단"""
        print("🔍 분산 훈련 문제 진단:")
        print("1. 네트워크 연결 확인:")
        print("   ping $MASTER_ADDR")
        print("\n2. 포트 사용 가능 확인:")
        print("   netstat -an | grep $MASTER_PORT")
        print("\n3. NCCL 환경변수 설정:")
        print("   export NCCL_DEBUG=INFO")
        print("   export NCCL_IB_DISABLE=1  # InfiniBand 문제 시")
        print("\n4. 방화벽 설정 확인:")
        print("   포트 29500-29510 범위 열기")
        print("\n5. 노드 간 시간 동기화:")
        print("   ntpdate -s time.nist.gov")

2. 성능 최적화 체크리스트

#!/bin/bash
# performance_optimization_checklist.sh

echo "🚀 ArchScale 성능 최적화 체크리스트"

echo "1. 하드웨어 설정 확인:"
echo "   - GPU 메모리: 24GB+ 권장"
echo "   - GPU 연결: NVLink 또는 PCIe 4.0+"
echo "   - 스토리지: NVMe SSD 권장"
echo "   - 네트워크: 10Gbps+ (분산 훈련 시)"

echo -e "\n2. 소프트웨어 최적화:"
echo "   - PyTorch 2.0+ 사용"
echo "   - CUDA 12.1+ 권장"
echo "   - Flash Attention 2.0 설치"
echo "   - xFormers 설치 고려"

echo -e "\n3. 훈련 설정 최적화:"
echo "   - Mixed Precision (bf16) 사용"
echo "   - Gradient Accumulation 조정"
echo "   - DataLoader num_workers 튜닝"
echo "   - Prefetch factor 조정"

echo -e "\n4. 메모리 최적화:"
echo "   - FSDP (Fully Sharded Data Parallel) 사용"
echo "   - Gradient Checkpointing 활성화"
echo "   - Model Sharding 고려"
echo "   - CPU Offloading (필요 시)"

echo -e "\n5. 데이터 파이프라인:"
echo "   - 사전 토큰화된 데이터 사용"
echo "   - 데이터 캐싱 활성화"
echo "   - 병렬 데이터 로딩"
echo "   - 압축된 데이터 포맷 사용"

결론

Microsoft ArchScale은 대규모 언어모델 사전 훈련을 위한 강력하고 유연한 프레임워크입니다. 이 가이드에서 다룬 내용들을 통해 다음과 같은 성과를 얻을 수 있습니다:

🎯 핵심 성과

  1. 비용 효율적인 연구: μP++ 스케일링으로 하이퍼파라미터 튜닝 비용 90% 절감
  2. 확장 가능한 아키텍처: 110M부터 수십억 파라미터까지 일관된 훈련 파이프라인
  3. 롱 컨텍스트 지원: 128K 토큰까지 효율적인 훈련
  4. 포괄적인 평가: RULER, Phonebook, 추론 벤치마크를 통한 다면적 성능 평가

🚀 실무 적용 포인트

  • 스타트업: 제한된 자원으로 효과적인 모델 연구
  • 연구기관: 다양한 아키텍처 실험과 스케일링 법칙 연구
  • 기업: 도메인 특화 모델의 효율적인 사전 훈련
  • 개발자: 최신 LLM 기술을 활용한 프로덕션 시스템 구축

📈 다음 단계

  1. 소규모 실험: 8 GPU로 시작하여 μP++ 스케일링 검증
  2. 아키텍처 탐색: SambaY, Mamba 등 다양한 아키텍처 비교
  3. 프로덕션 최적화: vLLM, quantization을 통한 서빙 최적화
  4. 커뮤니티 기여: ArchScale GitHub에 개선사항 기여

ArchScale을 통해 여러분만의 차세대 언어모델을 효율적으로 개발해보세요! 🚀


참고 자료: