coqui-tts/TTS/vc/models/openvoice.py

321 lines
13 KiB
Python

import json
import logging
import os
from pathlib import Path
from typing import Any, Mapping, Optional, Union
import librosa
import numpy as np
import numpy.typing as npt
import torch
from coqpit import Coqpit
from torch import nn
from torch.nn import functional as F
from trainer.io import load_fsspec
from TTS.tts.layers.vits.networks import PosteriorEncoder
from TTS.tts.utils.speakers import SpeakerManager
from TTS.utils.audio.torch_transforms import wav_to_spec
from TTS.vc.configs.openvoice_config import OpenVoiceConfig
from TTS.vc.models.base_vc import BaseVC
from TTS.vc.models.freevc import Generator, ResidualCouplingBlock
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class ReferenceEncoder(nn.Module):
"""NN module creating a fixed size prosody embedding from a spectrogram.
inputs: mel spectrograms [batch_size, num_spec_frames, num_mel]
outputs: [batch_size, embedding_dim]
"""
def __init__(self, spec_channels: int, embedding_dim: int = 0, layernorm: bool = True) -> None:
super().__init__()
self.spec_channels = spec_channels
ref_enc_filters = [32, 32, 64, 64, 128, 128]
K = len(ref_enc_filters)
filters = [1] + ref_enc_filters
convs = [
torch.nn.utils.parametrizations.weight_norm(
nn.Conv2d(
in_channels=filters[i],
out_channels=filters[i + 1],
kernel_size=(3, 3),
stride=(2, 2),
padding=(1, 1),
)
)
for i in range(K)
]
self.convs = nn.ModuleList(convs)
out_channels = self.calculate_channels(spec_channels, 3, 2, 1, K)
self.gru = nn.GRU(
input_size=ref_enc_filters[-1] * out_channels,
hidden_size=256 // 2,
batch_first=True,
)
self.proj = nn.Linear(128, embedding_dim)
self.layernorm = nn.LayerNorm(self.spec_channels) if layernorm else None
def forward(self, inputs: torch.Tensor) -> torch.Tensor:
N = inputs.size(0)
out = inputs.view(N, 1, -1, self.spec_channels) # [N, 1, Ty, n_freqs]
if self.layernorm is not None:
out = self.layernorm(out)
for conv in self.convs:
out = conv(out)
out = F.relu(out) # [N, 128, Ty//2^K, n_mels//2^K]
out = out.transpose(1, 2) # [N, Ty//2^K, 128, n_mels//2^K]
T = out.size(1)
N = out.size(0)
out = out.contiguous().view(N, T, -1) # [N, Ty//2^K, 128*n_mels//2^K]
self.gru.flatten_parameters()
_memory, out = self.gru(out) # out --- [1, N, 128]
return self.proj(out.squeeze(0))
def calculate_channels(self, L: int, kernel_size: int, stride: int, pad: int, n_convs: int) -> int:
for _ in range(n_convs):
L = (L - kernel_size + 2 * pad) // stride + 1
return L
class OpenVoice(BaseVC):
    """
    OpenVoice voice conversion model (inference only).

    Source: https://github.com/myshell-ai/OpenVoice
    Paper: https://arxiv.org/abs/2312.01479

    Paper abstract:
        We introduce OpenVoice, a versatile voice cloning approach that requires
        only a short audio clip from the reference speaker to replicate their voice and
        generate speech in multiple languages. OpenVoice represents a significant
        advancement in addressing the following open challenges in the field: 1)
        Flexible Voice Style Control. OpenVoice enables granular control over voice
        styles, including emotion, accent, rhythm, pauses, and intonation, in addition
        to replicating the tone color of the reference speaker. The voice styles are not
        directly copied from and constrained by the style of the reference speaker.
        Previous approaches lacked the ability to flexibly manipulate voice styles after
        cloning. 2) Zero-Shot Cross-Lingual Voice Cloning. OpenVoice achieves zero-shot
        cross-lingual voice cloning for languages not included in the massive-speaker
        training set. Unlike previous approaches, which typically require extensive
        massive-speaker multi-lingual (MSML) dataset for all languages, OpenVoice can
        clone voices into a new language without any massive-speaker training data for
        that language. OpenVoice is also computationally efficient, costing tens of
        times less than commercially available APIs that offer even inferior
        performance. To foster further research in the field, we have made the source
        code and trained model publicly accessible. We also provide qualitative results
        in our demo website. Prior to its public release, our internal version of
        OpenVoice was used tens of millions of times by users worldwide between May and
        October 2023, serving as the backend of MyShell.
    """

    def __init__(self, config: Coqpit, speaker_manager: Optional[SpeakerManager] = None) -> None:
        """Build the decoder, posterior encoder, flow and reference encoder.

        Args:
            config: Model configuration; ``config.audio.fft_size`` fixes the
                number of spectrogram channels.
            speaker_manager: Optional speaker manager forwarded to the base class.
        """
        super().__init__(config, None, speaker_manager, None)

        self.init_multispeaker(config)

        # NOTE(review): ``self.args`` is not assigned in this class; presumably
        # the base class derives it from ``config`` -- confirm.
        self.zero_g = self.args.zero_g
        self.inter_channels = self.args.inter_channels
        self.hidden_channels = self.args.hidden_channels
        self.filter_channels = self.args.filter_channels
        self.n_heads = self.args.n_heads
        self.n_layers = self.args.n_layers
        self.kernel_size = self.args.kernel_size
        self.p_dropout = self.args.p_dropout
        self.resblock = self.args.resblock
        self.resblock_kernel_sizes = self.args.resblock_kernel_sizes
        self.resblock_dilation_sizes = self.args.resblock_dilation_sizes
        self.upsample_rates = self.args.upsample_rates
        self.upsample_initial_channel = self.args.upsample_initial_channel
        self.upsample_kernel_sizes = self.args.upsample_kernel_sizes
        self.n_layers_q = self.args.n_layers_q
        self.use_spectral_norm = self.args.use_spectral_norm
        self.gin_channels = self.args.gin_channels
        self.tau = self.args.tau

        # Number of frequency bins of a one-sided spectrum for this FFT size.
        self.spec_channels = config.audio.fft_size // 2 + 1

        self.dec = Generator(
            self.inter_channels,
            self.resblock,
            self.resblock_kernel_sizes,
            self.resblock_dilation_sizes,
            self.upsample_rates,
            self.upsample_initial_channel,
            self.upsample_kernel_sizes,
            gin_channels=self.gin_channels,
        )
        self.enc_q = PosteriorEncoder(
            self.spec_channels,
            self.inter_channels,
            self.hidden_channels,
            kernel_size=5,
            dilation_rate=1,
            num_layers=16,
            cond_channels=self.gin_channels,
        )
        self.flow = ResidualCouplingBlock(
            self.inter_channels,
            self.hidden_channels,
            kernel_size=5,
            dilation_rate=1,
            n_layers=4,
            gin_channels=self.gin_channels,
        )
        self.ref_enc = ReferenceEncoder(self.spec_channels, self.gin_channels)

    @property
    def device(self) -> torch.device:
        """Device of the model's first parameter."""
        return next(self.parameters()).device

    @staticmethod
    def init_from_config(config: OpenVoiceConfig) -> "OpenVoice":
        """Create a model from ``config`` without a speaker manager."""
        return OpenVoice(config)

    def init_multispeaker(self, config: Coqpit, data: Optional[list[Any]] = None) -> None:
        """Set ``self.num_spks``.

        The value comes from ``config.num_speakers`` and is overridden by the
        speaker manager's ``num_speakers`` when a speaker manager is set.

        Args:
            config (Coqpit): Model configuration.
            data (list, optional): Unused here; kept for interface compatibility. Defaults to None.
        """
        self.num_spks = config.num_speakers
        if self.speaker_manager:
            self.num_spks = self.speaker_manager.num_speakers

    def load_checkpoint(
        self,
        config: OpenVoiceConfig,
        checkpoint_path: Union[str, os.PathLike[Any]],
        eval: bool = False,
        strict: bool = True,
        cache: bool = False,
    ) -> None:
        """Load weights and map audio settings from OpenVoice's config structure.

        A ``config.json`` in OpenVoice's original layout is read from the
        directory containing ``checkpoint_path``; its ``data`` section
        overwrites the sample rates, FFT size, hop length and window length in
        ``self.config.audio``.

        NOTE(review): the sub-modules and ``self.spec_channels`` were built in
        ``__init__`` from the previous ``fft_size`` and are not rebuilt here.

        Args:
            config: Unused here; ``self.config`` is the object that gets updated.
            checkpoint_path: Path to the checkpoint file.
            eval: If True, switch the model to evaluation mode after loading.
            strict: Forwarded to ``load_state_dict``.
            cache: Forwarded to ``load_fsspec``.
        """
        config_path = Path(checkpoint_path).parent / "config.json"
        with open(config_path, encoding="utf-8") as f:
            config_org = json.load(f)
        data_cfg = config_org["data"]
        self.config.audio.input_sample_rate = data_cfg["sampling_rate"]
        self.config.audio.output_sample_rate = data_cfg["sampling_rate"]
        self.config.audio.fft_size = data_cfg["filter_length"]
        self.config.audio.hop_length = data_cfg["hop_length"]
        self.config.audio.win_length = data_cfg["win_length"]

        state = load_fsspec(str(checkpoint_path), map_location=torch.device("cpu"), cache=cache)
        self.load_state_dict(state["model"], strict=strict)
        if eval:
            self.eval()

    # The model is inference-only, so the training interface is stubbed out.
    def forward(self) -> None: ...

    def train_step(self) -> None: ...

    def eval_step(self) -> None: ...

    @staticmethod
    def _set_x_lengths(x: torch.Tensor, aux_input: Mapping[str, Optional[torch.Tensor]]) -> torch.Tensor:
        """Return ``aux_input["x_lengths"]`` if given, else the full length of ``x``.

        The fallback is a one-element tensor holding the size of the last axis
        of ``x``, on the same device as ``x``.
        """
        x_lengths = aux_input.get("x_lengths")
        if x_lengths is not None:
            return x_lengths
        # ``voice_conversion`` passes the spectrogram as [batch, freq_bins, frames],
        # so the sequence length is the LAST axis; axis 1 is the frequency-bin
        # count and would give a wrong length for any clip with a different
        # number of frames.
        return torch.tensor(x.shape[-1:]).to(x.device)

    @torch.no_grad()
    def inference(
        self,
        x: torch.Tensor,
        # Read-only default: never mutated, kept as-is for interface compatibility.
        aux_input: Mapping[str, Optional[torch.Tensor]] = {"x_lengths": None, "g_src": None, "g_tgt": None},
    ) -> dict[str, torch.Tensor]:
        """
        Inference pass of the model.

        The input is encoded with the source speaker embedding, mapped through
        the flow with the source embedding, mapped back through the reversed
        flow with the target embedding, and decoded. When ``zero_g`` is set,
        the posterior encoder and the decoder are conditioned on zeros instead
        of the speaker embeddings.

        Args:
            x (torch.Tensor): Source spectrogram. ``voice_conversion`` passes
                shape (batch_size, spec_channels, num_frames).
            aux_input: Mapping with
                x_lengths (torch.Tensor, optional): Lengths of the input. Defaults to
                    the full length of ``x``.
                g_src (torch.Tensor): Source speaker embedding. ``extract_se`` produces
                    shape (batch_size, spk_emb_dim, 1).
                g_tgt (torch.Tensor): Target speaker embedding, same layout as ``g_src``.

        Returns:
            Dict with keys:
                model_outputs: Decoder output (``voice_conversion`` reads ``[0, 0]`` from it
                    as the converted audio).
                y_mask: Mask returned by the posterior encoder.
                z, z_p, z_hat: Latent variables before the flow, after the forward flow,
                    and after the reversed flow.

        Raises:
            ValueError: If ``g_src`` or ``g_tgt`` is missing or None.
        """
        x_lengths = self._set_x_lengths(x, aux_input)

        g_src = aux_input.get("g_src")
        if g_src is None:
            raise ValueError("aux_input must define g_src")

        g_tgt = aux_input.get("g_tgt")
        if g_tgt is None:
            raise ValueError("aux_input must define g_tgt")

        enc_cond = torch.zeros_like(g_src) if self.zero_g else g_src
        z, _m_q, _logs_q, y_mask = self.enc_q(x, x_lengths, g=enc_cond, tau=self.tau)

        z_p = self.flow(z, y_mask, g=g_src)
        z_hat = self.flow(z_p, y_mask, g=g_tgt, reverse=True)

        dec_cond = torch.zeros_like(g_tgt) if self.zero_g else g_tgt
        o_hat = self.dec(z_hat * y_mask, g=dec_cond)
        return {
            "model_outputs": o_hat,
            "y_mask": y_mask,
            "z": z,
            "z_p": z_p,
            "z_hat": z_hat,
        }

    def load_audio(
        self, wav: Union[str, os.PathLike[Any], npt.NDArray[np.float32], torch.Tensor, list[float]]
    ) -> torch.Tensor:
        """Read and format the input audio.

        Args:
            wav: A file path (loaded with librosa at ``config.audio.input_sample_rate``),
                a numpy array, a list of samples, or a tensor.

        Returns:
            A float32 tensor on the model's device.
        """
        # Path-like objects are accepted as well as plain strings; previously a
        # ``pathlib.Path`` fell through to the tensor branch and failed.
        if isinstance(wav, (str, os.PathLike)):
            samples, _sr = librosa.load(wav, sr=self.config.audio.input_sample_rate)
            out = torch.from_numpy(samples)
        elif isinstance(wav, np.ndarray):
            out = torch.from_numpy(wav)
        elif isinstance(wav, list):
            out = torch.from_numpy(np.array(wav))
        else:
            out = wav
        return out.to(self.device).float()

    def extract_se(self, audio: Union[str, torch.Tensor]) -> tuple[torch.Tensor, torch.Tensor]:
        """Compute the speaker embedding and the spectrogram of ``audio``.

        Args:
            audio: Anything accepted by ``load_audio``.

        Returns:
            ``(g, spec)``: the reference-encoder embedding with a trailing
            singleton axis added, and the spectrogram it was computed from.
        """
        # ``load_audio`` already returns a float tensor on ``self.device``.
        # Re-wrapping it in the legacy ``torch.FloatTensor`` constructor is a
        # no-op on CPU and is believed to fail for tensors on other devices
        # (not verified here), so it is skipped.
        y = self.load_audio(audio).unsqueeze(0)
        spec = wav_to_spec(
            y,
            n_fft=self.config.audio.fft_size,
            hop_length=self.config.audio.hop_length,
            win_length=self.config.audio.win_length,
            center=False,
        ).to(self.device)
        with torch.no_grad():
            # The reference encoder expects frames before frequency bins.
            g = self.ref_enc(spec.transpose(1, 2)).unsqueeze(-1)
        return g, spec

    @torch.inference_mode()
    def voice_conversion(self, src: Union[str, torch.Tensor], tgt: Union[str, torch.Tensor]) -> npt.NDArray[np.float32]:
        """
        Voice conversion pass of the model.

        Args:
            src (str or torch.Tensor): Source utterance.
            tgt (str or torch.Tensor): Target utterance.

        Returns:
            Output numpy array (first batch item, first channel of the model output).
        """
        src_se, src_spec = self.extract_se(src)
        tgt_se, _ = self.extract_se(tgt)

        aux_input = {"g_src": src_se, "g_tgt": tgt_se}
        audio = self.inference(src_spec, aux_input)
        return audio["model_outputs"][0, 0].data.cpu().float().numpy()