mirror of https://github.com/coqui-ai/TTS.git
wavernn stuff...
This commit is contained in:
parent
6378fa2b07
commit
9c3c7ce2f8
|
@ -0,0 +1,96 @@
|
|||
import os
|
||||
import glob
|
||||
import torch
|
||||
import numpy as np
|
||||
from torch.utils.data import Dataset
|
||||
|
||||
|
||||
class WaveRNNDataset(Dataset):
    """Dataset of (mel spectrogram, audio target) pairs for WaveRNN training.

    Every entry of ``items`` is a ``(wav_path, feat_path)`` pair. The mel
    spectrogram is read with ``np.load`` from ``feat_path`` (``/quant/``
    replaced by ``/mel/``). The audio target depends on ``mode``:

    * ``"gauss"`` / ``"mold"``: the waveform returned by ``ap.load_wav``.
    * an ``int`` (bit depth): the array stored at ``feat_path`` with
      ``/mel/`` replaced by ``/quant/``.

    Random fixed-length training segments are cut in :meth:`collate`, which
    is meant to be passed as ``collate_fn`` to a ``DataLoader``.
    """

    def __init__(
        self,
        ap,
        items,
        seq_len,
        hop_len,
        pad,
        mode,
        is_training=True,
        return_segments=True,
        use_cache=False,
        verbose=False,
    ):
        """Store the configuration; no file is touched here.

        Args:
            ap: audio processor; only ``ap.load_wav(path)`` is used.
            items: mutable sequence of ``(wav_path, feat_path)`` pairs.
            seq_len: number of audio samples per training segment.
            hop_len: audio samples per mel frame.
            pad: number of context mel frames on each side of a segment.
            mode: ``"gauss"``, ``"mold"`` or an ``int`` bit depth.
            is_training, return_segments, use_cache, verbose: stored as
                attributes but not read anywhere in this class.
        """
        self.ap = ap
        self.item_list = items
        self.seq_len = seq_len
        self.hop_len = hop_len
        self.pad = pad
        self.mode = mode
        self.is_training = is_training
        self.return_segments = return_segments
        self.use_cache = use_cache
        self.verbose = verbose

    def __len__(self):
        return len(self.item_list)

    def __getitem__(self, index):
        return self.load_item(index)

    def load_item(self, index):
        """Load the ``(mel, audio)`` pair stored at ``index``.

        If the mel has fewer than 5 frames the entry is permanently replaced
        by its successor (wrapping around at the end of the list) and that
        successor is loaded instead.

        Raises:
            RuntimeError: if ``self.mode`` is not a supported mode.
        """
        wavpath, feat_path = self.item_list[index]
        m = np.load(feat_path.replace("/quant/", "/mel/"))
        if m.shape[-1] < 5:
            print(" [!] Instance is too short! : {}".format(wavpath))
            # Wrap around so the last index does not run off the list.
            neighbour = (index + 1) % len(self.item_list)
            self.item_list[index] = self.item_list[neighbour]
            # Re-unpack BOTH paths: the entry is a pair, not a bare path.
            wavpath, feat_path = self.item_list[index]
            m = np.load(feat_path.replace("/quant/", "/mel/"))

        if self.mode in ["gauss", "mold"]:
            x = self.ap.load_wav(wavpath)
        elif isinstance(self.mode, int):
            x = np.load(feat_path.replace("/mel/", "/quant/"))
        else:
            raise RuntimeError("Unknown dataset mode - {}".format(self.mode))
        return m, x

    def collate(self, batch):
        """Cut one random aligned segment out of every item and batch them.

        Args:
            batch: list of ``(mel, audio)`` pairs as returned by
                :meth:`load_item`; ``mel`` is indexed as ``[channels, frames]``.

        Returns:
            tuple ``(x_input, mels, y_coarse)``: the network input samples
            (``seq_len`` per item, rescaled to [-1, 1] in integer mode), the
            mel windows (``seq_len // hop_len + 2 * pad`` frames) and the
            targets, i.e. the same samples shifted by one step.

        Raises:
            RuntimeError: if ``self.mode`` is not a supported mode.
            ValueError: if an item is too short to draw a segment from.
        """
        is_continuous = self.mode in ["gauss", "mold"]
        if not is_continuous and not isinstance(self.mode, int):
            raise RuntimeError("Unknown dataset mode - {}".format(self.mode))

        mel_win = self.seq_len // self.hop_len + 2 * self.pad
        max_offsets = [x[0].shape[-1] - (mel_win + 2 * self.pad) for x in batch]
        if any(offset <= 0 for offset in max_offsets):
            # np.random.randint(0, offset) would fail with an opaque message.
            raise ValueError(
                " [!] Instance is too short for a {}-frame mel window "
                "(pad={}).".format(mel_win, self.pad)
            )
        mel_offsets = [np.random.randint(0, offset) for offset in max_offsets]
        # Audio segment starts `pad` frames into the mel window.
        sig_offsets = [(offset + self.pad) * self.hop_len for offset in mel_offsets]

        mels = [
            x[0][:, mel_offsets[i] : mel_offsets[i] + mel_win]
            for i, x in enumerate(batch)
        ]
        # One extra sample so input and target can be shifted by one step.
        coarse = [
            x[1][sig_offsets[i] : sig_offsets[i] + self.seq_len + 1]
            for i, x in enumerate(batch)
        ]

        mels = np.stack(mels).astype(np.float32)
        if is_continuous:
            coarse = torch.FloatTensor(np.stack(coarse).astype(np.float32))
            x_input = coarse[:, : self.seq_len]
        else:
            coarse = torch.LongTensor(np.stack(coarse).astype(np.int64))
            # Map class indices [0, 2**bits - 1] onto [-1, 1].
            x_input = (
                2 * coarse[:, : self.seq_len].float() / (2 ** self.mode - 1.0) - 1.0
            )
        y_coarse = coarse[:, 1:]
        mels = torch.FloatTensor(mels)
        return x_input, mels, y_coarse
|
|
@ -0,0 +1,485 @@
|
|||
import sys
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import numpy as np
|
||||
import torch.nn.functional as F
|
||||
import time
|
||||
|
||||
# fix this
|
||||
from TTS.utils.audio import AudioProcessor as ap
|
||||
from TTS.vocoder.utils.distribution import (
|
||||
sample_from_gaussian,
|
||||
sample_from_discretized_mix_logistic,
|
||||
)
|
||||
|
||||
|
||||
def stream(string, variables):
    """Rewrite the current console line with ``string % variables``.

    A carriage return is emitted first so successive calls overwrite each
    other instead of scrolling; no newline is appended and nothing is flushed.
    """
    template = "\r{}".format(string)
    sys.stdout.write(template % variables)
|
||||
|
||||
|
||||
class ResBlock(nn.Module):
    """Residual block built from two bias-free 1x1 convolutions.

    Computes ``x + BN2(conv2(relu(BN1(conv1(x)))))``; the channel count
    ``dims`` is the same on input and output.
    """

    def __init__(self, dims):
        super().__init__()
        self.conv1 = nn.Conv1d(dims, dims, kernel_size=1, bias=False)
        self.conv2 = nn.Conv1d(dims, dims, kernel_size=1, bias=False)
        self.batch_norm1 = nn.BatchNorm1d(dims)
        self.batch_norm2 = nn.BatchNorm1d(dims)

    def forward(self, x):
        """Apply the two conv/batch-norm stages and add the skip connection."""
        hidden = F.relu(self.batch_norm1(self.conv1(x)))
        hidden = self.batch_norm2(self.conv2(hidden))
        return hidden + x
|
||||
|
||||
|
||||
class MelResNet(nn.Module):
    """Residual conv stack applied to the conditioning features.

    The unpadded input convolution has kernel size ``2 * pad + 1``, so the
    output is ``2 * pad`` frames shorter than the input. It is followed by
    ``res_blocks`` ``ResBlock`` layers and a 1x1 projection to
    ``res_out_dims`` channels.
    """

    def __init__(self, res_blocks, in_dims, compute_dims, res_out_dims, pad):
        super().__init__()
        kernel = 2 * pad + 1
        self.conv_in = nn.Conv1d(in_dims, compute_dims, kernel_size=kernel, bias=False)
        self.batch_norm = nn.BatchNorm1d(compute_dims)
        self.layers = nn.ModuleList(
            [ResBlock(compute_dims) for _ in range(res_blocks)]
        )
        self.conv_out = nn.Conv1d(compute_dims, res_out_dims, kernel_size=1)

    def forward(self, x):
        """Run input conv, residual blocks and output projection in order."""
        out = F.relu(self.batch_norm(self.conv_in(x)))
        for block in self.layers:
            out = block(out)
        return self.conv_out(out)
|
||||
|
||||
|
||||
class Stretch2d(nn.Module):
    """Nearest-neighbour upsampling of a 4-D tensor by integer factors.

    Every element is repeated ``y_scale`` times along dim 2 and ``x_scale``
    times along dim 3.
    """

    def __init__(self, x_scale, y_scale):
        super().__init__()
        self.x_scale = x_scale
        self.y_scale = y_scale

    def forward(self, x):
        """Return ``x`` stretched to ``(b, c, h * y_scale, w * x_scale)``."""
        batch, channels, height, width = x.shape
        # Insert a repeat axis right after each spatial axis, tile along the
        # new axes, then merge each (axis, repeat) pair back into one axis.
        tiled = x[:, :, :, None, :, None].repeat(
            1, 1, 1, self.y_scale, 1, self.x_scale
        )
        return tiled.view(
            batch, channels, height * self.y_scale, width * self.x_scale
        )
|
||||
|
||||
|
||||
class UpsampleNetwork(nn.Module):
    """Upsample conditioning features to the audio sample rate.

    The features go through one ``Stretch2d`` + smoothing ``Conv2d`` pair per
    entry of ``upsample_scales``. When ``use_aux_net`` is true, a
    ``MelResNet`` additionally produces auxiliary features that are stretched
    by the total scale.
    """

    def __init__(
        self,
        feat_dims,
        upsample_scales,
        compute_dims,
        res_blocks,
        res_out_dims,
        pad,
        use_aux_net,
    ):
        super().__init__()
        # np.prod replaces np.cumproduct(...)[-1]; cumproduct was removed in
        # NumPy 2.0. int() keeps the value a plain Python integer.
        self.total_scale = int(np.prod(upsample_scales))
        # Number of upsampled steps contributed by the `pad` context frames
        # on each side; trimmed off again in forward().
        self.indent = pad * self.total_scale
        self.use_aux_net = use_aux_net
        if use_aux_net:
            self.resnet = MelResNet(
                res_blocks, feat_dims, compute_dims, res_out_dims, pad
            )
            self.resnet_stretch = Stretch2d(self.total_scale, 1)
        self.up_layers = nn.ModuleList()
        for scale in upsample_scales:
            k_size = (1, scale * 2 + 1)
            padding = (0, scale)
            stretch = Stretch2d(scale, 1)
            conv = nn.Conv2d(1, 1, kernel_size=k_size, padding=padding, bias=False)
            # Initialise the kernel as a moving average over the time axis.
            conv.weight.data.fill_(1.0 / k_size[1])
            self.up_layers.append(stretch)
            self.up_layers.append(conv)

    def forward(self, m):
        """Upsample ``m`` and, optionally, compute auxiliary features.

        Args:
            m: features indexed as ``(batch, channels, frames)``, including
                ``pad`` context frames on both sides.

        Returns:
            tuple ``(upsampled, aux)``, both transposed to
            ``(batch, steps, channels)``; ``aux`` is ``None`` when
            ``use_aux_net`` is false.
        """
        if self.use_aux_net:
            aux = self.resnet(m).unsqueeze(1)
            aux = self.resnet_stretch(aux)
            aux = aux.squeeze(1)
            aux = aux.transpose(1, 2)
        else:
            aux = None
        m = m.unsqueeze(1)
        for f in self.up_layers:
            m = f(m)
        m = m.squeeze(1)
        # Explicit end index: `-self.indent` would yield an empty slice when
        # the indent is 0 (pad == 0).
        m = m[:, :, self.indent : m.size(-1) - self.indent]
        return m.transpose(1, 2), aux
|
||||
|
||||
|
||||
class Upsample(nn.Module):
    """Upsample conditioning features by linear interpolation.

    Counterpart of ``UpsampleNetwork`` without learned upsampling layers.
    The ``MelResNet`` is always constructed (even when ``use_aux_net`` is
    false) but only evaluated when ``use_aux_net`` is true.
    """

    def __init__(
        self, scale, pad, res_blocks, feat_dims, compute_dims, res_out_dims, use_aux_net
    ):
        super().__init__()
        self.scale = scale
        self.pad = pad
        # Upsampled steps produced by the `pad` context frames on each side.
        self.indent = pad * scale
        self.use_aux_net = use_aux_net
        self.resnet = MelResNet(res_blocks, feat_dims, compute_dims, res_out_dims, pad)

    def forward(self, m):
        """Interpolate ``m`` by ``scale`` and trim the padded context.

        Args:
            m: features indexed as ``(batch, channels, frames)``.

        Returns:
            tuple ``(upsampled, aux)``, both transposed to
            ``(batch, steps, channels)``; ``aux`` is ``None`` when
            ``use_aux_net`` is false.
        """
        if self.use_aux_net:
            aux = self.resnet(m)
            aux = F.interpolate(
                aux, scale_factor=self.scale, mode="linear", align_corners=True
            )
            aux = aux.transpose(1, 2)
        else:
            aux = None
        m = F.interpolate(
            m, scale_factor=self.scale, mode="linear", align_corners=True
        )
        # Explicit end index: `-self.indent` would yield an empty slice when
        # the indent is 0 (pad == 0).
        m = m[:, :, self.indent : m.size(-1) - self.indent]
        m = m * 0.045  # empirically found

        return m.transpose(1, 2), aux
|
||||
|
||||
|
||||
class WaveRNN(nn.Module):
    """WaveRNN vocoder: two GRU layers conditioned on upsampled features.

    ``mode`` selects the output distribution:

    * ``int`` (bit depth): categorical over ``2 ** mode`` classes,
    * ``"mold"``: mixture of logistics (``3 * 10`` output parameters),
    * ``"gauss"``: single Gaussian (mean and log-std).

    All tensors are created on the device of the inputs / model parameters,
    so the model runs on CPU as well as on any CUDA device.
    """

    def __init__(
        self,
        rnn_dims,
        fc_dims,
        mode,
        mulaw,
        pad,
        use_aux_net,
        use_upsample_net,
        upsample_factors,
        feat_dims,
        compute_dims,
        res_out_dims,
        res_blocks,
        hop_length,
        sample_rate,
    ):
        super().__init__()
        self.mode = mode
        self.mulaw = mulaw
        self.pad = pad
        self.use_upsample_net = use_upsample_net
        self.use_aux_net = use_aux_net
        if isinstance(self.mode, int):
            self.n_classes = 2 ** self.mode
        elif self.mode == "mold":
            self.n_classes = 3 * 10
        elif self.mode == "gauss":
            self.n_classes = 2
        else:
            raise RuntimeError(" > Unknown training mode")

        self.rnn_dims = rnn_dims
        # The auxiliary features are split into four equal slices, one per
        # conditioning point (input, rnn2, fc1, fc2).
        self.aux_dims = res_out_dims // 4
        self.hop_length = hop_length
        self.sample_rate = sample_rate

        if self.use_upsample_net:
            # np.prod replaces np.cumproduct(...)[-1] (removed in NumPy 2.0).
            assert (
                int(np.prod(upsample_factors)) == self.hop_length
            ), " [!] upsample scales needs to be equal to hop_length"
            self.upsample = UpsampleNetwork(
                feat_dims,
                upsample_factors,
                compute_dims,
                res_blocks,
                res_out_dims,
                pad,
                use_aux_net,
            )
        else:
            self.upsample = Upsample(
                hop_length,
                pad,
                res_blocks,
                feat_dims,
                compute_dims,
                res_out_dims,
                use_aux_net,
            )

        # Without the aux network the layers simply lose the extra inputs.
        # Creation order is kept (I, rnn1, rnn2, fc1, fc2, fc3) so parameter
        # initialisation and state-dict keys are unchanged.
        aux = self.aux_dims if self.use_aux_net else 0
        self.I = nn.Linear(feat_dims + aux + 1, rnn_dims)
        self.rnn1 = nn.GRU(rnn_dims, rnn_dims, batch_first=True)
        self.rnn2 = nn.GRU(rnn_dims + aux, rnn_dims, batch_first=True)
        self.fc1 = nn.Linear(rnn_dims + aux, fc_dims)
        self.fc2 = nn.Linear(fc_dims + aux, fc_dims)
        self.fc3 = nn.Linear(fc_dims, self.n_classes)

    def forward(self, x, mels):
        """Teacher-forced pass used for training.

        Args:
            x: previous audio samples, indexed as ``(batch, steps)``.
            mels: conditioning features passed to ``self.upsample``.

        Returns:
            Output-distribution parameters, ``(batch, steps, n_classes)``.
        """
        bsize = x.size(0)
        h1 = torch.zeros(1, bsize, self.rnn_dims, device=x.device)
        h2 = torch.zeros(1, bsize, self.rnn_dims, device=x.device)
        mels, aux = self.upsample(mels)

        if self.use_aux_net:
            d = self.aux_dims
            a1, a2, a3, a4 = (aux[:, :, d * i : d * (i + 1)] for i in range(4))
            x = torch.cat([x.unsqueeze(-1), mels, a1], dim=2)
        else:
            x = torch.cat([x.unsqueeze(-1), mels], dim=2)

        x = self.I(x)
        res = x
        self.rnn1.flatten_parameters()
        x, _ = self.rnn1(x, h1)

        x = x + res
        res = x
        if self.use_aux_net:
            x = torch.cat([x, a2], dim=2)
        self.rnn2.flatten_parameters()
        x, _ = self.rnn2(x, h2)

        x = x + res
        if self.use_aux_net:
            x = torch.cat([x, a3], dim=2)
        x = F.relu(self.fc1(x))

        if self.use_aux_net:
            x = torch.cat([x, a4], dim=2)
        x = F.relu(self.fc2(x))
        return self.fc3(x)

    def generate(self, mels, batched, target, overlap):
        """Autoregressively synthesise a waveform from conditioning features.

        Args:
            mels: features indexed as ``(channels, frames)``; anything
                ``torch.as_tensor`` accepts.
            batched (bool): fold the sequence into overlapping chunks that
                are generated in parallel and cross-faded afterwards.
            target (int): chunk length, see :meth:`fold_with_overlap`.
            overlap (int): chunk overlap, see :meth:`fold_with_overlap`.

        Returns:
            1-D ``np.float64`` array with ``(frames - 1) * hop_length``
            samples, faded out over the last ``20 * hop_length`` samples.

        The model is switched to eval mode for the call and put back into
        whatever mode it was in before.
        """
        was_training = self.training
        self.eval()
        device = next(self.parameters()).device
        output = []
        start = time.time()
        rnn1 = self.get_gru_cell(self.rnn1)
        rnn2 = self.get_gru_cell(self.rnn2)

        with torch.no_grad():
            mels = torch.as_tensor(mels, dtype=torch.float, device=device)
            mels = mels.unsqueeze(0)
            wave_len = (mels.size(-1) - 1) * self.hop_length
            mels = self.pad_tensor(mels.transpose(1, 2), pad=self.pad, side="both")
            mels, aux = self.upsample(mels.transpose(1, 2))

            if batched:
                mels = self.fold_with_overlap(mels, target, overlap)
                if aux is not None:
                    aux = self.fold_with_overlap(aux, target, overlap)

            b_size, seq_len, _ = mels.size()

            h1 = torch.zeros(b_size, self.rnn_dims, device=device)
            h2 = torch.zeros(b_size, self.rnn_dims, device=device)
            x = torch.zeros(b_size, 1, device=device)

            if self.use_aux_net:
                d = self.aux_dims
                aux_split = [aux[:, :, d * i : d * (i + 1)] for i in range(4)]

            for i in range(seq_len):
                m_t = mels[:, i, :]

                if self.use_aux_net:
                    a1_t, a2_t, a3_t, a4_t = (a[:, i, :] for a in aux_split)
                    x = torch.cat([x, m_t, a1_t], dim=1)
                else:
                    x = torch.cat([x, m_t], dim=1)

                x = self.I(x)
                h1 = rnn1(x, h1)

                x = x + h1
                inp = torch.cat([x, a2_t], dim=1) if self.use_aux_net else x
                h2 = rnn2(inp, h2)

                x = x + h2
                if self.use_aux_net:
                    x = torch.cat([x, a3_t], dim=1)
                x = F.relu(self.fc1(x))

                if self.use_aux_net:
                    x = torch.cat([x, a4_t], dim=1)
                x = F.relu(self.fc2(x))

                logits = self.fc3(x)

                if self.mode == "mold":
                    sample = sample_from_discretized_mix_logistic(
                        logits.unsqueeze(0).transpose(1, 2)
                    )
                    output.append(sample.view(-1))
                    x = sample.transpose(0, 1).to(device)
                elif self.mode == "gauss":
                    sample = sample_from_gaussian(logits.unsqueeze(0).transpose(1, 2))
                    output.append(sample.view(-1))
                    x = sample.transpose(0, 1).to(device)
                elif isinstance(self.mode, int):
                    posterior = F.softmax(logits, dim=1)
                    distrib = torch.distributions.Categorical(posterior)

                    # Map the sampled class index back onto [-1, 1].
                    sample = 2 * distrib.sample().float() / (self.n_classes - 1.0) - 1.0
                    output.append(sample)
                    x = sample.unsqueeze(-1)
                else:
                    raise RuntimeError(
                        "Unknown model mode value - {}".format(self.mode)
                    )

                if i % 100 == 0:
                    self.gen_display(i, seq_len, b_size, start)

        output = torch.stack(output).transpose(0, 1)
        output = output.cpu().numpy()
        output = output.astype(np.float64)

        if batched:
            output = self.xfade_and_unfold(output, target, overlap)
        else:
            output = output[0]

        if self.mulaw and isinstance(self.mode, int):
            output = ap.mulaw_decode(output, self.mode)

        # Fade-out at the end to avoid signal cutting out suddenly. The fade
        # is capped at the output length so very short outputs do not crash.
        output = output[:wave_len]
        fade_len = min(20 * self.hop_length, len(output))
        if fade_len > 0:
            output[-fade_len:] *= np.linspace(1, 0, fade_len)

        # Restore the caller's mode instead of forcing train mode.
        self.train(was_training)
        return output

    def gen_display(self, i, seq_len, b_size, start):
        """Print generation progress and speed on the current console line."""
        # Guard against a zero elapsed time on coarse clocks.
        elapsed = max(time.time() - start, 1e-9)
        gen_rate = (i + 1) / elapsed * b_size / 1000
        realtime_ratio = gen_rate * 1000 / self.sample_rate
        stream(
            "%i/%i -- batch_size: %i -- gen_rate: %.1f kHz -- x_realtime: %.1f  ",
            (i * b_size, seq_len * b_size, b_size, gen_rate, realtime_ratio),
        )

    def get_gru_cell(self, gru):
        """Build a ``GRUCell`` that shares the weights of a 1-layer ``GRU``."""
        gru_cell = nn.GRUCell(gru.input_size, gru.hidden_size)
        # Assigning `.data` makes the cell reuse the GRU's storage (and
        # therefore its device) without copying.
        gru_cell.weight_hh.data = gru.weight_hh_l0.data
        gru_cell.weight_ih.data = gru.weight_ih_l0.data
        gru_cell.bias_hh.data = gru.bias_hh_l0.data
        gru_cell.bias_ih.data = gru.bias_ih_l0.data
        return gru_cell

    def pad_tensor(self, x, pad, side="both"):
        """Zero-pad ``x`` of shape ``(b, t, c)`` along the time axis.

        ``side`` is ``"before"``, ``"after"`` or ``"both"``. Any other value
        returns an all-zero tensor of length ``t + pad``.
        """
        # NB - this is just a quick method i need right now
        # i.e., it won't generalise to other shapes/dims
        b, t, c = x.size()
        total = t + 2 * pad if side == "both" else t + pad
        padded = torch.zeros(b, total, c, device=x.device)
        if side in ("before", "both"):
            padded[:, pad : pad + t, :] = x
        elif side == "after":
            padded[:, :t, :] = x
        return padded

    def fold_with_overlap(self, x, target, overlap):
        """Fold the tensor with overlap for quick batched inference.

        Overlap will be used for crossfading in xfade_and_unfold()

        Args:
            x (tensor)    : Upsampled conditioning features.
                            shape=(1, timesteps, features)
            target (int)  : Target timesteps for each index of batch
            overlap (int) : Timesteps for both xfade and rnn warmup

        Return:
            (tensor) : shape=(num_folds, target + 2 * overlap, features)

        Details:
            x = [[h1, h2, ... hn]]
            Where each h is a vector of conditioning features
            Eg: target=2, overlap=1 with x.size(1)=10
            folded = [[h1, h2, h3, h4],
                      [h4, h5, h6, h7],
                      [h7, h8, h9, h10]]
        """
        _, total_len, features = x.size()

        # Calculate variables needed
        num_folds = (total_len - overlap) // (target + overlap)
        extended_len = num_folds * (overlap + target) + overlap
        remaining = total_len - extended_len

        # Pad if some time steps poking out
        if remaining != 0:
            num_folds += 1
            padding = target + 2 * overlap - remaining
            x = self.pad_tensor(x, padding, side="after")

        folded = torch.zeros(
            num_folds, target + 2 * overlap, features, device=x.device
        )

        # Get the values for the folded tensor
        for i in range(num_folds):
            start = i * (target + overlap)
            end = start + target + 2 * overlap
            folded[i] = x[:, start:end, :]

        return folded

    def xfade_and_unfold(self, y, target, overlap):
        """Applies a crossfade and unfolds into a 1d array.

        Args:
            y (ndarray)   : Batched sequences of audio samples
                            shape=(num_folds, target + 2 * overlap)
                            dtype=np.float64; modified in place.
            target (int)  : Ignored; recomputed from ``y.shape``.
            overlap (int) : Timesteps for both xfade and rnn warmup

        Return:
            (ndarray) : audio samples in a 1d array
                        shape=(total_len)
                        dtype=np.float64

        Details:
            y = [[seq1],
                 [seq2],
                 [seq3]]
            Apply a gain envelope at both ends of the sequences
            y = [[seq1_in, seq1_target, seq1_out],
                 [seq2_in, seq2_target, seq2_out],
                 [seq3_in, seq3_target, seq3_out]]
            Stagger and add up the groups of samples:
            [seq1_in, seq1_target, (seq1_out + seq2_in), seq2_target, ...]
        """
        num_folds, length = y.shape
        target = length - 2 * overlap
        total_len = num_folds * (target + overlap) + overlap

        # Need some silence for the rnn warmup
        silence_len = overlap // 2
        fade_len = overlap - silence_len
        silence = np.zeros((silence_len), dtype=np.float64)

        # Equal power crossfade
        t = np.linspace(-1, 1, fade_len, dtype=np.float64)
        fade_in = np.sqrt(0.5 * (1 + t))
        fade_out = np.sqrt(0.5 * (1 - t))

        # Concat the silence to the fades
        fade_in = np.concatenate([silence, fade_in])
        fade_out = np.concatenate([fade_out, silence])

        # Apply the gain to the overlap samples. With overlap == 0 there is
        # nothing to fade, and `y[:, -0:]` would select the whole row.
        if overlap > 0:
            y[:, :overlap] *= fade_in
            y[:, -overlap:] *= fade_out

        unfolded = np.zeros((total_len), dtype=np.float64)

        # Loop to add up all the samples
        for i in range(num_folds):
            start = i * (target + overlap)
            end = start + target + 2 * overlap
            unfolded[start:end] += y[i]

        return unfolded
