OpenVoice 완전 가이드: MIT 기술로 구현하는 즉석 음성 복제 및 다국어 TTS 시스템
⏱️ 예상 읽기 시간: 16분
서론
음성 기술의 혁신이 가속화되면서, 개인의 목소리를 실시간으로 복제하고 다양한 언어로 변환하는 기술이 현실이 되었습니다. OpenVoice는 MIT와 MyShell이 공동 개발한 혁신적인 오픈소스 음성 복제 솔루션으로, 단 몇 초의 참조 음성만으로도 정확한 톤 컬러 복제와 다국어 음성 생성이 가능합니다.
이 가이드에서는 OpenVoice V2의 핵심 기능부터 실제 프로덕션 환경에서의 활용법까지, 33.4k GitHub 스타를 받은 이 혁신적인 기술을 완전히 마스터하는 방법을 제공합니다.
OpenVoice 기술 개요
핵심 혁신 기술
OpenVoice V1의 3대 혁신:
- 정확한 톤 컬러 복제: 참조 음성의 고유한 음색을 정밀하게 재현
- 유연한 음성 스타일 제어: 감정, 억양, 리듬, 일시정지 등 세밀한 조절
- 제로샷 크로스링구얼: 훈련 데이터에 없는 언어도 즉시 지원
OpenVoice V2의 추가 개선:
- 향상된 음질: 새로운 훈련 전략으로 더욱 자연스러운 음성
- 네이티브 다국어 지원: 영어, 스페인어, 프랑스어, 중국어, 일본어, 한국어
- MIT 라이선스: 완전한 상업적 이용 가능
기술 아키텍처
openvoice_architecture:
core_components:
base_speaker_encoder: "화자 임베딩 추출"
tone_color_converter: "음색 변환 엔진"
multi_language_synthesizer: "다국어 음성 합성"
style_controller: "음성 스타일 제어"
supported_features:
voice_cloning: "단일 참조로 즉석 복제"
cross_lingual: "언어 간 음성 변환"
emotion_control: "감정 표현 조절"
speed_control: "발화 속도 제어"
pitch_control: "음높이 조절"
output_quality:
sample_rate: "24kHz"
bit_depth: "16-bit"
format: "WAV/MP3"
환경 설정 및 설치
1. 기본 환경 준비
# Python 환경 확인 (3.8 이상 필요)
python --version
# 가상환경 생성
python -m venv openvoice_env
source openvoice_env/bin/activate # Windows: openvoice_env\Scripts\activate
# 기본 의존성 설치
pip install torch torchaudio --index-url https://download.pytorch.org/whl/cu121
pip install numpy scipy librosa soundfile
pip install gradio jupyter ipython
2. OpenVoice 설치
# GitHub에서 클론
git clone https://github.com/myshell-ai/OpenVoice.git
cd OpenVoice
# 의존성 설치
pip install -r requirements.txt
# 개발 환경 설정
pip install -e .
# 사전 훈련된 모델 다운로드
mkdir checkpoints
wget https://myshell-public-repo-hosting.s3.amazonaws.com/openvoice/checkpoints_v2.zip
unzip checkpoints_v2.zip -d checkpoints/
3. GPU 설정 확인
# gpu_check.py
import torch
import torchaudio
def check_environment():
print("=== OpenVoice 환경 점검 ===")
# PyTorch 버전 확인
print(f"PyTorch 버전: {torch.__version__}")
print(f"TorchAudio 버전: {torchaudio.__version__}")
# CUDA 지원 확인
if torch.cuda.is_available():
print(f"CUDA 사용 가능: {torch.cuda.get_device_name(0)}")
print(f"CUDA 메모리: {torch.cuda.get_device_properties(0).total_memory // 1024**3}GB")
else:
print("CUDA 사용 불가 - CPU 모드로 실행")
# 오디오 처리 확인
try:
import librosa
import soundfile as sf
print("오디오 라이브러리 정상 작동")
except ImportError as e:
print(f"오디오 라이브러리 오류: {e}")
print("환경 점검 완료!")
if __name__ == "__main__":
check_environment()
기본 사용법
1. 간단한 음성 복제
# basic_voice_cloning.py
import os
import torch
import librosa
import soundfile as sf
from openvoice import se_extractor
from openvoice.api import BaseSpeakerTTS, ToneColorConverter
def basic_voice_clone():
"""기본 음성 복제 예제"""
# 디바이스 설정
device = "cuda" if torch.cuda.is_available() else "cpu"
# 모델 로드
ckpt_base = 'checkpoints/base_speakers/EN'
ckpt_converter = 'checkpoints/converter'
# TTS 모델 초기화
base_speaker_tts = BaseSpeakerTTS(f'{ckpt_base}/config.json', device=device)
base_speaker_tts.load_ckpt(f'{ckpt_base}/checkpoint.pth')
# 톤 컬러 컨버터 초기화
tone_color_converter = ToneColorConverter(f'{ckpt_converter}/config.json', device=device)
tone_color_converter.load_ckpt(f'{ckpt_converter}/checkpoint.pth')
# 참조 음성에서 화자 임베딩 추출
reference_speaker = 'resources/example_reference.wav'
target_se, audio_name = se_extractor.get_se(
reference_speaker,
tone_color_converter,
vad=True # 음성 활동 감지 활성화
)
# 기본 TTS로 중간 음성 생성
text = "안녕하세요! OpenVoice로 생성된 음성입니다."
src_path = 'temp_output.wav'
base_speaker_tts.tts(
text,
src_path,
speaker='default',
language='KR', # 한국어 설정
speed=1.0
)
# 톤 컬러 변환
output_path = 'cloned_voice_output.wav'
encode_message = "@MyShell"
tone_color_converter.convert(
audio_src_path=src_path,
src_se=base_speaker_tts.hps.data.spk2id['default'],
tgt_se=target_se,
output_path=output_path,
message=encode_message
)
print(f"음성 복제 완료! 결과: {output_path}")
# 임시 파일 정리
os.remove(src_path)
return output_path
if __name__ == "__main__":
basic_voice_clone()
2. 다국어 음성 생성
# multilingual_tts.py
import torch
from openvoice import se_extractor
from openvoice.api import BaseSpeakerTTS, ToneColorConverter
class MultilingualVoiceCloner:
def __init__(self):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.setup_models()
def setup_models(self):
"""다국어 모델 설정"""
# 지원 언어별 모델 경로
self.language_models = {
'EN': 'checkpoints/base_speakers/EN',
'ES': 'checkpoints/base_speakers/ES',
'FR': 'checkpoints/base_speakers/FR',
'ZH': 'checkpoints/base_speakers/ZH',
'JP': 'checkpoints/base_speakers/JP',
'KR': 'checkpoints/base_speakers/KR'
}
# 컨버터 모델
ckpt_converter = 'checkpoints/converter'
self.tone_color_converter = ToneColorConverter(
f'{ckpt_converter}/config.json',
device=self.device
)
self.tone_color_converter.load_ckpt(f'{ckpt_converter}/checkpoint.pth')
# 언어별 TTS 모델 로드
self.tts_models = {}
for lang, path in self.language_models.items():
if os.path.exists(path):
tts = BaseSpeakerTTS(f'{path}/config.json', device=self.device)
tts.load_ckpt(f'{path}/checkpoint.pth')
self.tts_models[lang] = tts
print(f"{lang} 모델 로드 완료")
def clone_voice_multilingual(self, reference_audio, texts_by_language, output_dir="outputs"):
"""다국어 음성 복제"""
os.makedirs(output_dir, exist_ok=True)
# 참조 화자 임베딩 추출
target_se, _ = se_extractor.get_se(
reference_audio,
self.tone_color_converter,
vad=True
)
results = {}
for language, text in texts_by_language.items():
if language not in self.tts_models:
print(f"언어 {language} 지원되지 않음")
continue
try:
# 중간 음성 생성
temp_path = f'temp_{language}.wav'
self.tts_models[language].tts(
text,
temp_path,
speaker='default',
language=language,
speed=1.0
)
# 톤 컬러 변환
output_path = f'{output_dir}/cloned_{language}.wav'
self.tone_color_converter.convert(
audio_src_path=temp_path,
src_se=self.tts_models[language].hps.data.spk2id['default'],
tgt_se=target_se,
output_path=output_path
)
results[language] = output_path
print(f"{language} 음성 생성 완료: {output_path}")
# 임시 파일 정리
os.remove(temp_path)
except Exception as e:
print(f"{language} 음성 생성 실패: {e}")
return results
def multilingual_demo():
"""다국어 음성 복제 데모"""
cloner = MultilingualVoiceCloner()
# 참조 음성 (영어 화자)
reference_audio = "resources/reference_speaker.wav"
# 다국어 텍스트
multilingual_texts = {
'EN': "Hello, this is a demonstration of OpenVoice multilingual capabilities.",
'ES': "Hola, esta es una demostración de las capacidades multilingües de OpenVoice.",
'FR': "Bonjour, ceci est une démonstration des capacités multilingues d'OpenVoice.",
'ZH': "你好,这是OpenVoice多语言功能的演示。",
'JP': "こんにちは、これはOpenVoiceの多言語機能のデモンストレーションです。",
'KR': "안녕하세요, 이것은 OpenVoice 다국어 기능의 시연입니다."
}
# 다국어 음성 생성
results = cloner.clone_voice_multilingual(reference_audio, multilingual_texts)
print("\n=== 다국어 음성 생성 결과 ===")
for lang, path in results.items():
print(f"{lang}: {path}")
if __name__ == "__main__":
multilingual_demo()
3. 감정 및 스타일 제어
# emotion_style_control.py
import torch
import numpy as np
from openvoice import se_extractor
from openvoice.api import BaseSpeakerTTS, ToneColorConverter
class EmotionalVoiceController:
def __init__(self):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.setup_models()
# 감정별 파라미터 설정
self.emotion_configs = {
'neutral': {'speed': 1.0, 'pitch_shift': 0, 'energy': 1.0},
'happy': {'speed': 1.1, 'pitch_shift': 2, 'energy': 1.2},
'sad': {'speed': 0.9, 'pitch_shift': -2, 'energy': 0.8},
'angry': {'speed': 1.2, 'pitch_shift': 1, 'energy': 1.3},
'calm': {'speed': 0.95, 'pitch_shift': -1, 'energy': 0.9},
'excited': {'speed': 1.15, 'pitch_shift': 3, 'energy': 1.4}
}
def setup_models(self):
"""모델 초기화"""
ckpt_base = 'checkpoints/base_speakers/EN'
ckpt_converter = 'checkpoints/converter'
self.base_speaker_tts = BaseSpeakerTTS(
f'{ckpt_base}/config.json',
device=self.device
)
self.base_speaker_tts.load_ckpt(f'{ckpt_base}/checkpoint.pth')
self.tone_color_converter = ToneColorConverter(
f'{ckpt_converter}/config.json',
device=self.device
)
self.tone_color_converter.load_ckpt(f'{ckpt_converter}/checkpoint.pth')
def generate_emotional_voice(self, text, reference_audio, emotion='neutral', output_path=None):
"""감정이 담긴 음성 생성"""
if emotion not in self.emotion_configs:
raise ValueError(f"지원되지 않는 감정: {emotion}")
config = self.emotion_configs[emotion]
# 참조 화자 임베딩 추출
target_se, _ = se_extractor.get_se(
reference_audio,
self.tone_color_converter,
vad=True
)
# 감정 파라미터 적용하여 TTS 생성
temp_path = f'temp_emotional_{emotion}.wav'
self.base_speaker_tts.tts(
text,
temp_path,
speaker='default',
language='EN',
speed=config['speed']
)
# 음높이 조절 (후처리)
if config['pitch_shift'] != 0:
self.apply_pitch_shift(temp_path, config['pitch_shift'])
# 톤 컬러 변환
if output_path is None:
output_path = f'emotional_voice_{emotion}.wav'
self.tone_color_converter.convert(
audio_src_path=temp_path,
src_se=self.base_speaker_tts.hps.data.spk2id['default'],
tgt_se=target_se,
output_path=output_path
)
# 임시 파일 정리
os.remove(temp_path)
return output_path
def apply_pitch_shift(self, audio_path, semitones):
"""음높이 조절"""
import librosa
import soundfile as sf
# 오디오 로드
y, sr = librosa.load(audio_path, sr=None)
# 음높이 변경
y_shifted = librosa.effects.pitch_shift(y, sr=sr, n_steps=semitones)
# 파일 저장
sf.write(audio_path, y_shifted, sr)
def create_emotion_comparison(self, text, reference_audio, output_dir="emotion_comparison"):
"""감정별 음성 비교 생성"""
os.makedirs(output_dir, exist_ok=True)
results = {}
print("감정별 음성 생성 중...")
for emotion in self.emotion_configs.keys():
output_path = f"{output_dir}/{emotion}_voice.wav"
try:
result_path = self.generate_emotional_voice(
text, reference_audio, emotion, output_path
)
results[emotion] = result_path
print(f"✅ {emotion}: {result_path}")
except Exception as e:
print(f"❌ {emotion} 생성 실패: {e}")
return results
def emotion_demo():
"""감정 제어 데모"""
controller = EmotionalVoiceController()
# 테스트 텍스트와 참조 음성
text = "Welcome to the world of emotional voice synthesis with OpenVoice!"
reference_audio = "resources/reference_speaker.wav"
# 감정별 음성 생성
results = controller.create_emotion_comparison(text, reference_audio)
print("\n=== 감정별 음성 생성 완료 ===")
for emotion, path in results.items():
config = controller.emotion_configs[emotion]
print(f"{emotion.upper()}: {path}")
print(f" - 속도: {config['speed']}x")
print(f" - 음높이: {config['pitch_shift']} 반음")
print(f" - 에너지: {config['energy']}x")
if __name__ == "__main__":
emotion_demo()
고급 활용 사례
1. 실시간 음성 변환 시스템
# real_time_voice_conversion.py
import pyaudio
import wave
import threading
import queue
import numpy as np
from openvoice.api import ToneColorConverter
from openvoice import se_extractor
class RealTimeVoiceConverter:
def __init__(self, reference_audio_path):
# 오디오 설정
self.CHUNK = 1024
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 1
self.RATE = 24000
# 큐 설정
self.audio_queue = queue.Queue()
self.output_queue = queue.Queue()
# 모델 설정
self.setup_converter()
self.load_reference_speaker(reference_audio_path)
# 오디오 인터페이스
self.audio = pyaudio.PyAudio()
def setup_converter(self):
"""음성 변환 모델 설정"""
device = "cuda" if torch.cuda.is_available() else "cpu"
ckpt_converter = 'checkpoints/converter'
self.tone_color_converter = ToneColorConverter(
f'{ckpt_converter}/config.json',
device=device
)
self.tone_color_converter.load_ckpt(f'{ckpt_converter}/checkpoint.pth')
def load_reference_speaker(self, reference_path):
"""참조 화자 임베딩 로드"""
self.target_se, _ = se_extractor.get_se(
reference_path,
self.tone_color_converter,
vad=True
)
print(f"참조 화자 로드 완료: {reference_path}")
def audio_input_thread(self):
"""오디오 입력 스레드"""
stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
frames_per_buffer=self.CHUNK
)
print("🎤 음성 입력 시작...")
while self.recording:
try:
data = stream.read(self.CHUNK, exception_on_overflow=False)
self.audio_queue.put(data)
except Exception as e:
print(f"입력 오류: {e}")
break
stream.stop_stream()
stream.close()
def audio_processing_thread(self):
"""오디오 처리 스레드"""
audio_buffer = []
buffer_size = self.RATE * 2 # 2초 버퍼
while self.recording:
try:
# 오디오 데이터 수집
data = self.audio_queue.get(timeout=1)
audio_array = np.frombuffer(data, dtype=np.int16)
audio_buffer.extend(audio_array)
# 버퍼가 충분히 찼을 때 처리
if len(audio_buffer) >= buffer_size:
# 음성 변환 처리
converted_audio = self.process_audio_chunk(
np.array(audio_buffer[:buffer_size])
)
self.output_queue.put(converted_audio)
# 버퍼 슬라이딩 (50% 오버랩)
audio_buffer = audio_buffer[buffer_size//2:]
except queue.Empty:
continue
except Exception as e:
print(f"처리 오류: {e}")
def process_audio_chunk(self, audio_chunk):
"""오디오 청크 처리"""
try:
# 임시 파일 저장
temp_input = "temp_input.wav"
temp_output = "temp_output.wav"
# WAV 파일로 저장
with wave.open(temp_input, 'wb') as wf:
wf.setnchannels(self.CHANNELS)
wf.setsampwidth(self.audio.get_sample_size(self.FORMAT))
wf.setframerate(self.RATE)
wf.writeframes(audio_chunk.tobytes())
# 음성 변환 (빠른 모드)
self.tone_color_converter.convert(
audio_src_path=temp_input,
src_se=0, # 기본 화자
tgt_se=self.target_se,
output_path=temp_output,
fast_mode=True # 실시간 처리를 위한 빠른 모드
)
# 변환된 오디오 로드
with wave.open(temp_output, 'rb') as wf:
converted_data = wf.readframes(wf.getnframes())
# 임시 파일 정리
os.remove(temp_input)
os.remove(temp_output)
return converted_data
except Exception as e:
print(f"변환 오류: {e}")
return audio_chunk.tobytes()
def audio_output_thread(self):
"""오디오 출력 스레드"""
stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
output=True,
frames_per_buffer=self.CHUNK
)
print("🔊 음성 출력 시작...")
while self.recording:
try:
converted_data = self.output_queue.get(timeout=1)
stream.write(converted_data)
except queue.Empty:
continue
except Exception as e:
print(f"출력 오류: {e}")
stream.stop_stream()
stream.close()
def start_conversion(self, duration=None):
"""실시간 변환 시작"""
self.recording = True
# 스레드 시작
input_thread = threading.Thread(target=self.audio_input_thread)
processing_thread = threading.Thread(target=self.audio_processing_thread)
output_thread = threading.Thread(target=self.audio_output_thread)
input_thread.start()
processing_thread.start()
output_thread.start()
print("🚀 실시간 음성 변환 시작!")
print("Ctrl+C로 종료...")
try:
if duration:
time.sleep(duration)
else:
input("Enter를 눌러 종료...")
except KeyboardInterrupt:
pass
# 변환 중지
self.recording = False
input_thread.join()
processing_thread.join()
output_thread.join()
self.audio.terminate()
print("실시간 변환 종료")
def real_time_demo():
"""실시간 음성 변환 데모"""
reference_audio = "resources/target_speaker.wav"
converter = RealTimeVoiceConverter(reference_audio)
converter.start_conversion(duration=30) # 30초 동안 실행
if __name__ == "__main__":
real_time_demo()
2. 배치 음성 처리 시스템
# batch_voice_processing.py
import os
import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import pandas as pd
class BatchVoiceProcessor:
def __init__(self, max_workers=4):
self.max_workers = max_workers
self.setup_logging()
self.setup_models()
def setup_logging(self):
"""로깅 설정"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('batch_processing.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def setup_models(self):
"""모델 초기화"""
device = "cuda" if torch.cuda.is_available() else "cpu"
# 언어별 모델 설정
self.language_models = {
'EN': 'checkpoints/base_speakers/EN',
'KR': 'checkpoints/base_speakers/KR',
'ZH': 'checkpoints/base_speakers/ZH'
}
# 컨버터 모델
ckpt_converter = 'checkpoints/converter'
self.tone_color_converter = ToneColorConverter(
f'{ckpt_converter}/config.json',
device=device
)
self.tone_color_converter.load_ckpt(f'{ckpt_converter}/checkpoint.pth')
# TTS 모델들
self.tts_models = {}
for lang, path in self.language_models.items():
if os.path.exists(path):
tts = BaseSpeakerTTS(f'{path}/config.json', device=device)
tts.load_ckpt(f'{path}/checkpoint.pth')
self.tts_models[lang] = tts
def process_single_item(self, item):
"""단일 아이템 처리"""
try:
# 작업 정보 추출
text = item['text']
language = item['language']
reference_audio = item['reference_audio']
output_path = item['output_path']
speaker_name = item.get('speaker_name', 'unknown')
self.logger.info(f"처리 시작: {speaker_name} - {language}")
# 참조 화자 임베딩 추출
target_se, _ = se_extractor.get_se(
reference_audio,
self.tone_color_converter,
vad=True
)
# TTS 생성
temp_path = f'temp_{os.path.basename(output_path)}'
self.tts_models[language].tts(
text,
temp_path,
speaker='default',
language=language,
speed=item.get('speed', 1.0)
)
# 톤 컬러 변환
self.tone_color_converter.convert(
audio_src_path=temp_path,
src_se=self.tts_models[language].hps.data.spk2id['default'],
tgt_se=target_se,
output_path=output_path
)
# 임시 파일 정리
os.remove(temp_path)
self.logger.info(f"처리 완료: {output_path}")
return {
'status': 'success',
'output_path': output_path,
'speaker_name': speaker_name,
'language': language
}
except Exception as e:
self.logger.error(f"처리 실패: {speaker_name} - {str(e)}")
return {
'status': 'error',
'error': str(e),
'speaker_name': speaker_name,
'language': language
}
def process_batch(self, batch_config_path, output_dir="batch_outputs"):
"""배치 처리 실행"""
# 설정 파일 로드
with open(batch_config_path, 'r', encoding='utf-8') as f:
batch_config = json.load(f)
# 출력 디렉토리 생성
os.makedirs(output_dir, exist_ok=True)
# 작업 목록 준비
items = []
for idx, item in enumerate(batch_config['items']):
item['output_path'] = os.path.join(
output_dir,
f"{item.get('speaker_name', f'speaker_{idx}')}_{item['language']}.wav"
)
items.append(item)
# 병렬 처리
results = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 작업 제출
future_to_item = {
executor.submit(self.process_single_item, item): item
for item in items
}
# 결과 수집
for future in as_completed(future_to_item):
result = future.result()
results.append(result)
# 진행 상황 출력
completed = len(results)
total = len(items)
progress = (completed / total) * 100
self.logger.info(f"진행률: {progress:.1f}% ({completed}/{total})")
# 결과 리포트 생성
self.generate_report(results, output_dir)
return results
def generate_report(self, results, output_dir):
"""처리 결과 리포트 생성"""
# 통계 계산
total = len(results)
successful = len([r for r in results if r['status'] == 'success'])
failed = total - successful
# 상세 리포트
report = {
'summary': {
'total_items': total,
'successful': successful,
'failed': failed,
'success_rate': f"{(successful/total)*100:.1f}%"
},
'details': results
}
# JSON 리포트 저장
report_path = os.path.join(output_dir, 'processing_report.json')
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
# CSV 요약 저장
df = pd.DataFrame(results)
csv_path = os.path.join(output_dir, 'processing_summary.csv')
df.to_csv(csv_path, index=False, encoding='utf-8')
self.logger.info(f"리포트 생성 완료: {report_path}")
# 콘솔 요약 출력
print(f"\n=== 배치 처리 완료 ===")
print(f"총 작업: {total}개")
print(f"성공: {successful}개")
print(f"실패: {failed}개")
print(f"성공률: {(successful/total)*100:.1f}%")
def create_batch_config():
"""배치 설정 파일 생성 예제"""
batch_config = {
"description": "다국어 음성 데이터셋 생성",
"items": [
{
"speaker_name": "speaker_001",
"text": "Welcome to OpenVoice batch processing system.",
"language": "EN",
"reference_audio": "references/speaker_001.wav",
"speed": 1.0
},
{
"speaker_name": "speaker_001",
"text": "OpenVoice 배치 처리 시스템에 오신 것을 환영합니다.",
"language": "KR",
"reference_audio": "references/speaker_001.wav",
"speed": 1.0
},
{
"speaker_name": "speaker_002",
"text": "This is a demonstration of multilingual voice cloning.",
"language": "EN",
"reference_audio": "references/speaker_002.wav",
"speed": 1.1
}
]
}
with open('batch_config.json', 'w', encoding='utf-8') as f:
json.dump(batch_config, f, indent=2, ensure_ascii=False)
print("배치 설정 파일 생성됨: batch_config.json")
def batch_demo():
"""배치 처리 데모"""
# 설정 파일 생성
create_batch_config()
# 배치 프로세서 초기화
processor = BatchVoiceProcessor(max_workers=2)
# 배치 처리 실행
results = processor.process_batch('batch_config.json')
print("배치 처리 완료!")
if __name__ == "__main__":
batch_demo()
3. 웹 API 서비스
# web_api_service.py
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse
from pydantic import BaseModel
import uvicorn
import asyncio
import aiofiles
import uuid
import os
from typing import Optional
# FastAPI 앱 초기화
app = FastAPI(
title="OpenVoice API",
description="음성 복제 및 다국어 TTS API",
version="2.0.0"
)
# 요청 모델
class VoiceCloneRequest(BaseModel):
text: str
language: str = "EN"
speed: float = 1.0
emotion: str = "neutral"
class TTSRequest(BaseModel):
text: str
language: str = "EN"
speaker: str = "default"
speed: float = 1.0
# 전역 변수
voice_processor = None
active_jobs = {}
@app.on_event("startup")
async def startup_event():
"""서비스 시작 시 모델 로드"""
global voice_processor
print("OpenVoice 모델 로딩 중...")
# 모델 초기화 (별도 클래스로 구현)
voice_processor = VoiceProcessorAPI()
await voice_processor.initialize()
print("OpenVoice API 서비스 준비 완료!")
class VoiceProcessorAPI:
def __init__(self):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
async def initialize(self):
"""비동기 모델 초기화"""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._load_models)
def _load_models(self):
"""모델 로드 (동기 작업)"""
# TTS 모델들
self.tts_models = {}
languages = ['EN', 'KR', 'ZH', 'JP']
for lang in languages:
model_path = f'checkpoints/base_speakers/{lang}'
if os.path.exists(model_path):
tts = BaseSpeakerTTS(f'{model_path}/config.json', device=self.device)
tts.load_ckpt(f'{model_path}/checkpoint.pth')
self.tts_models[lang] = tts
# 컨버터 모델
ckpt_converter = 'checkpoints/converter'
self.tone_color_converter = ToneColorConverter(
f'{ckpt_converter}/config.json',
device=self.device
)
self.tone_color_converter.load_ckpt(f'{ckpt_converter}/checkpoint.pth')
async def clone_voice(self, text: str, reference_audio_path: str,
language: str = "EN", speed: float = 1.0):
"""음성 복제 (비동기)"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, self._clone_voice_sync, text, reference_audio_path, language, speed
)
def _clone_voice_sync(self, text: str, reference_audio_path: str,
language: str, speed: float):
"""음성 복제 (동기 작업)"""
try:
# 참조 화자 임베딩
target_se, _ = se_extractor.get_se(
reference_audio_path,
self.tone_color_converter,
vad=True
)
# TTS 생성
temp_path = f'temp_{uuid.uuid4()}.wav'
self.tts_models[language].tts(
text, temp_path, speaker='default',
language=language, speed=speed
)
# 톤 컬러 변환
output_path = f'outputs/{uuid.uuid4()}.wav'
os.makedirs('outputs', exist_ok=True)
self.tone_color_converter.convert(
audio_src_path=temp_path,
src_se=self.tts_models[language].hps.data.spk2id['default'],
tgt_se=target_se,
output_path=output_path
)
# 임시 파일 정리
os.remove(temp_path)
return output_path
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# API 엔드포인트들
@app.get("/")
async def root():
"""API 정보"""
return {
"service": "OpenVoice API",
"version": "2.0.0",
"status": "running",
"supported_languages": ["EN", "KR", "ZH", "JP"],
"features": ["voice_cloning", "multilingual_tts", "emotion_control"]
}
@app.get("/health")
async def health_check():
"""헬스 체크"""
return {
"status": "healthy",
"models_loaded": len(voice_processor.tts_models) if voice_processor else 0,
"device": voice_processor.device if voice_processor else "unknown"
}
@app.post("/voice/clone")
async def clone_voice(
request: VoiceCloneRequest,
reference_audio: UploadFile = File(...),
background_tasks: BackgroundTasks = None
):
"""음성 복제 API"""
if not voice_processor:
raise HTTPException(status_code=503, detail="모델이 로드되지 않았습니다")
if request.language not in voice_processor.tts_models:
raise HTTPException(status_code=400, detail=f"지원되지 않는 언어: {request.language}")
# 업로드된 파일 저장
reference_path = f"temp_references/{uuid.uuid4()}_{reference_audio.filename}"
os.makedirs("temp_references", exist_ok=True)
async with aiofiles.open(reference_path, 'wb') as f:
content = await reference_audio.read()
await f.write(content)
try:
# 음성 복제 실행
output_path = await voice_processor.clone_voice(
text=request.text,
reference_audio_path=reference_path,
language=request.language,
speed=request.speed
)
# 백그라운드에서 임시 파일 정리
if background_tasks:
background_tasks.add_task(cleanup_file, reference_path)
return FileResponse(
output_path,
media_type="audio/wav",
filename=f"cloned_voice_{uuid.uuid4()}.wav"
)
except Exception as e:
# 오류 시 임시 파일 정리
if os.path.exists(reference_path):
os.remove(reference_path)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/tts/generate")
async def generate_tts(request: TTSRequest):
"""기본 TTS 생성"""
if not voice_processor:
raise HTTPException(status_code=503, detail="모델이 로드되지 않았습니다")
if request.language not in voice_processor.tts_models:
raise HTTPException(status_code=400, detail=f"지원되지 않는 언어: {request.language}")
try:
# TTS 생성
output_path = f'outputs/{uuid.uuid4()}.wav'
os.makedirs('outputs', exist_ok=True)
voice_processor.tts_models[request.language].tts(
request.text,
output_path,
speaker=request.speaker,
language=request.language,
speed=request.speed
)
return FileResponse(
output_path,
media_type="audio/wav",
filename=f"tts_output_{uuid.uuid4()}.wav"
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/languages")
async def get_supported_languages():
"""지원 언어 목록"""
if not voice_processor:
return {"languages": []}
return {
"languages": list(voice_processor.tts_models.keys()),
"total": len(voice_processor.tts_models)
}
async def cleanup_file(file_path: str):
"""파일 정리 (백그라운드 작업)"""
await asyncio.sleep(1) # 잠시 대기
if os.path.exists(file_path):
os.remove(file_path)
# 서버 실행 함수
def start_server(host="0.0.0.0", port=8000):
"""API 서버 시작"""
uvicorn.run(
app,
host=host,
port=port,
log_level="info"
)
if __name__ == "__main__":
start_server()
프로덕션 배포
Docker 컨테이너화
# Dockerfile
FROM nvidia/cuda:11.8-devel-ubuntu20.04
# 환경 변수 설정
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1
# 시스템 패키지 설치
RUN apt-get update && apt-get install -y \
python3.9 \
python3.9-pip \
python3.9-dev \
git \
wget \
curl \
ffmpeg \
libsndfile1 \
&& rm -rf /var/lib/apt/lists/*
# 심볼릭 링크 생성
RUN ln -s /usr/bin/python3.9 /usr/bin/python
# 작업 디렉토리 설정
WORKDIR /app
# Python 의존성 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# PyTorch 설치 (CUDA 지원)
RUN pip install torch torchaudio --index-url https://download.pytorch.org/whl/cu118
# OpenVoice 코드 복사
COPY . .
# 모델 다운로드
RUN mkdir -p checkpoints && \
wget https://myshell-public-repo-hosting.s3.amazonaws.com/openvoice/checkpoints_v2.zip && \
unzip checkpoints_v2.zip -d checkpoints/ && \
rm checkpoints_v2.zip
# 포트 노출
EXPOSE 8000
# 헬스체크
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# 서비스 시작
CMD ["python", "web_api_service.py"]
Docker Compose 설정
# docker-compose.yml
version: '3.8'
services:
openvoice-api:
build: .
container_name: openvoice-service
restart: unless-stopped
ports:
- "8000:8000"
volumes:
- ./outputs:/app/outputs
- ./references:/app/references
- ./logs:/app/logs
environment:
- CUDA_VISIBLE_DEVICES=0
- OPENVOICE_LOG_LEVEL=INFO
- OPENVOICE_MAX_WORKERS=4
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
nginx:
image: nginx:alpine
container_name: openvoice-nginx
restart: unless-stopped
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- openvoice-api
redis:
image: redis:alpine
container_name: openvoice-redis
restart: unless-stopped
volumes:
- redis_data:/data
command: redis-server --appendonly yes
volumes:
redis_data:
성능 최적화
# performance_optimization.py
import torch
import time
import psutil
import GPUtil
from concurrent.futures import ThreadPoolExecutor
class OpenVoiceOptimizer:
def __init__(self):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.setup_optimizations()
def setup_optimizations(self):
"""성능 최적화 설정"""
if torch.cuda.is_available():
# CUDA 최적화
torch.backends.cudnn.benchmark = True
torch.backends.cudnn.enabled = True
# 메모리 최적화
torch.cuda.empty_cache()
# 혼합 정밀도 사용
self.use_amp = True
self.scaler = torch.cuda.amp.GradScaler()
print(f"CUDA 최적화 활성화: {torch.cuda.get_device_name(0)}")
else:
self.use_amp = False
print("CPU 모드로 실행")
def benchmark_model_performance(self, model, test_texts, iterations=10):
"""모델 성능 벤치마크"""
print(f"성능 벤치마크 시작 ({iterations}회 반복)")
times = []
memory_usage = []
for i in range(iterations):
start_time = time.time()
# GPU 메모리 사용량 측정
if torch.cuda.is_available():
torch.cuda.synchronize()
mem_before = torch.cuda.memory_allocated()
# 테스트 실행
for text in test_texts:
if self.use_amp:
with torch.cuda.amp.autocast():
_ = model.generate_speech(text)
else:
_ = model.generate_speech(text)
# 실행 시간 기록
end_time = time.time()
execution_time = end_time - start_time
times.append(execution_time)
# 메모리 사용량 기록
if torch.cuda.is_available():
torch.cuda.synchronize()
mem_after = torch.cuda.memory_allocated()
memory_usage.append(mem_after - mem_before)
print(f"반복 {i+1}: {execution_time:.3f}초")
# 통계 계산
avg_time = sum(times) / len(times)
min_time = min(times)
max_time = max(times)
if memory_usage:
avg_memory = sum(memory_usage) / len(memory_usage) / 1024**2 # MB
else:
avg_memory = 0
print(f"\n=== 성능 벤치마크 결과 ===")
print(f"평균 실행 시간: {avg_time:.3f}초")
print(f"최소 실행 시간: {min_time:.3f}초")
print(f"최대 실행 시간: {max_time:.3f}초")
print(f"평균 메모리 사용량: {avg_memory:.1f}MB")
return {
'avg_time': avg_time,
'min_time': min_time,
'max_time': max_time,
'avg_memory_mb': avg_memory
}
def optimize_for_batch_processing(self, batch_size=8):
"""배치 처리 최적화"""
optimizations = {
'torch_settings': {
'torch.set_num_threads': min(8, psutil.cpu_count()),
'torch.set_grad_enabled': False, # 추론 전용
},
'memory_settings': {
'max_split_size_mb': 128,
'memory_fraction': 0.8
},
'batch_settings': {
'optimal_batch_size': batch_size,
'prefetch_factor': 2
}
}
# 설정 적용
torch.set_num_threads(optimizations['torch_settings']['torch.set_num_threads'])
torch.set_grad_enabled(optimizations['torch_settings']['torch.set_grad_enabled'])
if torch.cuda.is_available():
torch.cuda.set_per_process_memory_fraction(
optimizations['memory_settings']['memory_fraction']
)
return optimizations
def monitor_system_resources(self, duration=60):
"""시스템 리소스 모니터링"""
print(f"시스템 리소스 모니터링 시작 ({duration}초)")
start_time = time.time()
cpu_usage = []
memory_usage = []
gpu_usage = []
while time.time() - start_time < duration:
# CPU 사용률
cpu_percent = psutil.cpu_percent(interval=1)
cpu_usage.append(cpu_percent)
# 메모리 사용률
memory = psutil.virtual_memory()
memory_usage.append(memory.percent)
# GPU 사용률
if torch.cuda.is_available():
try:
gpus = GPUtil.getGPUs()
if gpus:
gpu_usage.append(gpus[0].load * 100)
except:
gpu_usage.append(0)
time.sleep(1)
# 통계 출력
print(f"\n=== 리소스 사용률 ({duration}초 평균) ===")
print(f"CPU: {sum(cpu_usage)/len(cpu_usage):.1f}%")
print(f"메모리: {sum(memory_usage)/len(memory_usage):.1f}%")
if gpu_usage:
print(f"GPU: {sum(gpu_usage)/len(gpu_usage):.1f}%")
return {
'cpu_avg': sum(cpu_usage)/len(cpu_usage),
'memory_avg': sum(memory_usage)/len(memory_usage),
'gpu_avg': sum(gpu_usage)/len(gpu_usage) if gpu_usage else 0
}
def optimization_demo():
"""최적화 데모"""
optimizer = OpenVoiceOptimizer()
# 배치 처리 최적화
batch_optimizations = optimizer.optimize_for_batch_processing(batch_size=4)
print("배치 처리 최적화 적용 완료")
# 테스트 텍스트
test_texts = [
"Hello, this is a performance test.",
"OpenVoice optimization benchmark.",
"Testing voice synthesis speed."
]
# 성능 벤치마크 (실제 모델이 있을 때)
# benchmark_results = optimizer.benchmark_model_performance(model, test_texts)
# 시스템 리소스 모니터링
resource_stats = optimizer.monitor_system_resources(duration=30)
print("최적화 데모 완료!")
if __name__ == "__main__":
optimization_demo()
비즈니스 활용 사례
1. 다국어 콘텐츠 제작
# content_localization.py
class ContentLocalizationPipeline:
def __init__(self):
self.setup_openvoice()
self.supported_languages = {
'EN': 'English',
'ES': 'Spanish',
'FR': 'French',
'ZH': 'Chinese',
'JP': 'Japanese',
'KR': 'Korean'
}
def localize_podcast_series(self, original_audio_path, script_translations, host_voice_sample):
"""팟캐스트 시리즈 다국어 현지화"""
results = {}
for lang_code, translated_script in script_translations.items():
if lang_code not in self.supported_languages:
continue
print(f"현지화 중: {self.supported_languages[lang_code]}")
# 챕터별 분할 처리
chapters = self.split_script_into_chapters(translated_script)
chapter_audios = []
for i, chapter in enumerate(chapters):
chapter_audio = self.generate_chapter_audio(
text=chapter,
language=lang_code,
host_voice=host_voice_sample,
chapter_number=i+1
)
chapter_audios.append(chapter_audio)
# 챕터 결합
final_audio = self.combine_chapters(chapter_audios)
# 후처리 (음질 향상, 노멀라이제이션)
enhanced_audio = self.enhance_audio_quality(final_audio)
output_path = f"localized_podcast_{lang_code}.wav"
self.save_audio(enhanced_audio, output_path)
results[lang_code] = {
'language': self.supported_languages[lang_code],
'output_path': output_path,
'duration': self.get_audio_duration(output_path),
'chapters': len(chapters)
}
return results
# 실제 활용 예제
localization = ContentLocalizationPipeline()
# 원본 팟캐스트와 번역본
script_translations = {
'EN': "Welcome to our tech podcast. Today we'll discuss AI innovations...",
'KR': "저희 기술 팟캐스트에 오신 것을 환영합니다. 오늘은 AI 혁신에 대해...",
'ZH': "欢迎收听我们的技术播客。今天我们将讨论人工智能创新...",
}
# 호스트 음성 샘플
host_voice = "samples/host_voice.wav"
# 다국어 현지화 실행
results = localization.localize_podcast_series(
original_audio_path="original_podcast.wav",
script_translations=script_translations,
host_voice_sample=host_voice
)
print("콘텐츠 현지화 완료:")
for lang, info in results.items():
print(f"- {info['language']}: {info['output_path']} ({info['duration']}초)")
2. 교육 콘텐츠 개인화
# personalized_education.py
class PersonalizedEducationVoice:
def __init__(self):
self.student_profiles = {}
self.teacher_voices = {}
self.setup_voice_models()
def create_student_profile(self, student_id, preferences):
"""학생별 개인화 프로필 생성"""
profile = {
'student_id': student_id,
'preferred_language': preferences.get('language', 'EN'),
'speaking_speed': preferences.get('speed', 1.0),
'voice_style': preferences.get('style', 'neutral'),
'difficulty_level': preferences.get('level', 'intermediate'),
'learning_disabilities': preferences.get('disabilities', [])
}
# 학습 장애 고려 조정
if 'dyslexia' in profile['learning_disabilities']:
profile['speaking_speed'] *= 0.8 # 느리게
if 'attention_deficit' in profile['learning_disabilities']:
profile['voice_style'] = 'engaging' # 더 생동감 있게
self.student_profiles[student_id] = profile
return profile
def generate_personalized_lesson(self, lesson_content, student_id, teacher_voice_sample):
"""개인화된 수업 음성 생성"""
if student_id not in self.student_profiles:
raise ValueError(f"학생 프로필을 찾을 수 없음: {student_id}")
profile = self.student_profiles[student_id]
# 개인화 파라미터 적용
personalized_audio = self.voice_cloner.clone_voice_with_params(
text=lesson_content,
reference_voice=teacher_voice_sample,
language=profile['preferred_language'],
speed=profile['speaking_speed'],
style=profile['voice_style']
)
# 접근성 향상 처리
if 'hearing_impaired' in profile['learning_disabilities']:
personalized_audio = self.enhance_for_hearing_impaired(personalized_audio)
output_path = f"lessons/student_{student_id}_lesson.wav"
self.save_personalized_lesson(personalized_audio, output_path, profile)
return {
'audio_path': output_path,
'student_id': student_id,
'customizations': profile,
'duration': self.get_audio_duration(output_path)
}
# 교육 기관 활용 사례
education_system = PersonalizedEducationVoice()
# 학생별 프로필 생성
students = [
{
'id': 'student_001',
'preferences': {
'language': 'EN',
'speed': 0.9,
'style': 'calm',
'level': 'beginner',
'disabilities': ['dyslexia']
}
},
{
'id': 'student_002',
'preferences': {
'language': 'KR',
'speed': 1.1,
'style': 'enthusiastic',
'level': 'advanced',
'disabilities': []
}
}
]
for student in students:
education_system.create_student_profile(student['id'], student['preferences'])
# 개인화된 수업 생성
lesson_text = "Today we will learn about photosynthesis in plants..."
teacher_voice = "voices/biology_teacher.wav"
for student in students:
result = education_system.generate_personalized_lesson(
lesson_content=lesson_text,
student_id=student['id'],
teacher_voice_sample=teacher_voice
)
print(f"개인화 수업 생성: {result['audio_path']}")
결론
OpenVoice는 음성 기술의 새로운 패러다임을 제시하는 혁신적인 도구입니다. 이 가이드를 통해 다음과 같은 역량을 구축할 수 있습니다:
핵심 성과
- 정확한 음성 복제: 단 몇 초의 참조 음성으로도 고품질 복제
- 다국어 지원: 6개 언어의 네이티브 지원과 제로샷 확장
- 유연한 제어: 감정, 속도, 음높이 등 세밀한 조절
- 프로덕션 준비: API 서비스, 배치 처리, 실시간 변환
기술적 혁신
- MIT 기술력: 세계 최고 수준의 연구 기관 기술
- 오픈소스: 완전한 투명성과 커스터마이징 가능
- 상업적 이용: MIT 라이선스로 제약 없는 활용
- 확장성: 클라우드 배포와 엔터프라이즈 통합
비즈니스 가치
이 기술로 다음과 같은 비즈니스 기회를 창출할 수 있습니다:
- 콘텐츠 제작: 다국어 팟캐스트, 오디오북, 광고
- 교육 서비스: 개인화된 학습 콘텐츠
- 접근성 향상: 시각 장애인을 위한 음성 서비스
- 고객 서비스: 브랜드 목소리로 통일된 AI 상담원
미래 전망
future_developments = {
"real_time_optimization": "실시간 처리 성능 향상",
"emotional_intelligence": "더욱 정교한 감정 표현",
"voice_personalization": "개인별 음성 특성 학습",
"multimodal_integration": "비디오와 음성 동기화",
"edge_deployment": "모바일 디바이스 최적화"
}
OpenVoice의 33.4k GitHub 스타는 단순한 인기가 아닌, 실제 개발자와 기업들이 인정하는 기술적 우수성의 증거입니다. 공식 레포지토리에서 최신 업데이트를 확인하고, 음성 기술의 미래를 선도하는 혁신에 동참해보세요!
관련 리소스: