Add compute_embeddings and resample_files functions to be able to reuse it

This commit is contained in:
Edresson Casanova 2022-12-09 18:06:01 -03:00
parent 5d925eaa4d
commit 0bf274688f
2 changed files with 160 additions and 126 deletions

View File

@ -11,121 +11,151 @@ from TTS.tts.datasets import load_tts_samples
from TTS.tts.utils.managers import save_file
from TTS.tts.utils.speakers import SpeakerManager
parser = argparse.ArgumentParser(
description="""Compute embedding vectors for each audio file in a dataset and store them keyed by `{dataset_name}#{file_path}` in a .pth file\n\n"""
"""
Example runs:
python TTS/bin/compute_embeddings.py --model_path speaker_encoder_model.pth --config_path speaker_encoder_config.json --config_dataset_path dataset_config.json
python TTS/bin/compute_embeddings.py --model_path speaker_encoder_model.pth --config_path speaker_encoder_config.json --fomatter vctk --dataset_path /path/to/vctk/dataset --dataset_name my_vctk --metafile /path/to/vctk/metafile.csv
""",
formatter_class=RawTextHelpFormatter,
)
parser.add_argument(
"--model_path",
type=str,
help="Path to model checkpoint file. It defaults to the released speaker encoder.",
default="https://github.com/coqui-ai/TTS/releases/download/speaker_encoder_model/model_se.pth.tar",
)
parser.add_argument(
"--config_path",
type=str,
help="Path to model config file. It defaults to the released speaker encoder config.",
default="https://github.com/coqui-ai/TTS/releases/download/speaker_encoder_model/config_se.json",
)
parser.add_argument(
"--config_dataset_path",
type=str,
help="Path to dataset config file. You either need to provide this or `formatter_name`, `dataset_name` and `dataset_path` arguments.",
default=None,
)
parser.add_argument("--output_path", type=str, help="Path for output `pth` or `json` file.", default="speakers.pth")
parser.add_argument("--old_file", type=str, help="Previous embedding file to only compute new audios.", default=None)
parser.add_argument("--disable_cuda", type=bool, help="Flag to disable cuda.", default=False)
parser.add_argument("--no_eval", type=bool, help="Do not compute eval?. Default False", default=False)
parser.add_argument(
"--formatter_name",
type=str,
help="Name of the formatter to use. You either need to provide this or `config_dataset_path`",
default=None,
)
parser.add_argument(
"--dataset_name",
type=str,
help="Name of the dataset to use. You either need to provide this or `config_dataset_path`",
default=None,
)
parser.add_argument(
"--dataset_path",
type=str,
help="Path to the dataset. You either need to provide this or `config_dataset_path`",
default=None,
)
parser.add_argument(
"--metafile",
type=str,
help="Path to the meta file. If not set, dataset formatter uses the default metafile if it is defined in the formatter. You either need to provide this or `config_dataset_path`",
default=None,
)
args = parser.parse_args()
def compute_embeddings(
model_path,
config_path,
output_path,
old_spakers_file=None,
config_dataset_path=None,
formatter_name=None,
dataset_name=None,
dataset_path=None,
meta_file_train=None,
disable_cuda=False,
no_eval=False,
):
use_cuda = torch.cuda.is_available() and not disable_cuda
use_cuda = torch.cuda.is_available() and not args.disable_cuda
if args.config_dataset_path is not None:
c_dataset = load_config(args.config_dataset_path)
meta_data_train, meta_data_eval = load_tts_samples(c_dataset.datasets, eval_split=not args.no_eval)
else:
c_dataset = BaseDatasetConfig()
c_dataset.formatter = args.formatter_name
c_dataset.dataset_name = args.dataset_name
c_dataset.path = args.dataset_path
c_dataset.meta_file_train = args.metafile if args.metafile else None
meta_data_train, meta_data_eval = load_tts_samples(c_dataset, eval_split=not args.no_eval)
if meta_data_eval is None:
samples = meta_data_train
else:
samples = meta_data_train + meta_data_eval
encoder_manager = SpeakerManager(
encoder_model_path=args.model_path,
encoder_config_path=args.config_path,
d_vectors_file_path=args.old_file,
use_cuda=use_cuda,
)
class_name_key = encoder_manager.encoder_config.class_name_key
# compute speaker embeddings
speaker_mapping = {}
for idx, fields in enumerate(tqdm(samples)):
class_name = fields[class_name_key]
audio_file = fields["audio_file"]
embedding_key = fields["audio_unique_name"]
root_path = fields["root_path"]
if args.old_file is not None and embedding_key in encoder_manager.clip_ids:
# get the embedding from the old file
embedd = encoder_manager.get_embedding_by_clip(embedding_key)
if config_dataset_path is not None:
c_dataset = load_config(config_dataset_path)
meta_data_train, meta_data_eval = load_tts_samples(c_dataset.datasets, eval_split=not no_eval)
else:
# extract the embedding
embedd = encoder_manager.compute_embedding_from_clip(audio_file)
c_dataset = BaseDatasetConfig()
c_dataset.formatter = formatter_name
c_dataset.dataset_name = dataset_name
c_dataset.path = dataset_path
c_dataset.meta_file_train = meta_file_train if meta_file_train else None
meta_data_train, meta_data_eval = load_tts_samples(c_dataset, eval_split=not no_eval)
# create speaker_mapping if target dataset is defined
speaker_mapping[embedding_key] = {}
speaker_mapping[embedding_key]["name"] = class_name
speaker_mapping[embedding_key]["embedding"] = embedd
if speaker_mapping:
# save speaker_mapping if target dataset is defined
if os.path.isdir(args.output_path):
mapping_file_path = os.path.join(args.output_path, "speakers.pth")
if meta_data_eval is None:
samples = meta_data_train
else:
mapping_file_path = args.output_path
samples = meta_data_train + meta_data_eval
if os.path.dirname(mapping_file_path) != "":
os.makedirs(os.path.dirname(mapping_file_path), exist_ok=True)
encoder_manager = SpeakerManager(
encoder_model_path=model_path,
encoder_config_path=config_path,
d_vectors_file_path=old_spakers_file,
use_cuda=use_cuda,
)
save_file(speaker_mapping, mapping_file_path)
print("Speaker embeddings saved at:", mapping_file_path)
class_name_key = encoder_manager.encoder_config.class_name_key
# compute speaker embeddings
speaker_mapping = {}
for fields in tqdm(samples):
class_name = fields[class_name_key]
audio_file = fields["audio_file"]
embedding_key = fields["audio_unique_name"]
if old_spakers_file is not None and embedding_key in encoder_manager.clip_ids:
# get the embedding from the old file
embedd = encoder_manager.get_embedding_by_clip(embedding_key)
else:
# extract the embedding
embedd = encoder_manager.compute_embedding_from_clip(audio_file)
# create speaker_mapping if target dataset is defined
speaker_mapping[embedding_key] = {}
speaker_mapping[embedding_key]["name"] = class_name
speaker_mapping[embedding_key]["embedding"] = embedd
if speaker_mapping:
# save speaker_mapping if target dataset is defined
if os.path.isdir(output_path):
mapping_file_path = os.path.join(output_path, "speakers.pth")
else:
mapping_file_path = output_path
if os.path.dirname(mapping_file_path) != "":
os.makedirs(os.path.dirname(mapping_file_path), exist_ok=True)
save_file(speaker_mapping, mapping_file_path)
print("Speaker embeddings saved at:", mapping_file_path)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""Compute embedding vectors for each audio file in a dataset and store them keyed by `{dataset_name}#{file_path}` in a .pth file\n\n"""
"""
Example runs:
python TTS/bin/compute_embeddings.py --model_path speaker_encoder_model.pth --config_path speaker_encoder_config.json --config_dataset_path dataset_config.json
python TTS/bin/compute_embeddings.py --model_path speaker_encoder_model.pth --config_path speaker_encoder_config.json --fomatter vctk --dataset_path /path/to/vctk/dataset --dataset_name my_vctk --metafile /path/to/vctk/metafile.csv
""",
formatter_class=RawTextHelpFormatter,
)
parser.add_argument(
"--model_path",
type=str,
help="Path to model checkpoint file. It defaults to the released speaker encoder.",
default="https://github.com/coqui-ai/TTS/releases/download/speaker_encoder_model/model_se.pth.tar",
)
parser.add_argument(
"--config_path",
type=str,
help="Path to model config file. It defaults to the released speaker encoder config.",
default="https://github.com/coqui-ai/TTS/releases/download/speaker_encoder_model/config_se.json",
)
parser.add_argument(
"--config_dataset_path",
type=str,
help="Path to dataset config file. You either need to provide this or `formatter_name`, `dataset_name` and `dataset_path` arguments.",
default=None,
)
parser.add_argument("--output_path", type=str, help="Path for output `pth` or `json` file.", default="speakers.pth")
parser.add_argument(
"--old_file", type=str, help="Previous embedding file to only compute new audios.", default=None
)
parser.add_argument("--disable_cuda", type=bool, help="Flag to disable cuda.", default=False)
parser.add_argument("--no_eval", type=bool, help="Do not compute eval?. Default False", default=False)
parser.add_argument(
"--formatter_name",
type=str,
help="Name of the formatter to use. You either need to provide this or `config_dataset_path`",
default=None,
)
parser.add_argument(
"--dataset_name",
type=str,
help="Name of the dataset to use. You either need to provide this or `config_dataset_path`",
default=None,
)
parser.add_argument(
"--dataset_path",
type=str,
help="Path to the dataset. You either need to provide this or `config_dataset_path`",
default=None,
)
parser.add_argument(
"--metafile",
type=str,
help="Path to the meta file. If not set, dataset formatter uses the default metafile if it is defined in the formatter. You either need to provide this or `config_dataset_path`",
default=None,
)
args = parser.parse_args()
compute_embeddings(
args.model_path,
args.config_path,
args.output_path,
old_spakers_file=args.old_file,
config_dataset_path=args.config_dataset_path,
formatter_name=args.formatter_name,
dataset_name=args.dataset_name,
dataset_path=args.dataset_path,
meta_file_train=args.metafile,
disable_cuda=args.disable_cuda,
no_eval=args.no_eval,
)

View File

@ -16,6 +16,24 @@ def resample_file(func_args):
sf.write(filename, y, sr)
def resample_files(input_dir, output_sr, output_dir=None, file_ext="wav", n_jobs=10):
if output_dir:
print("Recursively copying the input folder...")
copy_tree(input_dir, output_dir)
input_dir = output_dir
print("Resampling the audio files...")
audio_files = glob.glob(os.path.join(input_dir, f"**/*.{file_ext}"), recursive=True)
print(f"Found {len(audio_files)} files...")
audio_files = list(zip(audio_files, len(audio_files) * [output_sr]))
with Pool(processes=n_jobs) as p:
with tqdm(total=len(audio_files)) as pbar:
for _, _ in enumerate(p.imap_unordered(resample_file, audio_files)):
pbar.update()
print("Done !")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
@ -70,18 +88,4 @@ if __name__ == "__main__":
args = parser.parse_args()
if args.output_dir:
print("Recursively copying the input folder...")
copy_tree(args.input_dir, args.output_dir)
args.input_dir = args.output_dir
print("Resampling the audio files...")
audio_files = glob.glob(os.path.join(args.input_dir, f"**/*.{args.file_ext}"), recursive=True)
print(f"Found {len(audio_files)} files...")
audio_files = list(zip(audio_files, len(audio_files) * [args.output_sr]))
with Pool(processes=args.n_jobs) as p:
with tqdm(total=len(audio_files)) as pbar:
for i, _ in enumerate(p.imap_unordered(resample_file, audio_files)):
pbar.update()
print("Done !")
resample_files(args.input_dir, args.output_sr, args.output_dir, args.file_ext, args.n_jobs)