|
|
@ -0,0 +1,155 @@
|
|||
import numpy as np
|
||||
import math
|
||||
import torch
|
||||
from torch.distributions.normal import Normal
|
||||
import torch.nn.functional as F
|
||||
|
||||
|
||||
def gaussian_loss(y_hat, y, log_std_min=-7.0):
    """Mean negative log-likelihood of ``y`` under a predicted Gaussian.

    Args:
        y_hat: 3-D tensor whose last axis holds ``(mean, log_std)``.
        y: targets, broadcast against the mean slice of ``y_hat``.
        log_std_min: lower clamp applied to the predicted log-std.

    Returns:
        Scalar tensor: the negative log-density averaged over all elements.
    """
    assert y_hat.dim() == 3
    assert y_hat.size(2) == 2
    mean, raw_log_std = y_hat[:, :, :1], y_hat[:, :, 1:]
    log_std = raw_log_std.clamp(min=log_std_min)
    squared_error = (y - mean) ** 2
    # TODO: replace with pytorch dist
    # -log N(y; mean, std) = 0.5 * (log(2*pi) + 2*log_std + (y-mean)^2 / std^2)
    nll = 0.5 * (
        math.log(2.0 * math.pi)
        + 2.0 * log_std
        + squared_error * torch.exp(-2.0 * log_std)
    )
    return nll.squeeze().mean()
