mirror of
https://github.com/fumiama/Retrieval-based-Voice-Conversion-WebUI.git
synced 2026-06-05 01:10:22 +08:00
@@ -43,9 +43,14 @@ def float_np_array_to_wav_buf(wav: np.ndarray, sr: int, f32=False) -> BytesIO:
|
||||
return buf
|
||||
|
||||
|
||||
def save_audio(path: str, audio: np.ndarray, sr: int, f32=False):
|
||||
def save_audio(path: str, audio: np.ndarray, sr: int, f32=False, format="wav"):
|
||||
buf = float_np_array_to_wav_buf(audio, sr, f32)
|
||||
if format != "wav":
|
||||
transbuf = BytesIO()
|
||||
wav2(buf, transbuf, format)
|
||||
buf = transbuf
|
||||
with open(path, "wb") as f:
|
||||
f.write(float_np_array_to_wav_buf(audio, sr, f32).getbuffer())
|
||||
f.write(buf.getbuffer())
|
||||
|
||||
|
||||
def wav2(i: BytesIO, o: BufferedWriter, format: str):
|
||||
@@ -109,7 +114,7 @@ def load_audio(
|
||||
frames_data = []
|
||||
rate = 0
|
||||
for frame in packet:
|
||||
frame.pts = None # 清除时间戳,避免重新采样问题
|
||||
# frame.pts = None # 清除时间戳,避免重新采样问题
|
||||
resampled_frames = (
|
||||
resampler.resample(frame) if resampler is not None else [frame]
|
||||
)
|
||||
@@ -137,6 +142,8 @@ def load_audio(
|
||||
|
||||
np.copyto(decoded_audio[..., offset:end_index], frame_data)
|
||||
offset += len(frame_data[0])
|
||||
|
||||
container.close()
|
||||
|
||||
# Truncate the array to the actual size
|
||||
decoded_audio = decoded_audio[..., :offset]
|
||||
@@ -149,43 +156,6 @@ def load_audio(
|
||||
return decoded_audio, rate
|
||||
|
||||
|
||||
def downsample_audio(
    input_path: str, output_path: str, format: str, br=128_000
) -> None:
    """
    Re-encode the first audio stream of ``input_path`` into ``output_path``.

    default to 128kb/s (equivalent to -q:a 2)

    Args:
        input_path: File to read. If it does not exist the call is a no-op.
        output_path: File to create.
        format: Codec name handed to ``add_stream`` of the output container.
        br: Bit rate assigned to the output stream.

    After a successful conversion the input file is deleted; a failure to
    delete it is reported on stdout and otherwise ignored.
    """
    if not os.path.exists(input_path):
        return

    # Context managers close both containers even when decoding or encoding
    # raises part-way through, so no file handle is leaked on failure.
    with av.open(input_path) as input_container, av.open(
        output_path, "w"
    ) as output_container:
        # Create a stream in the output container
        input_stream = input_container.streams.audio[0]
        output_stream = output_container.add_stream(format)
        output_stream.bit_rate = br

        # Decode every input packet and re-encode its frames
        for packet in input_container.demux(input_stream):
            for frame in packet.decode():
                for out_packet in output_stream.encode(frame):
                    output_container.mux(out_packet)

        # Calling encode() without a frame drains what the encoder buffered
        for packet in output_stream.encode():
            output_container.mux(packet)

    try:  # Remove the original file
        os.remove(input_path)
    except OSError as e:
        print(f"Failed to remove the original file: {e}")
|
||||
|
||||
|
||||
def resample_audio(
|
||||
input_path: str, output_path: str, codec: str, format: str, sr: int, layout: str
|
||||
) -> None:
|
||||
@@ -204,7 +174,7 @@ def resample_audio(
|
||||
# Copy packets from the input file to the output file
|
||||
for packet in input_container.demux(input_stream):
|
||||
for frame in packet.decode():
|
||||
frame.pts = None # Clear presentation timestamp to avoid resampling issues
|
||||
# frame.pts = None # Clear presentation timestamp to avoid resampling issues
|
||||
out_frames = resampler.resample(frame)
|
||||
for out_frame in out_frames:
|
||||
for out_packet in output_stream.encode(out_frame):
|
||||
@@ -217,10 +187,6 @@ def resample_audio(
|
||||
input_container.close()
|
||||
output_container.close()
|
||||
|
||||
try: # Remove the original file
|
||||
os.remove(input_path)
|
||||
except Exception as e:
|
||||
print(f"Failed to remove the original file: {e}")
|
||||
|
||||
|
||||
def get_audio_properties(input_path: str) -> Tuple[int, int]:
|
||||
|
||||
@@ -5,6 +5,7 @@ import logging
|
||||
import os
|
||||
import sys
|
||||
from copy import deepcopy
|
||||
import math
|
||||
|
||||
import codecs
|
||||
import numpy as np
|
||||
@@ -103,7 +104,7 @@ def summarize(
|
||||
|
||||
def latest_checkpoint_path(dir_path, regex="G_*.pth"):
|
||||
f_list = glob.glob(os.path.join(dir_path, regex))
|
||||
f_list.sort(key=lambda f: int("".join(filter(str.isdigit, f))))
|
||||
f_list.sort(key=lambda f: 999999999999 if isinstance(f, str) and f == "latest" else int("0"+"".join(filter(str.isdigit, f))))
|
||||
x = f_list[-1]
|
||||
logger.debug(x)
|
||||
return x
|
||||
|
||||
@@ -1,183 +0,0 @@
|
||||
import os
|
||||
import random
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
import torch.utils.data
|
||||
from tqdm import tqdm
|
||||
|
||||
from . import spec_utils
|
||||
|
||||
|
||||
class VocalRemoverValidationSet(torch.utils.data.Dataset):
    """Dataset that serves magnitude pairs read from saved ``.npz`` patches.

    Each entry of ``patch_list`` is a path to an archive holding the arrays
    ``"X"`` and ``"y"``.
    """

    def __init__(self, patch_list):
        """Remember the list of patch file paths."""
        self.patch_list = patch_list

    def __len__(self):
        """Return the number of patches."""
        return len(self.patch_list)

    def __getitem__(self, idx):
        """Load patch ``idx`` and return ``(abs(X), abs(y))``."""
        path = self.patch_list[idx]

        # np.load on an .npz archive returns a lazy NpzFile that keeps the
        # file open; the context manager releases the handle once both
        # arrays have been read.
        with np.load(path) as data:
            X, y = data["X"], data["y"]

        X_mag = np.abs(X)
        y_mag = np.abs(y)

        return X_mag, y_mag
|
||||
|
||||
|
||||
def make_pair(mix_dir, inst_dir):
    """Pair audio files of two directories by sorted position.

    Args:
        mix_dir: Directory that is listed for the first element of each pair.
        inst_dir: Directory that is listed for the second element of each pair.

    Returns:
        A list of ``(mix_path, inst_path)`` tuples. Both listings are sorted
        and zipped, so the result is as long as the shorter listing.
    """
    X_list = _list_audio_files(mix_dir)
    y_list = _list_audio_files(inst_dir)

    # NOTE(review): pairing is purely positional; if the two directories do
    # not hold matching files the pairs will be misaligned without warning.
    return list(zip(X_list, y_list))


def _list_audio_files(directory):
    """Return the sorted paths of the files in ``directory`` with an audio extension."""
    input_exts = (".wav", ".m4a", ".mp3", ".mp4", ".flac")

    return sorted(
        os.path.join(directory, fname)
        for fname in os.listdir(directory)
        # Compare in lower case so "SONG.WAV" is not silently skipped.
        if os.path.splitext(fname)[1].lower() in input_exts
    )
|
||||
|
||||
|
||||
def train_val_split(dataset_dir, split_mode, val_rate, val_filelist):
    """Split the dataset into training and validation file pairs.

    Args:
        dataset_dir: Root directory of the dataset.
        split_mode: ``"random"`` shuffles the pairs found under
            ``mixtures`` / ``instruments``; ``"subdirs"`` reads the pairs
            from the ``training`` and ``validation`` sub-directories.
        val_rate: Fraction of the pairs held out for validation in
            ``"random"`` mode when ``val_filelist`` is empty.
        val_filelist: Pre-selected validation pairs (a list of two-element
            lists). Only allowed in ``"random"`` mode.

    Returns:
        ``(train_filelist, val_filelist)``.

    Raises:
        ValueError: If ``val_filelist`` is given in ``"subdirs"`` mode or
            ``split_mode`` is not one of the two supported values.
    """
    if split_mode == "random":
        filelist = make_pair(
            os.path.join(dataset_dir, "mixtures"),
            os.path.join(dataset_dir, "instruments"),
        )

        random.shuffle(filelist)

        if len(val_filelist) == 0:
            val_size = int(len(filelist) * val_rate)
            # Split on a non-negative index: slicing with -val_size breaks
            # when val_size is 0, because filelist[:-0] is empty and
            # filelist[-0:] is the whole list.
            split_at = max(0, len(filelist) - max(0, val_size))
            train_filelist = filelist[:split_at]
            val_filelist = filelist[split_at:]
        else:
            train_filelist = [
                pair for pair in filelist if list(pair) not in val_filelist
            ]
    elif split_mode == "subdirs":
        if len(val_filelist) != 0:
            raise ValueError(
                "The `val_filelist` option is not available in `subdirs` mode"
            )

        train_filelist = make_pair(
            os.path.join(dataset_dir, "training/mixtures"),
            os.path.join(dataset_dir, "training/instruments"),
        )

        val_filelist = make_pair(
            os.path.join(dataset_dir, "validation/mixtures"),
            os.path.join(dataset_dir, "validation/instruments"),
        )
    else:
        # Previously an unknown mode fell through to the return statement
        # and failed on the unbound ``train_filelist``.
        raise ValueError("Unknown split_mode: {!r}".format(split_mode))

    return train_filelist, val_filelist
|
||||
|
||||
|
||||
def augment(X, y, reduction_rate, reduction_mask, mixup_rate, mixup_alpha):
    """Randomly augment the sample pairs of ``X`` and ``y`` in place.

    Samples are visited in a random order. For each one, independent random
    draws decide whether to apply: aggressive vocal reduction on ``y``
    (probability ``reduction_rate``), reversal along the first axis of the
    sample (0.5), replacement by the mean over that axis (0.02), replacing
    the ``X`` sample by the ``y`` sample (0.02), and mixup with the next
    sample in the visiting order (probability ``mixup_rate``, weight drawn
    from ``Beta(mixup_alpha, mixup_alpha)``).

    Returns:
        The same ``X`` and ``y`` objects that were passed in.
    """
    order = np.random.permutation(len(X))
    final_position = len(order) - 1

    for position, sample in enumerate(tqdm(order)):
        if np.random.uniform() < reduction_rate:
            y[sample] = spec_utils.reduce_vocal_aggressively(
                X[sample], y[sample], reduction_mask
            )

        # swap channel
        if np.random.uniform() < 0.5:
            X[sample] = X[sample, ::-1]
            y[sample] = y[sample, ::-1]

        # mono
        if np.random.uniform() < 0.02:
            X[sample] = X[sample].mean(axis=0, keepdims=True)
            y[sample] = y[sample].mean(axis=0, keepdims=True)

        # inst
        if np.random.uniform() < 0.02:
            X[sample] = y[sample]

        # The draw happens for every sample, including the last one, so the
        # random stream does not depend on the position check.
        wants_mixup = np.random.uniform() < mixup_rate
        if wants_mixup and position < final_position:
            lam = np.random.beta(mixup_alpha, mixup_alpha)
            partner = order[position + 1]
            X[sample] = lam * X[sample] + (1 - lam) * X[partner]
            y[sample] = lam * y[sample] + (1 - lam) * y[partner]

    return X, y
|
||||
|
||||
|
||||
def make_padding(width, cropsize, offset):
    """Compute the left/right padding and region-of-interest size for cropping.

    Args:
        width: Length of the axis that will be padded.
        cropsize: Width of one crop.
        offset: Margin kept on each side of a crop; also the left padding.

    Returns:
        ``(left, right, roi_size)`` where ``roi_size`` is
        ``cropsize - 2 * offset`` (or ``cropsize`` when that is zero) and
        ``right`` pads ``width`` past the next multiple of ``roi_size``
        plus ``left``.

    Raises:
        ValueError: If the resulting ``roi_size`` is not positive.
    """
    left = offset
    roi_size = cropsize - left * 2
    if roi_size == 0:
        roi_size = cropsize
    if roi_size <= 0:
        # A non-positive ROI would give a meaningless (or dividing-by-zero)
        # modulo below, so reject it explicitly.
        raise ValueError(
            "cropsize ({}) must be positive and at least twice the offset ({})".format(
                cropsize, offset
            )
        )
    right = roi_size - (width % roi_size) + left

    return left, right, roi_size
|
||||
|
||||
|
||||
def make_training_set(filelist, cropsize, patches, sr, hop_length, n_fft, offset):
    """Cut ``patches`` random crops of width ``cropsize`` from every file pair.

    Args:
        filelist: Iterable of ``(X_path, y_path)`` pairs.
        cropsize: Width of each crop along the last axis.
        patches: Number of crops taken per pair.
        sr, hop_length, n_fft: Forwarded to ``spec_utils.cache_or_load``;
            ``n_fft`` also fixes the second-to-last output dimension to
            ``n_fft // 2 + 1``.
        offset: Forwarded to ``make_padding``.

    Returns:
        ``(X_dataset, y_dataset)``, two ``complex64`` arrays of shape
        ``(patches * len(filelist), 2, n_fft // 2 + 1, cropsize)``.
    """
    len_dataset = patches * len(filelist)

    X_dataset = np.zeros((len_dataset, 2, n_fft // 2 + 1, cropsize), dtype=np.complex64)
    y_dataset = np.zeros((len_dataset, 2, n_fft // 2 + 1, cropsize), dtype=np.complex64)

    for i, (X_path, y_path) in enumerate(tqdm(filelist)):
        # NOTE(review): confirm this argument list matches the current
        # signature of spec_utils.cache_or_load.
        X, y = spec_utils.cache_or_load(X_path, y_path, sr, hop_length, n_fft)
        # Normalise both spectrograms by their shared peak magnitude.
        coef = np.max([np.abs(X).max(), np.abs(y).max()])
        X, y = X / coef, y / coef

        pad_left, pad_right, _ = make_padding(X.shape[2], cropsize, offset)
        pad_widths = ((0, 0), (0, 0), (pad_left, pad_right))
        X_pad = np.pad(X, pad_widths, mode="constant")
        y_pad = np.pad(y, pad_widths, mode="constant")

        # randint's upper bound is exclusive, so "+ 1" makes the last valid
        # start reachable and keeps the range non-empty when the padded
        # input is exactly one crop wide (previously a ValueError).
        starts = np.random.randint(0, X_pad.shape[2] - cropsize + 1, patches)
        ends = starts + cropsize
        for j in range(patches):
            idx = i * patches + j
            X_dataset[idx] = X_pad[:, :, starts[j] : ends[j]]
            y_dataset[idx] = y_pad[:, :, starts[j] : ends[j]]

    return X_dataset, y_dataset
|
||||
|
||||
|
||||
def make_validation_set(filelist, cropsize, sr, hop_length, n_fft, offset):
    """Write validation patches to disk and wrap them in a dataset.

    Patches are stored as ``.npz`` files in a directory named after the
    cropping parameters, created in the current working directory. A patch
    file that already exists is reused instead of being rewritten.

    Returns:
        A ``VocalRemoverValidationSet`` over all patch paths.
    """
    patch_dir = "cs{}_sr{}_hl{}_nf{}_of{}".format(
        cropsize, sr, hop_length, n_fft, offset
    )
    os.makedirs(patch_dir, exist_ok=True)

    patch_paths = []
    for mix_file, inst_file in tqdm(filelist):
        stem = os.path.splitext(os.path.basename(mix_file))[0]

        X, y = spec_utils.cache_or_load(mix_file, inst_file, sr, hop_length, n_fft)
        # Scale both spectrograms by their shared peak magnitude.
        peak = np.max([np.abs(X).max(), np.abs(y).max()])
        X, y = X / peak, y / peak

        pad_left, pad_right, roi_size = make_padding(X.shape[2], cropsize, offset)
        pad_widths = ((0, 0), (0, 0), (pad_left, pad_right))
        X_pad = np.pad(X, pad_widths, mode="constant")
        y_pad = np.pad(y, pad_widths, mode="constant")

        patch_count = int(np.ceil(X.shape[2] / roi_size))
        for j in range(patch_count):
            outpath = os.path.join(patch_dir, "{}_p{}.npz".format(stem, j))
            if not os.path.exists(outpath):
                # Consecutive patches advance by roi_size but are cropsize wide.
                begin = j * roi_size
                np.savez(
                    outpath,
                    X=X_pad[:, :, begin : begin + cropsize],
                    y=y_pad[:, :, begin : begin + cropsize],
                )
            patch_paths.append(outpath)

    return VocalRemoverValidationSet(patch_paths)
|
||||
@@ -22,7 +22,8 @@ class Conv2DBNActiv(nn.Module):
|
||||
activ(),
|
||||
)
|
||||
|
||||
def __call__(self, x):
|
||||
@torch.inference_mode()
|
||||
def forward(self, x):
|
||||
return self.conv(x)
|
||||
|
||||
|
||||
@@ -32,7 +33,8 @@ class Encoder(nn.Module):
|
||||
self.conv1 = Conv2DBNActiv(nin, nout, ksize, stride, pad, activ=activ)
|
||||
self.conv2 = Conv2DBNActiv(nout, nout, ksize, 1, pad, activ=activ)
|
||||
|
||||
def __call__(self, x):
|
||||
@torch.inference_mode()
|
||||
def forward(self, x):
|
||||
h = self.conv1(x)
|
||||
h = self.conv2(h)
|
||||
|
||||
@@ -48,7 +50,8 @@ class Decoder(nn.Module):
|
||||
# self.conv2 = Conv2DBNActiv(nout, nout, ksize, 1, pad, activ=activ)
|
||||
self.dropout = nn.Dropout2d(0.1) if dropout else None
|
||||
|
||||
def __call__(self, x, skip=None):
|
||||
@torch.inference_mode()
|
||||
def forward(self, x, skip=None):
|
||||
x = F.interpolate(x, scale_factor=2, mode="bilinear", align_corners=True)
|
||||
|
||||
if skip is not None:
|
||||
@@ -84,6 +87,7 @@ class ASPPModule(nn.Module):
|
||||
self.bottleneck = Conv2DBNActiv(nout * 5, nout, 1, 1, 0, activ=activ)
|
||||
self.dropout = nn.Dropout2d(0.1) if dropout else None
|
||||
|
||||
@torch.inference_mode()
|
||||
def forward(self, x):
|
||||
_, _, h, w = x.size()
|
||||
feat1 = F.interpolate(
|
||||
@@ -113,6 +117,7 @@ class LSTMModule(nn.Module):
|
||||
nn.Linear(nout_lstm, nin_lstm), nn.BatchNorm1d(nin_lstm), nn.ReLU()
|
||||
)
|
||||
|
||||
@torch.inference_mode()
|
||||
def forward(self, x):
|
||||
N, _, nbins, nframes = x.size()
|
||||
h = self.conv(x)[:, 0] # N, nbins, nframes
|
||||
|
||||
@@ -24,7 +24,8 @@ class BaseNet(nn.Module):
|
||||
self.lstm_dec2 = layers.LSTMModule(nout * 2, nin_lstm, nout_lstm)
|
||||
self.dec1 = layers.Decoder(nout * (1 + 2) + 1, nout * 1, 3, 1, 1)
|
||||
|
||||
def __call__(self, x):
|
||||
@torch.inference_mode()
|
||||
def forward(self, x):
|
||||
e1 = self.enc1(x)
|
||||
e2 = self.enc2(e1)
|
||||
e3 = self.enc3(e2)
|
||||
@@ -75,6 +76,7 @@ class CascadedNet(nn.Module):
|
||||
self.out = nn.Conv2d(nout, 2, 1, bias=False)
|
||||
self.aux_out = nn.Conv2d(3 * nout // 4, 2, 1, bias=False)
|
||||
|
||||
@torch.inference_mode()
|
||||
def forward(self, x):
|
||||
x = x[:, :, : self.max_bin]
|
||||
|
||||
@@ -112,22 +114,3 @@ class CascadedNet(nn.Module):
|
||||
return mask, aux
|
||||
else:
|
||||
return mask
|
||||
|
||||
def predict_mask(self, x):
|
||||
mask = self.forward(x)
|
||||
|
||||
if self.offset > 0:
|
||||
mask = mask[:, :, :, self.offset : -self.offset]
|
||||
assert mask.size()[3] > 0
|
||||
|
||||
return mask
|
||||
|
||||
def predict(self, x, aggressiveness=None):
|
||||
mask = self.forward(x)
|
||||
pred_mag = x * mask
|
||||
|
||||
if self.offset > 0:
|
||||
pred_mag = pred_mag[:, :, :, self.offset : -self.offset]
|
||||
assert pred_mag.size()[3] > 0
|
||||
|
||||
return pred_mag
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
import hashlib
|
||||
import json
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import math
|
||||
import os
|
||||
|
||||
import librosa
|
||||
import numpy as np
|
||||
from numba import jit
|
||||
|
||||
|
||||
def crop_center(h1, h2):
|
||||
@@ -25,61 +24,42 @@ def crop_center(h1, h2):
|
||||
return h1
|
||||
|
||||
|
||||
def wave_to_spectrogram(
|
||||
wave, hop_length, n_fft, mid_side=False, mid_side_b2=False, reverse=False
|
||||
def split_lr_waves(
|
||||
wave, mid_side=False, mid_side_b2=False, reverse=False
|
||||
):
|
||||
if reverse:
|
||||
wave_left = np.flip(np.asfortranarray(wave[0]))
|
||||
wave_right = np.flip(np.asfortranarray(wave[1]))
|
||||
elif mid_side:
|
||||
wave_left = np.asfortranarray(np.add(wave[0], wave[1]) / 2)
|
||||
wave_right = np.asfortranarray(np.subtract(wave[0], wave[1]))
|
||||
wave_left = np.add(wave[0], wave[1]) / 2
|
||||
wave_right = np.subtract(wave[0], wave[1])
|
||||
elif mid_side_b2:
|
||||
wave_left = np.asfortranarray(np.add(wave[1], wave[0] * 0.5))
|
||||
wave_right = np.asfortranarray(np.subtract(wave[0], wave[1] * 0.5))
|
||||
wave_left = np.add(wave[1], wave[0] * 0.5)
|
||||
wave_right = np.subtract(wave[0], wave[1] * 0.5)
|
||||
else:
|
||||
wave_left = np.asfortranarray(wave[0])
|
||||
wave_right = np.asfortranarray(wave[1])
|
||||
wave_left = wave[0]
|
||||
wave_right = wave[1]
|
||||
|
||||
spec_left = librosa.stft(wave_left, n_fft=n_fft, hop_length=hop_length)
|
||||
spec_right = librosa.stft(wave_right, n_fft=n_fft, hop_length=hop_length)
|
||||
return wave_left, wave_right
|
||||
|
||||
spec = np.asfortranarray([spec_left, spec_right])
|
||||
|
||||
return spec
|
||||
|
||||
def run_librosa_stft(wv, n_fft, hop_length, reverse):
    """Run ``librosa.stft`` on one waveform.

    When ``reverse`` is false the waveform is first passed through
    ``np.asfortranarray``; when it is true the waveform is used as given.
    """
    signal = wv if reverse else np.asfortranarray(wv)
    return librosa.stft(signal, n_fft=n_fft, hop_length=hop_length)
|
||||
|
||||
def wave_to_spectrogram_mt(
|
||||
wave, hop_length, n_fft, mid_side=False, mid_side_b2=False, reverse=False
|
||||
):
|
||||
import threading
|
||||
|
||||
if reverse:
|
||||
wave_left = np.flip(np.asfortranarray(wave[0]))
|
||||
wave_right = np.flip(np.asfortranarray(wave[1]))
|
||||
elif mid_side:
|
||||
wave_left = np.asfortranarray(np.add(wave[0], wave[1]) / 2)
|
||||
wave_right = np.asfortranarray(np.subtract(wave[0], wave[1]))
|
||||
elif mid_side_b2:
|
||||
wave_left = np.asfortranarray(np.add(wave[1], wave[0] * 0.5))
|
||||
wave_right = np.asfortranarray(np.subtract(wave[0], wave[1] * 0.5))
|
||||
else:
|
||||
wave_left = np.asfortranarray(wave[0])
|
||||
wave_right = np.asfortranarray(wave[1])
|
||||
|
||||
def run_thread(**kwargs):
|
||||
global spec_left
|
||||
spec_left = librosa.stft(**kwargs)
|
||||
|
||||
thread = threading.Thread(
|
||||
target=run_thread,
|
||||
kwargs={"y": wave_left, "n_fft": n_fft, "hop_length": hop_length},
|
||||
)
|
||||
thread.start()
|
||||
spec_right = librosa.stft(wave_right, n_fft=n_fft, hop_length=hop_length)
|
||||
thread.join()
|
||||
|
||||
spec = np.asfortranarray([spec_left, spec_right])
|
||||
with ThreadPoolExecutor(max_workers=2) as tp:
|
||||
spec = np.asfortranarray(
|
||||
[spec for spec in tp.map(
|
||||
run_librosa_stft,
|
||||
split_lr_waves(wave, mid_side, mid_side_b2, reverse),
|
||||
[n_fft, n_fft], [hop_length, hop_length], [reverse, reverse]
|
||||
)]
|
||||
)
|
||||
|
||||
return spec
|
||||
|
||||
@@ -122,41 +102,7 @@ def combine_spectrograms(specs, mp):
|
||||
return np.asfortranarray(spec_c)
|
||||
|
||||
|
||||
def spectrogram_to_image(spec, mode="magnitude"):
    """Convert a spectrogram into an 8-bit image array.

    Args:
        spec: Real or complex array.
        mode: ``"magnitude"`` renders ``log10(|spec|**2 + 1e-8)``;
            ``"phase"`` renders ``angle(spec)``. Real input is used as-is
            in place of the absolute value / angle.

    Returns:
        A ``uint8`` array scaled so the minimum maps to 0 and the maximum to
        255 (all zeros for constant input). For 3-D input the first axis is
        moved last and its per-pixel maximum is prepended as an extra
        channel.

    Raises:
        ValueError: If ``mode`` is neither ``"magnitude"`` nor ``"phase"``.
    """
    if mode == "magnitude":
        magnitude = np.abs(spec) if np.iscomplexobj(spec) else np.asarray(spec)
        y = np.log10(magnitude**2 + 1e-8)
    elif mode == "phase":
        y = np.angle(spec) if np.iscomplexobj(spec) else np.asarray(spec)
    else:
        raise ValueError("Unknown mode: {!r}".format(mode))

    # Work on a floating-point copy: the in-place scaling below must neither
    # modify the caller's array (real input aliases ``spec``) nor fail on
    # integer dtypes.
    y = np.array(y, dtype=np.result_type(y.dtype, np.float32), copy=True)

    y -= y.min()
    peak = y.max()
    if peak > 0:
        # Skip the scaling for constant input, where it would divide by zero.
        y *= 255 / peak
    img = y.astype(np.uint8)

    if y.ndim == 3:
        img = img.transpose(1, 2, 0)
        img = np.concatenate([np.max(img, axis=2, keepdims=True), img], axis=2)

    return img
|
||||
|
||||
|
||||
def reduce_vocal_aggressively(X, y, softmask):
    """Attenuate ``y`` wherever the residual ``X - y`` is louder than ``y``.

    In bins where ``|X - y| > |y|``, ``softmask * |X - y|`` is subtracted
    from ``|y|`` and the result is clipped at zero; other bins keep their
    magnitude. The phase of ``y`` is preserved.

    Returns:
        A complex array with the reduced magnitude and the phase of ``y``.
    """
    inst_mag = np.abs(y)
    residual_mag = np.abs(X - y)

    # Boolean mask of the bins dominated by the residual.
    residual_dominates = residual_mag > inst_mag
    reduced_mag = np.clip(
        inst_mag - residual_mag * residual_dominates * softmask, 0, np.inf
    )

    phase = np.exp(1.0j * np.angle(y))
    return reduced_mag * phase
|
||||
|
||||
|
||||
@jit(nopython=True)
|
||||
def mask_silence(mag, ref, thres=0.2, min_range=64, fade_size=32):
|
||||
if min_range < fade_size * 2:
|
||||
raise ValueError("min_range must be >= fade_area * 2")
|
||||
@@ -195,141 +141,13 @@ def mask_silence(mag, ref, thres=0.2, min_range=64, fade_size=32):
|
||||
return mag
|
||||
|
||||
|
||||
def align_wave_head_and_tail(a, b):
    """Truncate two 2-D waveforms to the same length along the last axis.

    The common length is the smaller of ``a[0].size`` and ``b[0].size``.

    Returns:
        ``(a, b)`` with every row kept and the last axis cut to the common
        length.
    """
    length = min(a[0].size, b[0].size)

    # Only the sample axis is sliced. Slicing the first axis with the same
    # bound (as before) dropped a row whenever the common length was < 2.
    return a[:, :length], b[:, :length]
|
||||
|
||||
|
||||
def cache_or_load(mix_path, inst_path, mp):
    """Return the combined multi-band spectrograms of a mixture/instrumental pair.

    Results are cached as ``.npy`` files under ``cache/mph<sha1>``, where the
    hash is taken over the JSON dump of ``mp.param``. When both cache files
    exist they are loaded; otherwise the spectrograms are computed from the
    audio files and saved.

    Args:
        mix_path: Path of the mixture audio file.
        inst_path: Path of the instrumental audio file.
        mp: Object whose ``param`` dict supplies ``"band"`` (per-band
            ``sr``, ``hl``, ``n_fft``, ``res_type``) and the ``"mid_side"``,
            ``"mid_side_b2"`` and ``"reverse"`` switches.

    Returns:
        ``(X_spec_m, y_spec_m)``.

    Raises:
        ValueError: If the two combined spectrograms differ in shape.
    """
    mix_basename = os.path.splitext(os.path.basename(mix_path))[0]
    inst_basename = os.path.splitext(os.path.basename(inst_path))[0]

    cache_dir = os.path.join(
        "cache",
        "mph{}".format(
            hashlib.sha1(
                json.dumps(mp.param, sort_keys=True).encode("utf-8")
            ).hexdigest()
        ),
    )
    os.makedirs(cache_dir, exist_ok=True)

    if mix_basename == inst_basename:
        # Both files share one cache directory, so identical basenames (for
        # example mixtures/song.wav and instruments/song.wav) would map to
        # the same file and the instrumental would overwrite the mixture.
        mix_cache_path = os.path.join(cache_dir, mix_basename + "_mix.npy")
        inst_cache_path = os.path.join(cache_dir, inst_basename + "_inst.npy")
    else:
        mix_cache_path = os.path.join(cache_dir, mix_basename + ".npy")
        inst_cache_path = os.path.join(cache_dir, inst_basename + ".npy")

    if os.path.exists(mix_cache_path) and os.path.exists(inst_cache_path):
        X_spec_m = np.load(mix_cache_path)
        y_spec_m = np.load(inst_cache_path)
    else:
        X_wave, y_wave, X_spec_s, y_spec_s = {}, {}, {}, {}
        bands_n = len(mp.param["band"])

        # Bands are processed from the highest index down because each lower
        # band is resampled from the band above it.
        for d in range(bands_n, 0, -1):
            bp = mp.param["band"][d]

            if d == bands_n:  # high-end band
                X_wave[d], _ = librosa.load(
                    mix_path,
                    sr=bp["sr"],
                    mono=False,
                    dtype=np.float32,
                    res_type=bp["res_type"],
                )
                y_wave[d], _ = librosa.load(
                    inst_path,
                    sr=bp["sr"],
                    mono=False,
                    dtype=np.float32,
                    res_type=bp["res_type"],
                )
            else:  # lower bands
                X_wave[d] = librosa.resample(
                    X_wave[d + 1],
                    orig_sr=mp.param["band"][d + 1]["sr"],
                    target_sr=bp["sr"],
                    res_type=bp["res_type"],
                )
                y_wave[d] = librosa.resample(
                    y_wave[d + 1],
                    orig_sr=mp.param["band"][d + 1]["sr"],
                    target_sr=bp["sr"],
                    res_type=bp["res_type"],
                )

            X_wave[d], y_wave[d] = align_wave_head_and_tail(X_wave[d], y_wave[d])

            X_spec_s[d] = wave_to_spectrogram(
                X_wave[d],
                bp["hl"],
                bp["n_fft"],
                mp.param["mid_side"],
                mp.param["mid_side_b2"],
                mp.param["reverse"],
            )
            y_spec_s[d] = wave_to_spectrogram(
                y_wave[d],
                bp["hl"],
                bp["n_fft"],
                mp.param["mid_side"],
                mp.param["mid_side_b2"],
                mp.param["reverse"],
            )

        # Release the waveforms before building the combined spectrograms.
        del X_wave, y_wave

        X_spec_m = combine_spectrograms(X_spec_s, mp)
        y_spec_m = combine_spectrograms(y_spec_s, mp)

        if X_spec_m.shape != y_spec_m.shape:
            raise ValueError("The combined spectrograms are different: " + mix_path)

        np.save(mix_cache_path, X_spec_m)
        np.save(inst_cache_path, y_spec_m)

    return X_spec_m, y_spec_m
|
||||
|
||||
def run_librosa_istft(specx, hop_length):
    """Run ``librosa.istft`` on ``specx`` after passing it through ``np.asfortranarray``."""
    stft_matrix = np.asfortranarray(specx)
    return librosa.istft(stft_matrix, hop_length=hop_length)
|
||||
|
||||
def spectrogram_to_wave(spec, hop_length, mid_side, mid_side_b2, reverse):
    """Invert a two-channel spectrogram to a two-channel waveform.

    ``spec[0]`` and ``spec[1]`` are each inverted with ``librosa.istft``.
    The first flag that is set, checked in the order ``reverse``,
    ``mid_side``, ``mid_side_b2``, selects how the two results are combined:
    both flipped, ``(l + r / 2, l - r / 2)``, or
    ``(r / 1.25 + 0.4 * l, l / 1.25 - 0.4 * r)``. With no flag set they are
    returned unchanged.

    Returns:
        The two output channels stacked into one array.
    """
    first, second = (
        librosa.istft(np.asfortranarray(channel), hop_length=hop_length)
        for channel in (spec[0], spec[1])
    )

    if reverse:
        channels = [np.flip(first), np.flip(second)]
    elif mid_side:
        channels = [np.add(first, second / 2), np.subtract(first, second / 2)]
    elif mid_side_b2:
        channels = [
            np.add(second / 1.25, 0.4 * first),
            np.subtract(first / 1.25, 0.4 * second),
        ]
    else:
        channels = [first, second]

    return np.asfortranarray(channels)
|
||||
|
||||
|
||||
def spectrogram_to_wave_mt(spec, hop_length, mid_side, reverse, mid_side_b2):
|
||||
import threading
|
||||
|
||||
spec_left = np.asfortranarray(spec[0])
|
||||
spec_right = np.asfortranarray(spec[1])
|
||||
|
||||
def run_thread(**kwargs):
|
||||
global wave_left
|
||||
wave_left = librosa.istft(**kwargs)
|
||||
|
||||
thread = threading.Thread(
|
||||
target=run_thread, kwargs={"stft_matrix": spec_left, "hop_length": hop_length}
|
||||
)
|
||||
thread.start()
|
||||
wave_right = librosa.istft(spec_right, hop_length=hop_length)
|
||||
thread.join()
|
||||
with ThreadPoolExecutor(max_workers=2) as tp:
|
||||
wave_left, wave_right = tp.map(run_librosa_istft, spec, [hop_length, hop_length])
|
||||
|
||||
if reverse:
|
||||
return np.asfortranarray([np.flip(wave_left), np.flip(wave_right)])
|
||||
@@ -349,7 +167,6 @@ def spectrogram_to_wave_mt(spec, hop_length, mid_side, reverse, mid_side_b2):
|
||||
|
||||
|
||||
def cmb_spectrogram_to_wave(spec_m, mp, extra_bins_h=None, extra_bins=None):
|
||||
wave_band = {}
|
||||
bands_n = len(mp.param["band"])
|
||||
offset = 0
|
||||
|
||||
@@ -428,6 +245,7 @@ def cmb_spectrogram_to_wave(spec_m, mp, extra_bins_h=None, extra_bins=None):
|
||||
return wave.T
|
||||
|
||||
|
||||
@jit(nopython=True)
|
||||
def fft_lp_filter(spec, bin_start, bin_stop):
|
||||
g = 1.0
|
||||
for b in range(bin_start, bin_stop):
|
||||
@@ -439,6 +257,7 @@ def fft_lp_filter(spec, bin_start, bin_stop):
|
||||
return spec
|
||||
|
||||
|
||||
@jit(nopython=True)
|
||||
def fft_hp_filter(spec, bin_start, bin_stop):
|
||||
g = 1.0
|
||||
for b in range(bin_start, bin_stop, -1):
|
||||
@@ -450,15 +269,15 @@ def fft_hp_filter(spec, bin_start, bin_stop):
|
||||
return spec
|
||||
|
||||
|
||||
def mirroring(a, spec_m, input_high_end, mp):
|
||||
def mirroring(a, spec_m, input_high_end, pre_filter_start):
|
||||
if "mirroring" == a:
|
||||
mirror = np.flip(
|
||||
np.abs(
|
||||
spec_m[
|
||||
:,
|
||||
mp.param["pre_filter_start"]
|
||||
pre_filter_start
|
||||
- 10
|
||||
- input_high_end.shape[1] : mp.param["pre_filter_start"]
|
||||
- input_high_end.shape[1] : pre_filter_start
|
||||
- 10,
|
||||
:,
|
||||
]
|
||||
@@ -476,9 +295,9 @@ def mirroring(a, spec_m, input_high_end, mp):
|
||||
np.abs(
|
||||
spec_m[
|
||||
:,
|
||||
mp.param["pre_filter_start"]
|
||||
pre_filter_start
|
||||
- 10
|
||||
- input_high_end.shape[1] : mp.param["pre_filter_start"]
|
||||
- input_high_end.shape[1] : pre_filter_start
|
||||
- 10,
|
||||
:,
|
||||
]
|
||||
@@ -488,39 +307,3 @@ def mirroring(a, spec_m, input_high_end, mp):
|
||||
mi = np.multiply(mirror, input_high_end * 1.7)
|
||||
|
||||
return np.where(np.abs(input_high_end) <= np.abs(mi), input_high_end, mi)
|
||||
|
||||
|
||||
def ensembling(a, specs):
    """Combine several spectrograms element-wise by magnitude.

    Args:
        a: ``"min_mag"`` keeps, per element, the value with the smaller
            magnitude; ``"max_mag"`` keeps the larger. Any other value only
            truncates the first spectrogram to the shortest length seen.
        specs: Sequence of 3-D arrays. Their last axes may differ in
            length; each step truncates to the shorter one.

    Returns:
        The combined spectrogram. A single input is returned as-is.

    Raises:
        ValueError: If ``specs`` is empty.
    """
    if len(specs) == 0:
        raise ValueError("ensembling requires at least one spectrogram")

    # Start from the first spectrogram so a one-element input has a result;
    # previously ``spec`` was only bound inside the loop.
    spec = specs[0]

    for other in specs[1:]:
        ln = min(spec.shape[2], other.shape[2])
        spec = spec[:, :, :ln]
        # Truncate a local view only; the caller's list is left untouched.
        other = other[:, :, :ln]

        if a == "min_mag":
            spec = np.where(np.abs(other) <= np.abs(spec), other, spec)
        elif a == "max_mag":
            spec = np.where(np.abs(other) >= np.abs(spec), other, spec)

    return spec
|
||||
|
||||
|
||||
def stft(wave, nfft, hl):
    """Compute the STFT of ``wave[0]`` and ``wave[1]`` and stack the results.

    Args:
        wave: Indexable holding two waveforms.
        nfft: FFT size passed to ``librosa.stft`` as ``n_fft``.
        hl: Hop length passed to ``librosa.stft`` as ``hop_length``.

    Returns:
        The two spectrograms stacked into one array.
    """
    channel_specs = [
        librosa.stft(np.asfortranarray(channel), n_fft=nfft, hop_length=hl)
        for channel in (wave[0], wave[1])
    ]

    return np.asfortranarray(channel_specs)
|
||||
|
||||
|
||||
def istft(spec, hl):
|
||||
spec_left = np.asfortranarray(spec[0])
|
||||
spec_right = np.asfortranarray(spec[1])
|
||||
|
||||
wave_left = librosa.istft(spec_left, hop_length=hl)
|
||||
wave_right = librosa.istft(spec_right, hop_length=hl)
|
||||
wave = np.asfortranarray([wave_left, wave_right])
|
||||
|
||||
@@ -3,12 +3,11 @@ import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
import librosa
|
||||
import numpy as np
|
||||
import torch
|
||||
from tqdm import tqdm
|
||||
|
||||
from infer.lib.audio import downsample_audio, save_audio
|
||||
from infer.lib.audio import load_audio, save_audio
|
||||
|
||||
cpu = torch.device("cpu")
|
||||
|
||||
@@ -201,29 +200,18 @@ class Predictor:
|
||||
os.makedirs(vocal_root, exist_ok=True)
|
||||
os.makedirs(others_root, exist_ok=True)
|
||||
basename = os.path.basename(m)
|
||||
mix, rate = librosa.load(m, mono=False, sr=44100)
|
||||
mix, rate = load_audio(m, mono=False, sr=44100)
|
||||
if mix.ndim == 1:
|
||||
mix = np.asfortranarray([mix, mix])
|
||||
mix = mix.T
|
||||
sources = self.demix(mix.T)
|
||||
opt = sources[0].T
|
||||
if format in ["wav", "flac"]:
|
||||
save_audio(
|
||||
"%s/vocal_%s.%s" % (vocal_root, basename, format), mix - opt, rate
|
||||
)
|
||||
save_audio(
|
||||
"%s/instrument_%s.%s" % (others_root, basename, format), opt, rate
|
||||
)
|
||||
else:
|
||||
path_vocal = "%s/vocal_%s.wav" % (vocal_root, basename)
|
||||
path_other = "%s/instrument_%s.wav" % (others_root, basename)
|
||||
save_audio(path_vocal, opt, rate)
|
||||
save_audio(path_other, opt, rate)
|
||||
opt_path_vocal = path_vocal[:-4] + ".%s" % format
|
||||
opt_path_other = path_other[:-4] + ".%s" % format
|
||||
downsample_audio(path_vocal, opt_path_vocal, format)
|
||||
downsample_audio(path_other, opt_path_other, format)
|
||||
|
||||
save_audio(
|
||||
"%s/vocal_%s.%s" % (vocal_root, basename, format), mix - opt, rate, True, format=format,
|
||||
)
|
||||
save_audio(
|
||||
"%s/instrument_%s.%s" % (others_root, basename, format), opt, rate, True, format=format,
|
||||
)
|
||||
|
||||
class MDXNetDereverb:
|
||||
def __init__(self, chunks, device):
|
||||
|
||||
@@ -55,13 +55,17 @@ def uvr(model_name, inp_root, save_root_vocal, paths, save_root_ins, agg, format
|
||||
done = 1
|
||||
except Exception as e:
|
||||
need_reformat = 1
|
||||
print(f"Exception {e} occured. Will reformat")
|
||||
logger.warning(f"Exception {e} occured. Will reformat")
|
||||
if need_reformat == 1:
|
||||
tmp_path = "%s/%s.reformatted.wav" % (
|
||||
os.path.join(os.environ["TEMP"]),
|
||||
os.path.basename(inp_path),
|
||||
)
|
||||
resample_audio(inp_path, tmp_path, "pcm_s16le", "s16", 44100, "stereo")
|
||||
try: # Remove the original file
|
||||
os.remove(inp_path)
|
||||
except Exception as e:
|
||||
print(f"Failed to remove the original file: {e}")
|
||||
inp_path = tmp_path
|
||||
try:
|
||||
if done == 0:
|
||||
|
||||
@@ -5,7 +5,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
import librosa
|
||||
import numpy as np
|
||||
from infer.lib.audio import downsample_audio, save_audio
|
||||
from infer.lib.audio import save_audio
|
||||
import torch
|
||||
|
||||
from infer.lib.uvr5_pack.lib_v5 import nets_123821KB as Nets
|
||||
@@ -119,7 +119,7 @@ class AudioPre:
|
||||
if ins_root is not None:
|
||||
if self.data["high_end_process"].startswith("mirroring"):
|
||||
input_high_end_ = spec_utils.mirroring(
|
||||
self.data["high_end_process"], y_spec_m, input_high_end, self.mp
|
||||
self.data["high_end_process"], y_spec_m, input_high_end, self.mp.param["pre_filter_start"]
|
||||
)
|
||||
wav_instrument = spec_utils.cmb_spectrogram_to_wave(
|
||||
y_spec_m, self.mp, input_high_end_h, input_high_end_
|
||||
@@ -131,23 +131,16 @@ class AudioPre:
|
||||
head = "vocal_"
|
||||
else:
|
||||
head = "instrument_"
|
||||
if format in ["wav", "flac"]:
|
||||
save_audio(
|
||||
os.path.join(
|
||||
ins_root,
|
||||
head + "{}_{}.{}".format(name, self.data["agg"], format),
|
||||
),
|
||||
wav_instrument,
|
||||
self.mp.param["sr"],
|
||||
)
|
||||
else:
|
||||
path = os.path.join(
|
||||
ins_root, head + "{}_{}.wav".format(name, self.data["agg"])
|
||||
)
|
||||
save_audio(path, wav_instrument, self.mp.param["sr"])
|
||||
if os.path.exists(path):
|
||||
opt_format_path = path[:-4] + ".%s" % format
|
||||
downsample_audio(path, opt_format_path, format)
|
||||
save_audio(
|
||||
os.path.join(
|
||||
ins_root,
|
||||
head + "{}_{}.{}".format(name, self.data["agg"], format),
|
||||
),
|
||||
wav_instrument,
|
||||
self.mp.param["sr"],
|
||||
f32=True,
|
||||
format=format
|
||||
)
|
||||
if vocal_root is not None:
|
||||
if self.is_reverse:
|
||||
head = "instrument_"
|
||||
@@ -155,7 +148,7 @@ class AudioPre:
|
||||
head = "vocal_"
|
||||
if self.data["high_end_process"].startswith("mirroring"):
|
||||
input_high_end_ = spec_utils.mirroring(
|
||||
self.data["high_end_process"], v_spec_m, input_high_end, self.mp
|
||||
self.data["high_end_process"], v_spec_m, input_high_end, self.mp.param["pre_filter_start"]
|
||||
)
|
||||
wav_vocals = spec_utils.cmb_spectrogram_to_wave(
|
||||
v_spec_m, self.mp, input_high_end_h, input_high_end_
|
||||
@@ -163,20 +156,13 @@ class AudioPre:
|
||||
else:
|
||||
wav_vocals = spec_utils.cmb_spectrogram_to_wave(v_spec_m, self.mp)
|
||||
logger.info("%s vocals done" % name)
|
||||
if format in ["wav", "flac"]:
|
||||
save_audio(
|
||||
os.path.join(
|
||||
vocal_root,
|
||||
head + "{}_{}.{}".format(name, self.data["agg"], format),
|
||||
),
|
||||
wav_vocals,
|
||||
self.mp.param["sr"],
|
||||
)
|
||||
else:
|
||||
path = os.path.join(
|
||||
vocal_root, head + "{}_{}.wav".format(name, self.data["agg"])
|
||||
)
|
||||
save_audio(path, wav_vocals, self.mp.param["sr"])
|
||||
if os.path.exists(path):
|
||||
opt_format_path = path[:-4] + ".%s" % format
|
||||
downsample_audio(path, opt_format_path, format)
|
||||
save_audio(
|
||||
os.path.join(
|
||||
vocal_root,
|
||||
head + "{}_{}.{}".format(name, self.data["agg"], format),
|
||||
),
|
||||
wav_vocals,
|
||||
self.mp.param["sr"],
|
||||
f32=True,
|
||||
format=format
|
||||
)
|
||||
|
||||
@@ -251,25 +251,13 @@ class VC:
|
||||
if "Success" in info:
|
||||
try:
|
||||
tgt_sr, audio_opt = opt
|
||||
if format1 in ["wav", "flac"]:
|
||||
save_audio(
|
||||
"%s/%s.%s"
|
||||
% (opt_root, os.path.basename(path), format1),
|
||||
audio_opt,
|
||||
tgt_sr,
|
||||
)
|
||||
else:
|
||||
path = "%s/%s.%s" % (
|
||||
opt_root,
|
||||
os.path.basename(path),
|
||||
format1,
|
||||
)
|
||||
with open(path, "wb") as outf:
|
||||
wav2(
|
||||
float_np_array_to_wav_buf(audio_opt, tgt_sr),
|
||||
outf,
|
||||
format1,
|
||||
)
|
||||
save_audio(
|
||||
"%s/%s.%s"
|
||||
% (opt_root, os.path.basename(path), format1),
|
||||
audio_opt,
|
||||
tgt_sr,
|
||||
f32=True,
|
||||
)
|
||||
except:
|
||||
info += traceback.format_exc()
|
||||
infos.append("%s->%s" % (os.path.basename(path), info))
|
||||
|
||||
2
web.py
2
web.py
@@ -671,6 +671,7 @@ def train1key(
|
||||
if_save_latest13,
|
||||
pretrained_G14,
|
||||
pretrained_D15,
|
||||
gpus16,
|
||||
if_cache_gpu17,
|
||||
if_save_every_weights18,
|
||||
version19,
|
||||
@@ -1360,6 +1361,7 @@ with gr.Blocks(title="RVC WebUI") as app:
|
||||
if_save_latest13,
|
||||
pretrained_G14,
|
||||
pretrained_D15,
|
||||
gpus16,
|
||||
if_cache_gpu17,
|
||||
if_save_every_weights18,
|
||||
version19,
|
||||
|
||||
Reference in New Issue
Block a user