개요

본 가이드는 Unsloth+TRL 한국어 LLM 학습 시리즈의 3편으로, Kubeflow, MLflow, DVC 등 엔터프라이즈급 MLOps 프레임워크를 활용하여 한국어 LLM 학습을 완전 자동화하는 방법을 다룹니다.

학습 목표:

  • 🚀 Kubeflow Pipelines: 시각적 ML 워크플로우 구축
  • 📊 MLflow: 실험 추적 및 모델 레지스트리 관리
  • 🔄 DVC: 데이터 버전 관리 및 재현 가능한 파이프라인
  • 통합 MLOps: 엔드투엔드 자동화 시스템

1. Kubeflow 설치 및 설정

1.1 Kubeflow 설치

# Kubeflow 1.8 설치
export PIPELINE_VERSION=1.8.5
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"

# Kubeflow Dashboard 접근
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

1.2 Kubeflow Notebooks 설정

# kubeflow-notebook.yaml
apiVersion: kubeflow.org/v1
kind: Notebook
metadata:
  name: korean-llm-notebook
  namespace: kubeflow-user-example-com
spec:
  template:
    spec:
      containers:
      - name: notebook
        image: jupyter/tensorflow-notebook:latest
        resources:
          requests:
            nvidia.com/gpu: 1
            memory: 16Gi
            cpu: 4
          limits:
            nvidia.com/gpu: 1
            memory: 16Gi
            cpu: 4
        volumeMounts:
        - name: workspace-pvc
          mountPath: /home/jovyan/work
      volumes:
      - name: workspace-pvc
        persistentVolumeClaim:
          claimName: workspace-pvc

2. MLflow 통합

2.1 MLflow 서버 설정

# mlflow-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-server
  namespace: mlops
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mlflow-server
  template:
    metadata:
      labels:
        app: mlflow-server
    spec:
      containers:
      - name: mlflow
        image: python:3.9-slim
        command:
        - /bin/bash
        - -c
        - |
          pip install mlflow==2.8.1 psycopg2-binary boto3
          mlflow server \
            --backend-store-uri postgresql://mlflow:password@postgres:5432/mlflow \
            --default-artifact-root s3://mlflow-artifacts/ \
            --host 0.0.0.0 \
            --port 5000
        ports:
        - containerPort: 5000
        env:
        - name: AWS_ACCESS_KEY_ID
          valueFrom:
            secretKeyRef:
              name: s3-credentials
              key: access-key
        - name: AWS_SECRET_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              name: s3-credentials
              key: secret-key
        - name: MLFLOW_S3_ENDPOINT_URL
          value: "https://s3.amazonaws.com"

2.2 실험 추적 코드

# mlflow_tracker.py
import mlflow
import mlflow.pytorch
from mlflow.tracking import MlflowClient
import torch
from datetime import datetime