|
||||
|
||||
|
||||
def sample_from_gaussian(y_hat, log_std_min=-7.0, scale_factor=1.0):
    """Draw one sample per position from the predicted Gaussian.

    Args:
        y_hat: tensor whose axis 2 holds ``(mean, log_std)``.
        log_std_min: lower clamp applied to the predicted log-std.
        scale_factor: samples are clipped to ``[-scale_factor, scale_factor]``.

    Returns:
        Tensor shaped like the mean slice of ``y_hat`` (size 1 on axis 2).
    """
    assert y_hat.size(2) == 2
    mean = y_hat[:, :, :1]
    std = torch.exp(y_hat[:, :, 1:].clamp(min=log_std_min))
    drawn = Normal(mean, std).sample()
    return torch.clamp(drawn, min=-scale_factor, max=scale_factor)
|
||||
|
||||
|
||||
def log_sum_exp(x):
    """Numerically stable ``log(sum(exp(x)))`` over the last axis.

    The per-row maximum is subtracted before exponentiating so that large
    inputs do not overflow; the reduced axis is dropped from the result.
    """
    # TF ordering: the reduction runs over the trailing axis.
    last = x.dim() - 1
    peak, _ = torch.max(x, dim=last, keepdim=True)
    shifted_sum = torch.exp(x - peak).sum(dim=last)
    return peak.squeeze(last) + torch.log(shifted_sum)
