import json
import os
import random
from typing import Any, Dict, List, Tuple, Union

import fsspec
import numpy as np
import torch
from coqpit import Coqpit

from TTS.config import get_from_config_or_model_args_with_default, load_config
from TTS.encoder.utils.generic_utils import setup_speaker_encoder_model
from TTS.utils.audio import AudioProcessor


class SpeakerManager:
    """Manage the speakers for multi-speaker 🐸TTS models. Load a datafile and parse the information
    in a way that can be queried by speaker or clip.

    There are 3 different scenarios considered:

    1. Models using speaker embedding layers. The datafile only maps speaker names to ids used by the embedding layer.
    2. Models using d-vectors. The datafile includes a dictionary in the following format.

    ::

        {
            'clip_name.wav':{
                'name': 'speakerA',
                'embedding': [...]
            },
            ...
        }

    3. Computing the d-vectors by the speaker encoder. It loads the speaker encoder model and
       computes the d-vectors for a given clip or speaker.

    Args:
        d_vectors_file_path (str, optional): Path to the metafile including x vectors. Defaults to "".
        speaker_id_file_path (str, optional): Path to the metafile that maps speaker names to ids used by
            TTS models. Defaults to "".
        encoder_model_path (str, optional): Path to the speaker encoder model file. Defaults to "".
        encoder_config_path (str, optional): Path to the speaker encoder config file. Defaults to "".

    Examples:
        >>> # load audio processor and speaker encoder
        >>> ap = AudioProcessor(**config.audio)
        >>> manager = SpeakerManager(encoder_model_path=encoder_model_path, encoder_config_path=encoder_config_path)
        >>> # load a sample audio and compute embedding
        >>> waveform = ap.load_wav(sample_wav_path)
        >>> mel = ap.melspectrogram(waveform)
        >>> d_vector = manager.compute_d_vector(mel.T)
    """

    def __init__(
        self,
        data_items: List[List[Any]] = None,
        d_vectors_file_path: str = "",
        speaker_id_file_path: str = "",
        encoder_model_path: str = "",
        encoder_config_path: str = "",
        use_cuda: bool = False,
    ):
        self.d_vectors = {}
        self.speaker_ids = {}
        self.d_vectors_by_speakers = {}
        self.clip_ids = []
        self.speaker_encoder = None
        self.speaker_encoder_ap = None
        self.use_cuda = use_cuda

        if data_items:
            self.speaker_ids, _ = self.parse_speakers_from_data(data_items)

        if d_vectors_file_path:
            self.set_d_vectors_from_file(d_vectors_file_path)

        if speaker_id_file_path:
            self.set_speaker_ids_from_file(speaker_id_file_path)

        if encoder_model_path and encoder_config_path:
            self.init_speaker_encoder(encoder_model_path, encoder_config_path)

    @staticmethod
    def _load_json(json_file_path: str) -> Dict:
        with fsspec.open(json_file_path, "r") as f:
            return json.load(f)

    @staticmethod
    def _save_json(json_file_path: str, data: dict) -> None:
        with fsspec.open(json_file_path, "w") as f:
            json.dump(data, f, indent=4)

    @property
    def num_speakers(self):
        return len(self.speaker_ids)

    @property
    def speaker_names(self):
        return list(self.speaker_ids.keys())

    @property
    def d_vector_dim(self):
        """Dimensionality of d_vectors. If d_vectors are not loaded, returns zero."""
        if self.d_vectors:
            return len(self.d_vectors[list(self.d_vectors.keys())[0]]["embedding"])
        return 0

    @staticmethod
    def parse_speakers_from_data(items: list) -> Tuple[Dict, int]:
        """Parse speaker IDs from data samples returned by `load_tts_samples()`.

        Args:
            items (list): Data samples returned by `load_tts_samples()`.

        Returns:
            Tuple[Dict, int]: speaker IDs and number of speakers.
        """
        speakers = sorted({item["speaker_name"] for item in items})
        speaker_ids = {name: i for i, name in enumerate(speakers)}
        num_speakers = len(speaker_ids)
        return speaker_ids, num_speakers

    def set_speaker_ids_from_data(self, items: List) -> None:
        """Set speaker IDs from data samples.

        Args:
            items (List): Data samples returned by `load_tts_samples()`.
        """
        self.speaker_ids, _ = self.parse_speakers_from_data(items)

    def set_speaker_ids_from_file(self, file_path: str) -> None:
        """Set speaker IDs from a file.

        Args:
            file_path (str): Path to the file.
        """
        self.speaker_ids = self._load_json(file_path)

    def save_speaker_ids_to_file(self, file_path: str) -> None:
        """Save speaker IDs to a json file.

        Args:
            file_path (str): Path to the output file.
        """
        self._save_json(file_path, self.speaker_ids)

    def save_d_vectors_to_file(self, file_path: str) -> None:
        """Save d_vectors to a json file.

        Args:
            file_path (str): Path to the output file.
        """
        self._save_json(file_path, self.d_vectors)

    def set_d_vectors_from_file(self, file_path: str) -> None:
        """Load d_vectors from a json file.

        Args:
            file_path (str): Path to the target json file.
        """
        self.d_vectors = self._load_json(file_path)

        speakers = sorted({x["name"] for x in self.d_vectors.values()})
        self.speaker_ids = {name: i for i, name in enumerate(speakers)}

        self.clip_ids = sorted(self.d_vectors.keys())
        # cache d_vectors_by_speakers for fast inference using a bigger speakers.json
        self.d_vectors_by_speakers = self.get_d_vectors_by_speakers()

    def get_d_vector_by_clip(self, clip_idx: str) -> List:
        """Get d_vector by clip ID.

        Args:
            clip_idx (str): Target clip ID.

        Returns:
            List: d_vector as a list.
        """
        return self.d_vectors[clip_idx]["embedding"]

    def get_d_vectors_by_speaker(self, speaker_idx: str) -> List[List]:
        """Get all d_vectors of a speaker.

        Args:
            speaker_idx (str): Target speaker ID.

        Returns:
            List[List]: all the d_vectors of the given speaker.
        """
        return self.d_vectors_by_speakers[speaker_idx]

    def get_d_vectors_by_speakers(self) -> Dict:
        """Get all d_vectors by speaker.

        Returns:
            Dict: all the d_vectors of each speaker.
        """
        d_vectors_by_speakers = {}
        for x in self.d_vectors.values():
            if x["name"] not in d_vectors_by_speakers.keys():
                d_vectors_by_speakers[x["name"]] = [x["embedding"]]
            else:
                d_vectors_by_speakers[x["name"]].append(x["embedding"])
        return d_vectors_by_speakers

    def get_mean_d_vector(self, speaker_idx: str, num_samples: int = None, randomize: bool = False) -> np.ndarray:
        """Get mean d_vector of a speaker ID.

        Args:
            speaker_idx (str): Target speaker ID.
            num_samples (int, optional): Number of samples to be averaged. Defaults to None.
            randomize (bool, optional): Pick random `num_samples` of d_vectors. Defaults to False.

        Returns:
            np.ndarray: Mean d_vector.
        """
        d_vectors = self.get_d_vectors_by_speaker(speaker_idx)
        if num_samples is None:
            d_vectors = np.stack(d_vectors).mean(0)
        else:
            assert len(d_vectors) >= num_samples, f" [!] speaker {speaker_idx} has number of samples < {num_samples}"
            if randomize:
                d_vectors = np.stack(random.choices(d_vectors, k=num_samples)).mean(0)
            else:
                d_vectors = np.stack(d_vectors[:num_samples]).mean(0)
        return d_vectors

    def get_random_speaker_id(self) -> Any:
        """Get a random speaker ID.

        Returns:
            Any: a randomly sampled speaker ID, or None if no speakers are loaded.
        """
        if self.speaker_ids:
            return self.speaker_ids[random.choices(list(self.speaker_ids.keys()))[0]]
        return None

    def get_random_d_vector(self) -> Any:
        """Get a random d_vector.

        Returns:
            List: a randomly sampled d_vector, or None if no d_vectors are loaded.
        """
        if self.d_vectors:
            return self.d_vectors[random.choices(list(self.d_vectors.keys()))[0]]["embedding"]
        return None

    def get_speakers(self) -> List:
        return self.speaker_ids

    def get_clips(self) -> List:
        return sorted(self.d_vectors.keys())

    def init_speaker_encoder(self, model_path: str, config_path: str) -> None:
        """Initialize a speaker encoder model.

        Args:
            model_path (str): Model file path.
            config_path (str): Model config file path.
        """
        self.speaker_encoder_config = load_config(config_path)
        self.speaker_encoder = setup_speaker_encoder_model(self.speaker_encoder_config)
        self.speaker_encoder_criterion = self.speaker_encoder.load_checkpoint(
            self.speaker_encoder_config, model_path, eval=True, use_cuda=self.use_cuda
        )
        self.speaker_encoder_ap = AudioProcessor(**self.speaker_encoder_config.audio)

    def compute_d_vector_from_clip(self, wav_file: Union[str, List[str]]) -> list:
        """Compute a d_vector from a given audio file.

        Args:
            wav_file (Union[str, List[str]]): Target file path.

        Returns:
            list: Computed d_vector.
        """

        def _compute(wav_file: str):
            waveform = self.speaker_encoder_ap.load_wav(wav_file, sr=self.speaker_encoder_ap.sample_rate)
            if not self.speaker_encoder_config.model_params.get("use_torch_spec", False):
                m_input = self.speaker_encoder_ap.melspectrogram(waveform)
                m_input = torch.from_numpy(m_input)
            else:
                m_input = torch.from_numpy(waveform)
            if self.use_cuda:
                m_input = m_input.cuda()
            m_input = m_input.unsqueeze(0)
            d_vector = self.speaker_encoder.compute_embedding(m_input)
            return d_vector

        if isinstance(wav_file, list):
            # compute the mean d_vector over all given files
            d_vectors = None
            for wf in wav_file:
                d_vector = _compute(wf)
                if d_vectors is None:
                    d_vectors = d_vector
                else:
                    d_vectors += d_vector
            return (d_vectors / len(wav_file))[0].tolist()
        d_vector = _compute(wav_file)
        return d_vector[0].tolist()

    def compute_d_vector(self, feats: Union[torch.Tensor, np.ndarray]) -> List:
        """Compute d_vector from features.

        Args:
            feats (Union[torch.Tensor, np.ndarray]): Input features.

        Returns:
            List: computed d_vector.
        """
        if isinstance(feats, np.ndarray):
            feats = torch.from_numpy(feats)
        if feats.ndim == 2:
            feats = feats.unsqueeze(0)
        if self.use_cuda:
            feats = feats.cuda()
        return self.speaker_encoder.compute_embedding(feats)

    def run_umap(self):
        # TODO: implement speaker encoder
        raise NotImplementedError

    def plot_embeddings(self):
        # TODO: implement speaker encoder
        raise NotImplementedError

    @staticmethod
    def init_from_config(config: "Coqpit", samples: Union[List[List], List[Dict]] = None) -> "SpeakerManager":
        """Initialize a speaker manager from config.

        Args:
            config (Coqpit): Config object.
            samples (Union[List[List], List[Dict]], optional): List of data samples to parse out the speaker names.
                Defaults to None.

        Returns:
            SpeakerManager: Speaker manager object.
        """
        speaker_manager = None
        if get_from_config_or_model_args_with_default(config, "use_speaker_embedding", False):
            if samples:
                speaker_manager = SpeakerManager(data_items=samples)
            if get_from_config_or_model_args_with_default(config, "speaker_file", None):
                speaker_manager = SpeakerManager(
                    speaker_id_file_path=get_from_config_or_model_args_with_default(config, "speaker_file", None)
                )
            if get_from_config_or_model_args_with_default(config, "speakers_file", None):
                speaker_manager = SpeakerManager(
                    speaker_id_file_path=get_from_config_or_model_args_with_default(config, "speakers_file", None)
                )

        if get_from_config_or_model_args_with_default(config, "use_d_vector_file", False):
            if get_from_config_or_model_args_with_default(config, "speakers_file", None):
                speaker_manager = SpeakerManager(
                    d_vectors_file_path=get_from_config_or_model_args_with_default(config, "speakers_file", None)
                )
            if get_from_config_or_model_args_with_default(config, "d_vector_file", None):
                speaker_manager = SpeakerManager(
                    d_vectors_file_path=get_from_config_or_model_args_with_default(config, "d_vector_file", None)
                )
        return speaker_manager


def _set_file_path(path):
    """Find `speakers.json` under the given path or one level above it.

    Intended to band-aid the different paths returned for restored and continued training."""
    path_restore = os.path.join(os.path.dirname(path), "speakers.json")
    path_continue = os.path.join(path, "speakers.json")
    fs = fsspec.get_mapper(path).fs
    if fs.exists(path_restore):
        return path_restore
    if fs.exists(path_continue):
        return path_continue
    raise FileNotFoundError(f" [!] `speakers.json` not found in {path}")


def load_speaker_mapping(out_path):
    """Loads speaker mapping if already present."""
    if os.path.splitext(out_path)[1] == ".json":
        json_file = out_path
    else:
        json_file = _set_file_path(out_path)
    with fsspec.open(json_file, "r") as f:
        return json.load(f)


def save_speaker_mapping(out_path, speaker_mapping):
    """Saves speaker mapping if not yet present."""
    if out_path is not None:
        speakers_json_path = _set_file_path(out_path)
        with fsspec.open(speakers_json_path, "w") as f:
            json.dump(speaker_mapping, f, indent=4)


def get_speaker_manager(c: Coqpit, data: List = None, restore_path: str = None, out_path: str = None) -> SpeakerManager:
    """Initialize a `SpeakerManager` instance from the provided config.

    Args:
        c (Coqpit): Model configuration.
        restore_path (str): Path to a previous training folder.
        data (List): Data samples used in training to infer speakers from. It must be provided if speaker embedding
            layers are used. Defaults to None.
        out_path (str, optional): Save the generated speaker IDs to an output path. Defaults to None.

    Returns:
        SpeakerManager: initialized and ready to use instance.
    """
    speaker_manager = SpeakerManager()
    if c.use_speaker_embedding:
        if data is not None:
            speaker_manager.set_speaker_ids_from_data(data)
        if restore_path:
            speakers_file = _set_file_path(restore_path)
            # restoring speaker manager from a previous run.
            if c.use_d_vector_file:
                # restore speaker manager with the embedding file
                if not os.path.exists(speakers_file):
                    print("WARNING: speakers.json was not found in restore_path, trying to use CONFIG.d_vector_file")
                    if not os.path.exists(c.d_vector_file):
                        raise RuntimeError(
                            "You must copy the file speakers.json to restore_path, or set a valid file in CONFIG.d_vector_file"
                        )
                    speaker_manager.set_d_vectors_from_file(c.d_vector_file)
                speaker_manager.set_d_vectors_from_file(speakers_file)
            elif not c.use_d_vector_file:
                # restore speaker manager with speaker ID file.
                speaker_ids_from_data = speaker_manager.speaker_ids
                speaker_manager.set_speaker_ids_from_file(speakers_file)
                assert all(
                    speaker in speaker_manager.speaker_ids for speaker in speaker_ids_from_data
                ), " [!] You cannot introduce new speakers to a pre-trained model."
        elif c.use_d_vector_file and c.d_vector_file:
            # new speaker manager with external speaker embeddings.
            speaker_manager.set_d_vectors_from_file(c.d_vector_file)
        elif c.use_d_vector_file and not c.d_vector_file:
            raise ValueError("use_d_vector_file is True, so you need to pass an external speaker embedding file.")
        elif c.use_speaker_embedding and "speakers_file" in c and c.speakers_file:
            # new speaker manager with speaker IDs file.
            speaker_manager.set_speaker_ids_from_file(c.speakers_file)

    if speaker_manager.num_speakers > 0:
        print(
            " > Speaker manager is loaded with {} speakers: {}".format(
                speaker_manager.num_speakers, ", ".join(speaker_manager.speaker_ids)
            )
        )

    # save file if path is defined
    if out_path:
        out_file_path = os.path.join(out_path, "speakers.json")
        print(f" > Saving `speakers.json` to {out_file_path}.")
        if c.use_d_vector_file and c.d_vector_file:
            speaker_manager.save_d_vectors_to_file(out_file_path)
        else:
            speaker_manager.save_speaker_ids_to_file(out_file_path)
    return speaker_manager


def get_speaker_balancer_weights(items: list):
    speaker_names = np.array([item["speaker_name"] for item in items])
    unique_speaker_names = np.unique(speaker_names).tolist()
    speaker_ids = [unique_speaker_names.index(l) for l in speaker_names]
    speaker_count = np.array([len(np.where(speaker_names == l)[0]) for l in unique_speaker_names])
    weight_speaker = 1.0 / speaker_count
    dataset_samples_weight = np.array([weight_speaker[l] for l in speaker_ids])
    # normalize
    dataset_samples_weight = dataset_samples_weight / np.linalg.norm(dataset_samples_weight)
    return torch.from_numpy(dataset_samples_weight).float()
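
# A small sketch of the mapping helpers above; the run folder is a placeholder and is
# assumed to already contain a `speakers.json`, since `_set_file_path` raises otherwise:
#
#   save_speaker_mapping("/path/to/run", {"speakerA": 0, "speakerB": 1})
#   mapping = load_speaker_mapping("/path/to/run")  # -> {"speakerA": 0, "speakerB": 1}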
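
# A minimal sketch (the sample list below is illustrative): the per-sample weights from
# `get_speaker_balancer_weights` can drive a `torch.utils.data.WeightedRandomSampler`
# so under-represented speakers are drawn more often during training.
#
#   from torch.utils.data import WeightedRandomSampler
#   items = [{"speaker_name": "spk_a"}, {"speaker_name": "spk_a"}, {"speaker_name": "spk_b"}]
#   weights = get_speaker_balancer_weights(items)
#   sampler = WeightedRandomSampler(weights, num_samples=len(weights))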