class KoreanLLMTracker:
    def __init__(self, experiment_name="korean-llm-training"):
        self.client = MlflowClient()
        mlflow.set_tracking_uri("http://mlflow-server:5000")
        
        # 실험 생성 또는 가져오기
        try:
            experiment = self.client.get_experiment_by_name(experiment_name)
            self.experiment_id = experiment.experiment_id
        except:
            self.experiment_id = self.client.create_experiment(experiment_name)
    
    def start_run(self, run_name, stage="CPT"):
        """실험 실행 시작"""
        self.run = mlflow.start_run(
            experiment_id=self.experiment_id,
            run_name=f"{run_name}-{stage}-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        )
        return self.run
    
    def log_parameters(self, params):
        """하이퍼파라미터 로깅"""
        mlflow.log_params(params)
    
    def log_metrics(self, metrics, step=None):
        """메트릭 로깅"""
        mlflow.log_metrics(metrics, step=step)
    
    def log_model(self, model, model_name, signature=None):
        """모델 로깅"""
        mlflow.pytorch.log_model(
            pytorch_model=model,
            artifact_path=model_name,
            signature=signature
        )
    
    def register_model(self, model_name, stage="Staging"):
        """모델 레지스트리에 등록"""
        model_uri = f"runs:/{self.run.info.run_id}/{model_name}"
        mlflow.register_model(model_uri, model_name)
        
        # 모델 스테이지 설정
        client = MlflowClient()
        client.transition_model_version_stage(
            name=model_name,
            version=1,
            stage=stage
        )
    
    def end_run(self):
        """실험 종료"""
        mlflow.end_run()

3. DVC 데이터 버전 관리

3.1 DVC 설정

# DVC 초기화
dvc init --no-scm
dvc remote add -d myremote s3://korean-llm-data

# 데이터 추적
dvc add data/korean_corpus.jsonl
dvc add data/instruction_dataset.jsonl

# 파이프라인 정의
dvc stage add -n preprocess \
  -d data/raw_korean_text.txt \
  -o data/korean_corpus.jsonl \
  python scripts/preprocess.py

dvc stage add -n cpt_training \
  -d data/korean_corpus.jsonl \
  -d scripts/cpt_train.py \
  -o models/cpt_model \
  -M metrics/cpt_metrics.json \
  python scripts/cpt_train.py

dvc stage add -n sft_training \
  -d models/cpt_model \
  -d data/instruction_dataset.jsonl \
  -o models/sft_model \
  -M metrics/sft_metrics.json \
  python scripts/sft_train.py

3.2 DVC 파이프라인 정의

# dvc.yaml
stages:
  preprocess:
    cmd: python scripts/preprocess.py
    deps:
    - data/raw_korean_text.txt
    - scripts/preprocess.py
    outs:
    - data/korean_corpus.jsonl
    
  cpt_training:
    cmd: python scripts/cpt_train.py
    deps:
    - data/korean_corpus.jsonl
    - scripts/cpt_train.py
    outs:
    - models/cpt_model
    metrics:
    - metrics/cpt_metrics.json
    
  sft_training:
    cmd: python scripts/sft_train.py
    deps:
    - models/cpt_model
    - data/instruction_dataset.jsonl
    - scripts/sft_train.py
    outs:
    - models/sft_model
    metrics:
    - metrics/sft_metrics.json
    
  evaluation:
    cmd: python scripts/evaluate.py
    deps:
    - models/sft_model
    - scripts/evaluate.py
    metrics:
    - metrics/evaluation_metrics.json

4. Kubeflow Pipelines 구성

4.1 파이프라인 컴포넌트 정의

# kubeflow_components.py
from kfp import dsl, components
from kfp.components import InputPath, OutputPath
import kfp

def preprocess_data_op(
    raw_data_path: InputPath(str),
    processed_data_path: OutputPath(str)
):
    """데이터 전처리 컴포넌트"""
    import json
    import re
    from pathlib import Path
    
    def clean_korean_text(text):
        # 한국어 텍스트 정규화
        text = re.sub(r'\s+', ' ', text)
        text = text.replace('ㆍ', '·')
        return text.strip()
    
    # 원본 데이터 로드
    with open(raw_data_path, 'r', encoding='utf-8') as f:
        raw_text = f.read()
    
    # 전처리
    processed_text = clean_korean_text(raw_text)
    
    # 결과 저장
    processed_data = {
        "text": processed_text,
        "length": len(processed_text),
        "language": "korean"
    }
    
    with open(processed_data_path, 'w', encoding='utf-8') as f:
        json.dump(processed_data, f, ensure_ascii=False, indent=2)

def cpt_training_op(
    data_path: InputPath(str),
    model_output_path: OutputPath(str),
    metrics_path: OutputPath(str),
    model_size: str = "7B",
    learning_rate: float = 1e-5,
    num_epochs: int = 2
):
    """CPT 학습 컴포넌트"""
    import torch
    from unsloth import FastLanguageModel
    from datasets import load_dataset
    from transformers import TrainingArguments
    from trl import SFTTrainer
    import json
    import mlflow
    
    # MLflow 설정
    mlflow.set_tracking_uri("http://mlflow-server:5000")
    
    with mlflow.start_run():
        # 파라미터 로깅
        mlflow.log_params({
            "model_size": model_size,
            "learning_rate": learning_rate,
            "num_epochs": num_epochs,
            "stage": "CPT"
        })
        
        # 모델 로딩
        model_name = f"unsloth/Qwen2.5-{model_size}-bnb-4bit"
        model, tokenizer = FastLanguageModel.from_pretrained(
            model_name=model_name,
            max_seq_length=4096,
            dtype=None,
            load_in_4bit=True,
        )
        
        # LoRA 설정
        model = FastLanguageModel.get_peft_model(
            model,
            r=64,
            target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
            lora_alpha=16,
            lora_dropout=0.1,
            bias="none",
            use_gradient_checkpointing="unsloth",
        )
        
        # 데이터셋 로딩
        dataset = load_dataset("json", data_files=data_path, split="train")
        
        # 학습 설정
        training_args = TrainingArguments(
            output_dir=model_output_path,
            num_train_epochs=num_epochs,
            per_device_train_batch_size=2,
            learning_rate=learning_rate,
            logging_steps=100,
            save_steps=1000,
            fp16=True,
        )
        
        # 트레이너 초기화 및 학습
        trainer = SFTTrainer(
            model=model,
            tokenizer=tokenizer,
            args=training_args,
            train_dataset=dataset,
            dataset_text_field="text",
        )
        
        # 학습 실행
        trainer.train()
        
        # 모델 저장
        trainer.save_model(model_output_path)
        
        # 메트릭 저장
        metrics = {
            "final_loss": trainer.state.log_history[-1]["train_loss"],
            "total_steps": trainer.state.global_step,
            "model_size": model_size
        }
        
        with open(metrics_path, 'w') as f:
            json.dump(metrics, f, indent=2)
        
        # MLflow에 메트릭 로깅
        mlflow.log_metrics(metrics)
        mlflow.pytorch.log_model(model, "cpt_model")

def sft_training_op(
    base_model_path: InputPath(str),
    instruction_data_path: InputPath(str),
    model_output_path: OutputPath(str),
    metrics_path: OutputPath(str),
    learning_rate: float = 2e-5,
    num_epochs: int = 3
):
    """SFT 학습 컴포넌트"""
    import torch
    from unsloth import FastLanguageModel
    from datasets import load_dataset
    from transformers import TrainingArguments
    from trl import SFTTrainer
    import json
    import mlflow
    
    mlflow.set_tracking_uri("http://mlflow-server:5000")
    
    with mlflow.start_run():
        # 파라미터 로깅
        mlflow.log_params({
            "learning_rate": learning_rate,
            "num_epochs": num_epochs,
            "stage": "SFT"
        })
        
        # 베이스 모델 로딩
        model, tokenizer = FastLanguageModel.from_pretrained(
            model_name=base_model_path,
            max_seq_length=2048,
            dtype=None,
            load_in_4bit=True,
        )
        
        # Instruction 데이터셋 로딩
        dataset = load_dataset("json", data_files=instruction_data_path, split="train")
        
        # 학습 설정
        training_args = TrainingArguments(
            output_dir=model_output_path,
            num_train_epochs=num_epochs,
            per_device_train_batch_size=4,
            learning_rate=learning_rate,
            logging_steps=50,
            save_steps=500,
            fp16=True,
        )
        
        # 트레이너 초기화 및 학습
        trainer = SFTTrainer(
            model=model,
            tokenizer=tokenizer,
            args=training_args,
            train_dataset=dataset,
            dataset_text_field="text",
        )
        
        trainer.train()
        trainer.save_model(model_output_path)
        
        # 메트릭 저장
        metrics = {
            "final_loss": trainer.state.log_history[-1]["train_loss"],
            "total_steps": trainer.state.global_step,
            "stage": "SFT"
        }
        
        with open(metrics_path, 'w') as f:
            json.dump(metrics, f, indent=2)
        
        mlflow.log_metrics(metrics)
        mlflow.pytorch.log_model(model, "sft_model")

def evaluation_op(
    model_path: InputPath(str),
    evaluation_results_path: OutputPath(str),
    test_data_path: str = "/data/test_dataset.jsonl"
):
    """모델 평가 컴포넌트"""
    import torch
    from unsloth import FastLanguageModel
    from datasets import load_dataset
    import json
    import mlflow
    
    mlflow.set_tracking_uri("http://mlflow-server:5000")
    
    with mlflow.start_run():
        # 모델 로딩
        model, tokenizer = FastLanguageModel.from_pretrained(
            model_name=model_path,
            max_seq_length=2048,
            dtype=None,
            load_in_4bit=True,
        )
        
        # 추론 모드로 설정
        FastLanguageModel.for_inference(model)
        
        # 테스트 데이터 로딩
        test_dataset = load_dataset("json", data_files=test_data_path, split="train")
        
        # 평가 실행
        total_samples = len(test_dataset)
        correct_predictions = 0
        
        for sample in test_dataset[:100]:  # 샘플 100개로 제한
            prompt = sample["prompt"]
            expected = sample["expected"]
            
            inputs = tokenizer(prompt, return_tensors="pt")
            outputs = model.generate(**inputs, max_new_tokens=256)
            prediction = tokenizer.decode(outputs[0], skip_special_tokens=True)
            
            # 간단한 정확도 계산 (실제로는 더 정교한 메트릭 필요)
            if expected.lower() in prediction.lower():
                correct_predictions += 1
        
        accuracy = correct_predictions / min(100, total_samples)
        
        # 결과 저장
        results = {
            "accuracy": accuracy,
            "total_samples": min(100, total_samples),
            "correct_predictions": correct_predictions
        }
        
        with open(evaluation_results_path, 'w') as f:
            json.dump(results, f, indent=2)
        
        mlflow.log_metrics(results)

# 컴포넌트 생성
preprocess_data_component = components.create_component_from_func(
    preprocess_data_op,
    base_image="python:3.9-slim",
    packages_to_install=["datasets", "transformers"]
)

cpt_training_component = components.create_component_from_func(
    cpt_training_op,
    base_image="nvidia/cuda:11.8-devel-ubuntu20.04",
    packages_to_install=[
        "torch", "transformers", "datasets", 
        "unsloth @ git+https://github.com/unslothai/unsloth.git",
        "mlflow"
    ]
)

sft_training_component = components.create_component_from_func(
    sft_training_op,
    base_image="nvidia/cuda:11.8-devel-ubuntu20.04",
    packages_to_install=[
        "torch", "transformers", "datasets",
        "unsloth @ git+https://github.com/unslothai/unsloth.git",
        "mlflow"
    ]
)

evaluation_component = components.create_component_from_func(
    evaluation_op,
    base_image="nvidia/cuda:11.8-devel-ubuntu20.04",
    packages_to_install=[
        "torch", "transformers", "datasets",
        "unsloth @ git+https://github.com/unslothai/unsloth.git",
        "mlflow"
    ]
)

4.2 파이프라인 정의

# korean_llm_pipeline.py
from kfp import dsl
from kubeflow_components import *

@dsl.pipeline(
    name="Korean LLM Training Pipeline",
    description="Complete pipeline for training Korean-specialized LLM using Unsloth"
)
def korean_llm_training_pipeline(
    raw_data_path: str = "/data/raw_korean_text.txt",
    instruction_data_path: str = "/data/instruction_dataset.jsonl",
    model_size: str = "7B",
    cpt_learning_rate: float = 1e-5,
    sft_learning_rate: float = 2e-5,
    cpt_epochs: int = 2,
    sft_epochs: int = 3
):
    """한국어 LLM 학습 파이프라인"""
    
    # 1. 데이터 전처리
    preprocess_task = preprocess_data_component(
        raw_data_path=raw_data_path
    )
    
    # 2. CPT 학습
    cpt_task = cpt_training_component(
        data_path=preprocess_task.outputs["processed_data_path"],
        model_size=model_size,
        learning_rate=cpt_learning_rate,
        num_epochs=cpt_epochs
    )
    
    # GPU 리소스 요청
    cpt_task.set_gpu_limit(4)
    cpt_task.set_memory_limit("200Gi")
    cpt_task.set_cpu_limit("32")
    
    # 3. SFT 학습
    sft_task = sft_training_component(
        base_model_path=cpt_task.outputs["model_output_path"],
        instruction_data_path=instruction_data_path,
        learning_rate=sft_learning_rate,
        num_epochs=sft_epochs
    )
    
    # GPU 리소스 요청
    sft_task.set_gpu_limit(2)
    sft_task.set_memory_limit("100Gi")
    sft_task.set_cpu_limit("16")
    
    # 4. 모델 평가
    evaluation_task = evaluation_component(
        model_path=sft_task.outputs["model_output_path"]
    )
    
    evaluation_task.set_gpu_limit(1)
    evaluation_task.set_memory_limit("50Gi")
    evaluation_task.set_cpu_limit("8")
    
    # 파이프라인 설정
    preprocess_task.set_retry(3)
    cpt_task.set_retry(2)
    sft_task.set_retry(2)
    evaluation_task.set_retry(3)

# 파이프라인 컴파일
if __name__ == "__main__":
    from kfp.compiler import Compiler
    
    Compiler().compile(
        pipeline_func=korean_llm_training_pipeline,
        package_path="korean_llm_pipeline.yaml"
    )

5. 통합 모니터링 대시보드

5.1 Grafana 대시보드 설정

{
  "dashboard": {
    "title": "Korean LLM Training - MLOps Dashboard",
    "panels": [
      {
        "title": "Pipeline Execution Status",
        "type": "stat",
        "targets": [
          {
            "expr": "kubeflow_pipeline_runs_total",
            "legendFormat": "Total Runs"
          }
        ]
      },
      {
        "title": "Training Loss Over Time",
        "type": "graph",
        "targets": [
          {
            "expr": "mlflow_metric_train_loss",
            "legendFormat": " Loss"
          }
        ]
      },
      {
        "title": "GPU Utilization",
        "type": "graph",
        "targets": [
          {
            "expr": "nvidia_gpu_utilization",
            "legendFormat": "GPU "
          }
        ]
      },
      {
        "title": "Model Performance Metrics",
        "type": "table",
        "targets": [
          {
            "expr": "mlflow_metric_accuracy",
            "legendFormat": "Accuracy"
          }
        ]
      }
    ]
  }
}

5.2 알림 설정

# alerting-rules.yaml
groups:
- name: korean-llm-training
  rules:
  - alert: TrainingJobFailed
    expr: kubeflow_pipeline_run_status{status="Failed"} > 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Korean LLM training job failed"
      description: "Pipeline {{ $labels.pipeline_name }} failed"
  
  - alert: LowGPUUtilization
    expr: nvidia_gpu_utilization < 50
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "Low GPU utilization detected"
      description: "GPU utilization is below 50% for 10 minutes"
  
  - alert: HighTrainingLoss
    expr: increase(mlflow_metric_train_loss[1h]) > 0.1
    for: 30m
    labels:
      severity: warning
    annotations:
      summary: "Training loss is increasing"
      description: "Training loss increased by more than 0.1 in the last hour"

6. 자동화된 모델 배포

6.1 모델 레지스트리 연동

# model_deployment.py
from mlflow.tracking import MlflowClient
import kubernetes
from kubernetes import client, config
import yaml

class ModelDeploymentManager:
    def __init__(self):
        self.mlflow_client = MlflowClient("http://mlflow-server:5000")
        config.load_incluster_config()
        self.k8s_apps_v1 = client.AppsV1Api()
        self.k8s_core_v1 = client.CoreV1Api()
    
    def get_latest_model(self, model_name, stage="Production"):
        """최신 프로덕션 모델 가져오기"""
        latest_version = self.mlflow_client.get_latest_versions(
            model_name, 
            stages=[stage]
        )[0]
        return latest_version
    
    def create_inference_deployment(self, model_version, namespace="inference"):
        """추론 서비스 배포"""
        deployment_yaml = f"""
apiVersion: apps/v1
kind: Deployment
metadata:
  name: korean-llm-inference
  namespace: {namespace}
spec:
  replicas: 2
  selector:
    matchLabels:
      app: korean-llm-inference
  template:
    metadata:
      labels:
        app: korean-llm-inference
    spec:
      containers:
      - name: inference-server
        image: your-registry/korean-llm-inference:latest
        env:
        - name: MODEL_URI
          value: "{model_version.source}"
        - name: MODEL_VERSION
          value: "{model_version.version}"
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: 1
            memory: 16Gi
            cpu: 4
          limits:
            nvidia.com/gpu: 1
            memory: 16Gi
            cpu: 4
---
apiVersion: v1
kind: Service
metadata:
  name: korean-llm-service
  namespace: {namespace}
spec:
  selector:
    app: korean-llm-inference
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer
"""
        
        # YAML 파싱 및 배포
        deployment_dict = yaml.safe_load(deployment_yaml)
        
        # Deployment 생성
        self.k8s_apps_v1.create_namespaced_deployment(
            namespace=namespace,
            body=deployment_dict
        )
    
    def auto_deploy_on_model_update(self, model_name):
        """모델 업데이트 시 자동 배포"""
        latest_model = self.get_latest_model(model_name)
        
        # 현재 배포된 모델 버전 확인
        current_deployment = self.k8s_apps_v1.read_namespaced_deployment(
            name="korean-llm-inference",
            namespace="inference"
        )
        
        current_version = None
        for env in current_deployment.spec.template.spec.containers[0].env:
            if env.name == "MODEL_VERSION":
                current_version = env.value
                break
        
        # 새 버전이 있으면 배포 업데이트
        if current_version != latest_model.version:
            print(f"Updating deployment from version {current_version} to {latest_model.version}")
            self.update_deployment(latest_model)
    
    def update_deployment(self, model_version):
        """배포 업데이트"""
        # 배포 업데이트 로직
        patch = {
            "spec": {
                "template": {
                    "spec": {
                        "containers": [{
                            "name": "inference-server",
                            "env": [
                                {"name": "MODEL_URI", "value": model_version.source},
                                {"name": "MODEL_VERSION", "value": model_version.version}
                            ]
                        }]
                    }
                }
            }
        }
        
        self.k8s_apps_v1.patch_namespaced_deployment(
            name="korean-llm-inference",
            namespace="inference",
            body=patch
        )

6.2 A/B 테스트 설정

# ab-testing-setup.yaml
apiVersion: argoproj.io/v1alpha1
kind: Rollout
metadata:
  name: korean-llm-rollout
  namespace: inference
spec:
  replicas: 4
  strategy:
    canary:
      steps:
      - setWeight: 25
      - pause: {duration: 10m}
      - setWeight: 50
      - pause: {duration: 10m}
      - setWeight: 75
      - pause: {duration: 10m}
      analysis:
        templates:
        - templateName: success-rate
        args:
        - name: service-name
          value: korean-llm-service
  selector:
    matchLabels:
      app: korean-llm-inference
  template:
    metadata:
      labels:
        app: korean-llm-inference
    spec:
      containers:
      - name: inference-server
        image: your-registry/korean-llm-inference:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: 1
            memory: 16Gi
---
apiVersion: argoproj.io/v1alpha1
kind: AnalysisTemplate
metadata:
  name: success-rate
spec:
  args:
  - name: service-name
  metrics:
  - name: success-rate
    interval: 2m
    count: 5
    successCondition: result[0] >= 0.95
    provider:
      prometheus:
        address: http://prometheus:9090
        query: |
          sum(rate(http_requests_total{service="",code=~"2.."}[2m])) /
          sum(rate(http_requests_total{service=""}[2m]))

7. 실행 및 관리

7.1 파이프라인 실행

# run_pipeline.py
import kfp
from kfp import dsl

# Kubeflow Pipelines 클라이언트 생성
client = kfp.Client(host="http://kubeflow-pipelines:8080")

# 파이프라인 업로드
pipeline_id = client.upload_pipeline(
    pipeline_package_path="korean_llm_pipeline.yaml",
    pipeline_name="Korean LLM Training Pipeline v1.0"
)

# 실험 생성
experiment = client.create_experiment(
    name="Korean LLM Training Experiment",
    description="Automated training of Korean-specialized LLM"
)

# 파이프라인 실행
run = client.run_pipeline(
    experiment_id=experiment.id,
    job_name="korean-llm-training-run-1",
    pipeline_id=pipeline_id,
    params={
        "model_size": "7B",
        "cpt_learning_rate": 1e-5,
        "sft_learning_rate": 2e-5,
        "cpt_epochs": 2,
        "sft_epochs": 3
    }
)

print(f"Pipeline run started: {run.id}")

7.2 자동화 스케줄링

# scheduler.py
from kfp import dsl
import schedule
import time

class PipelineScheduler:
    def __init__(self, client):
        self.client = client
        self.pipeline_id = None
        self.experiment_id = None
    
    def setup_pipeline(self):
        """파이프라인 및 실험 설정"""
        # 파이프라인 업로드
        self.pipeline_id = self.client.upload_pipeline(
            pipeline_package_path="korean_llm_pipeline.yaml",
            pipeline_name="Korean LLM Scheduled Training"
        )
        
        # 실험 생성
        experiment = self.client.create_experiment(
            name="Scheduled Korean LLM Training"
        )
        self.experiment_id = experiment.id
    
    def run_training_pipeline(self):
        """학습 파이프라인 실행"""
        run = self.client.run_pipeline(
            experiment_id=self.experiment_id,
            job_name=f"scheduled-run-{int(time.time())}",
            pipeline_id=self.pipeline_id,
            params={
                "model_size": "7B",
                "cpt_learning_rate": 1e-5,
                "sft_learning_rate": 2e-5
            }
        )
        print(f"Scheduled pipeline run started: {run.id}")
    
    def start_scheduler(self):
        """스케줄러 시작"""
        # 매주 일요일 오전 2시에 실행
        schedule.every().sunday.at("02:00").do(self.run_training_pipeline)
        
        # 새로운 데이터가 추가될 때마다 실행 (예: 매일 체크)
        schedule.every().day.at("06:00").do(self.check_and_run_if_new_data)
        
        while True:
            schedule.run_pending()
            time.sleep(3600)  # 1시간마다 체크
    
    def check_and_run_if_new_data(self):
        """새로운 데이터가 있을 때만 실행"""
        # DVC를 사용하여 데이터 변경 확인
        import subprocess
        result = subprocess.run(["dvc", "status"], capture_output=True, text=True)
        
        if "changed" in result.stdout:
            print("New data detected, starting training pipeline...")
            self.run_training_pipeline()
        else:
            print("No new data, skipping training...")

# 사용 예시
if __name__ == "__main__":
    client = kfp.Client(host="http://kubeflow-pipelines:8080")
    scheduler = PipelineScheduler(client)
    scheduler.setup_pipeline()
    scheduler.start_scheduler()

결론

본 가이드를 통해 Kubeflow, MLflow, DVC를 활용한 엔터프라이즈급 한국어 LLM 학습 자동화 시스템을 구축했습니다.

주요 성과:

  • 🚀 완전 자동화: 데이터 → 학습 → 평가 → 배포 전 과정
  • 📊 실험 관리: MLflow를 통한 체계적 실험 추적
  • 🔄 데이터 버전 관리: DVC로 재현 가능한 파이프라인
  • 확장성: Kubeflow의 분산 처리 및 스케일링

실무적 가치:

  • 운영 효율성: GUI 기반 파이프라인 관리
  • 품질 보증: 자동화된 평가 및 배포 검증
  • 비용 최적화: 리소스 사용량 최적화
  • 거버넌스: 모델 레지스트리를 통한 모델 생명주기 관리

이 시리즈의 다른 글 보기:

이러한 엔터프라이즈급 MLOps 시스템을 통해 한국어 특화 LLM 개발의 생산성과 품질을 극대화할 수 있습니다.