|
||||
|
||||
|
||||
# Adapted from
# https://github.com/r9y9/wavenet_vocoder/blob/master/wavenet_vocoder/mixture.py
def discretized_mix_logistic_loss(y_hat, y, num_classes=65536,
                                  log_scale_min=None, reduce=True):
    """Negative log-likelihood under a discretized mixture of logistics.

    Args:
        y_hat: mixture parameters indexed as ``(B, T, 3 * nr_mix)``; the
            last axis packs mixture logits, means and log-scales in that order.
        y: targets expanded to the shape of the means; values below -0.999
            or above 0.999 are treated as the edge bins.
        num_classes: number of quantization levels; sets the bin width.
        log_scale_min: lower clamp for the log-scales, ``log(1e-14)`` when
            ``None``.
        reduce: return the scalar mean when true, otherwise the
            per-position losses with a trailing axis of size 1.
    """
    if log_scale_min is None:
        log_scale_min = float(np.log(1e-14))

    # The (B, C, T) view is only used for the sanity checks; the arithmetic
    # below works on the (B, T, C) layout again.
    channels_first = y_hat.permute(0, 2, 1)
    assert channels_first.dim() == 3
    assert channels_first.size(1) % 3 == 0
    nr_mix = channels_first.size(1) // 3
    params = channels_first.transpose(1, 2)

    # unpack parameters. (B, T, num_mixtures) x 3
    logit_probs = params[:, :, :nr_mix]
    means = params[:, :, nr_mix:2 * nr_mix]
    log_scales = torch.clamp(
        params[:, :, 2 * nr_mix:3 * nr_mix], min=log_scale_min)

    # B x T x 1 -> B x T x num_mixtures
    y = y.expand_as(means)

    centered_y = y - means
    inv_stdv = torch.exp(-log_scales)
    half_bin = 1. / (num_classes - 1)
    plus_in = inv_stdv * (centered_y + half_bin)
    min_in = inv_stdv * (centered_y - half_bin)

    # probability mass of the bin: CDF(upper edge) - CDF(lower edge)
    cdf_delta = torch.sigmoid(plus_in) - torch.sigmoid(min_in)

    # lowest bin (before scaling): log(sigmoid(plus_in))
    log_cdf_plus = plus_in - F.softplus(plus_in)

    # highest bin (before scaling): log(1 - sigmoid(min_in))
    log_one_minus_cdf_min = -F.softplus(min_in)

    # log density at the bin centre; fallback when cdf_delta underflows
    mid_in = inv_stdv * centered_y
    log_pdf_mid = mid_in - log_scales - 2. * F.softplus(mid_in)

    # TensorFlow reference for the selection below:
    #   tf.where(x < -0.999, log_cdf_plus,
    #            tf.where(x > 0.999, log_one_minus_cdf_min,
    #                     tf.where(cdf_delta > 1e-5,
    #                              tf.log(tf.maximum(cdf_delta, 1e-12)),
    #                              log_pdf_mid - np.log(127.5))))
    # TODO: cdf_delta <= 1e-5 actually can happen. How can we choose the value
    # for num_classes=65536 case? 1e-7? not sure..
    delta_ok = (cdf_delta > 1e-5).float()
    interior = delta_ok * \
        torch.log(torch.clamp(cdf_delta, min=1e-12)) + \
        (1. - delta_ok) * (log_pdf_mid - np.log((num_classes - 1) / 2))

    is_top = (y > 0.999).float()
    not_bottom = is_top * log_one_minus_cdf_min + (1. - is_top) * interior

    is_bottom = (y < -0.999).float()
    log_probs = is_bottom * log_cdf_plus + (1. - is_bottom) * not_bottom

    # weight every component by its (log) mixture probability
    log_probs = log_probs + F.log_softmax(logit_probs, -1)

    if not reduce:
        return -log_sum_exp(log_probs).unsqueeze(-1)
    return -torch.mean(log_sum_exp(log_probs))
|
||||
|
||||
|
||||
def sample_from_discretized_mix_logistic(y, log_scale_min=None):
    """
    Sample from discretized mixture of logistic distributions

    Args:
        y (Tensor): B x C x T, with C = 3 * nr_mix packing mixture logits,
            means and log-scales in that order.
        log_scale_min (float): Log scale minimum value, ``log(1e-14)``
            when ``None``.

    Returns:
        Tensor: B x T samples in range of [-1, 1].
    """
    if log_scale_min is None:
        log_scale_min = float(np.log(1e-14))
    assert y.size(1) % 3 == 0
    nr_mix = y.size(1) // 3

    # B x T x C
    params = y.transpose(1, 2)
    logit_probs = params[:, :, :nr_mix]

    # Gumbel-max trick: add -log(-log(U)) noise to the logits and take the
    # argmax to draw the mixture component.
    noise = logit_probs.new_empty(logit_probs.size()).uniform_(1e-5, 1.0 - 1e-5)
    perturbed = logit_probs.detach() - torch.log(-torch.log(noise))
    _, component = perturbed.max(dim=-1)

    # (B, T) -> (B, T, nr_mix)
    selector = to_one_hot(component, nr_mix)

    # pick the parameters of the chosen component
    means = (params[:, :, nr_mix:2 * nr_mix] * selector).sum(dim=-1)
    chosen_scales = (params[:, :, 2 * nr_mix:3 * nr_mix] * selector).sum(dim=-1)
    log_scales = torch.clamp(chosen_scales, min=log_scale_min)

    # inverse-CDF sample from the logistic, clipped to the interval;
    # we don't actually round to the nearest 8bit value when sampling
    u = means.new_empty(means.size()).uniform_(1e-5, 1.0 - 1e-5)
    x = means + torch.exp(log_scales) * (torch.log(u) - torch.log(1. - u))

    return torch.clamp(x, min=-1., max=1.)
|
||||
|
||||
|
||||
def to_one_hot(tensor, n, fill_with=1.):
    """One-hot encode integer indices along a new trailing axis.

    Args:
        tensor: int64 index tensor of any shape, values in ``[0, n)``.
        n: number of classes, i.e. size of the new last axis.
        fill_with: value written at the selected positions.

    Returns:
        float32 tensor of shape ``tensor.shape + (n,)`` on the same device
        as ``tensor``.
    """
    # Allocate directly on the index tensor's device: this covers CPU and
    # every CUDA device, not only the current one, and avoids the legacy
    # FloatTensor constructor plus an extra host-to-device copy.
    one_hot = torch.zeros(
        tuple(tensor.size()) + (n,), dtype=torch.float, device=tensor.device
    )
    # One-hot encode with respect to the last axis.
    one_hot.scatter_(tensor.dim(), tensor.unsqueeze(-1), fill_with)
    return one_hot
|
Loading…
Reference in New Issue