diff --git a/rvc/onnx/f0predictor/__init__.py b/rvc/onnx/f0predictor/__init__.py
index 949f858..71d4522 100644
--- a/rvc/onnx/f0predictor/__init__.py
+++ b/rvc/onnx/f0predictor/__init__.py
@@ -1,3 +1,4 @@
 from .dio import DioF0Predictor
 from .harvest import HarvestF0Predictor
 from .pm import PMF0Predictor
+from .f0 import F0Predictor
diff --git a/rvc/onnx/f0predictor/dio.py b/rvc/onnx/f0predictor/dio.py
index 449ab4e..1cd2af1 100644
--- a/rvc/onnx/f0predictor/dio.py
+++ b/rvc/onnx/f0predictor/dio.py
@@ -1,66 +1,15 @@
 import numpy as np
 import pyworld
+import typing
 
 from .f0 import F0Predictor
 
 
 class DioF0Predictor(F0Predictor):
     def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100):
-        self.hop_length = hop_length
-        self.f0_min = f0_min
-        self.f0_max = f0_max
-        self.sampling_rate = sampling_rate
+        super().__init__(hop_length, f0_min, f0_max, sampling_rate)
 
-    def interpolate_f0(self, f0):
-        """
-        对F0进行插值处理
-        """
-
-        data = np.reshape(f0, (f0.size, 1))
-
-        vuv_vector = np.zeros((data.size, 1), dtype=np.float32)
-        vuv_vector[data > 0.0] = 1.0
-        vuv_vector[data <= 0.0] = 0.0
-
-        ip_data = data
-
-        frame_number = data.size
-        last_value = 0.0
-        for i in range(frame_number):
-            if data[i] <= 0.0:
-                j = i + 1
-                for j in range(i + 1, frame_number):
-                    if data[j] > 0.0:
-                        break
-                if j < frame_number - 1:
-                    if last_value > 0.0:
-                        step = (data[j] - data[i - 1]) / float(j - i)
-                        for k in range(i, j):
-                            ip_data[k] = data[i - 1] + step * (k - i + 1)
-                    else:
-                        for k in range(i, j):
-                            ip_data[k] = data[j]
-                else:
-                    for k in range(i, frame_number):
-                        ip_data[k] = last_value
-            else:
-                ip_data[i] = data[i]  # 这里可能存在一个没有必要的拷贝
-                last_value = data[i]
-
-        return ip_data[:, 0], vuv_vector[:, 0]
-
-    def resize_f0(self, x, target_len):
-        source = np.array(x)
-        source[source < 0.001] = np.nan
-        target = np.interp(
-            np.arange(0, len(source) * target_len, len(source)) / target_len,
-            np.arange(0, len(source)),
-            source,
-        )
-        res = np.nan_to_num(target)
-        return res
-
-    def compute_f0(self, wav, p_len=None):
+    def compute_f0(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None):
         if p_len is None:
             p_len = wav.shape[0] // self.hop_length
         f0, t = pyworld.dio(
@@ -73,9 +22,9 @@ class DioF0Predictor(F0Predictor):
         f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.sampling_rate)
         for index, pitch in enumerate(f0):
             f0[index] = round(pitch, 1)
-        return self.interpolate_f0(self.resize_f0(f0, p_len))[0]
+        return self._interpolate_f0(self._resize_f0(f0, p_len))[0]
 
-    def compute_f0_uv(self, wav, p_len=None):
+    def compute_f0_uv(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None):
         if p_len is None:
             p_len = wav.shape[0] // self.hop_length
         f0, t = pyworld.dio(
@@ -88,4 +37,4 @@ class DioF0Predictor(F0Predictor):
         f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.sampling_rate)
         for index, pitch in enumerate(f0):
             f0[index] = round(pitch, 1)
-        return self.interpolate_f0(self.resize_f0(f0, p_len))
+        return self._interpolate_f0(self._resize_f0(f0, p_len))
diff --git a/rvc/onnx/f0predictor/f0.py b/rvc/onnx/f0predictor/f0.py
index 0d81b05..9c4a0ee 100644
--- a/rvc/onnx/f0predictor/f0.py
+++ b/rvc/onnx/f0predictor/f0.py
@@ -1,16 +1,65 @@
-class F0Predictor(object):
-    def compute_f0(self, wav, p_len):
-        """
-        input: wav:[signal_length]
-               p_len:int
-        output: f0:[signal_length//hop_length]
-        """
-        pass
+import numpy as np
+import typing
 
-    def compute_f0_uv(self, wav, p_len):
+class F0Predictor(object):
+    def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100):
+        self.hop_length = hop_length
+        self.f0_min = f0_min
+        self.f0_max = f0_max
+        self.sampling_rate = sampling_rate
+
+    def compute_f0(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None): ...
+
+    def compute_f0_uv(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None): ...
+
+    # Single leading underscore on purpose: subclasses call these helpers, and
+    # double-underscore names are mangled per class, which breaks that lookup.
+    def _interpolate_f0(self, f0: np.ndarray[typing.Any, np.dtype]):
         """
-        input: wav:[signal_length]
-               p_len:int
-        output: f0:[signal_length//hop_length],uv:[signal_length//hop_length]
+        Fill non-positive F0 frames by interpolation; return (f0, mask of input > 0).
         """
-        pass
+
+        data = np.reshape(f0, (f0.size, 1))
+
+        vuv_vector = np.zeros((data.size, 1), dtype=np.float32)
+        vuv_vector[data > 0.0] = 1.0
+        vuv_vector[data <= 0.0] = 0.0
+
+        ip_data = data
+
+        frame_number = data.size
+        last_value = 0.0
+        for i in range(frame_number):
+            if data[i] <= 0.0:
+                j = i + 1
+                for j in range(i + 1, frame_number):
+                    if data[j] > 0.0:
+                        break
+                if j < frame_number - 1:
+                    if last_value > 0.0:
+                        step = (data[j] - data[i - 1]) / float(j - i)
+                        for k in range(i, j):
+                            ip_data[k] = data[i - 1] + step * (k - i + 1)
+                    else:
+                        for k in range(i, j):
+                            ip_data[k] = data[j]
+                else:
+                    for k in range(i, frame_number):
+                        ip_data[k] = last_value
+            else:
+                ip_data[i] = data[i]  # there may be an unnecessary copy here
+                last_value = data[i]
+
+        return ip_data[:, 0], vuv_vector[:, 0]
+
+    def _resize_f0(self, x: np.ndarray[typing.Any, np.dtype], target_len: int):
+        """Resample x to target_len frames with np.interp; values < 0.001 are masked as NaN and NaN results become 0."""
+        source = np.array(x)
+        source[source < 0.001] = np.nan
+        target = np.interp(
+            np.arange(0, len(source) * target_len, len(source)) / target_len,
+            np.arange(0, len(source)),
+            source,
+        )
+        res = np.nan_to_num(target)
+        return res
diff --git a/rvc/onnx/f0predictor/harvest.py b/rvc/onnx/f0predictor/harvest.py
index b80c606..77759bd 100644
--- a/rvc/onnx/f0predictor/harvest.py
+++ b/rvc/onnx/f0predictor/harvest.py
@@ -1,66 +1,15 @@
 import numpy as np
 import pyworld
+import typing
 
 from .f0 import F0Predictor
 
 
 class HarvestF0Predictor(F0Predictor):
     def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100):
-        self.hop_length = hop_length
-        self.f0_min = f0_min
-        self.f0_max = f0_max
-        self.sampling_rate = sampling_rate
+        super().__init__(hop_length, f0_min, f0_max, sampling_rate)
 
-    def interpolate_f0(self, f0):
-        """
-        对F0进行插值处理
-        """
-
-        data = np.reshape(f0, (f0.size, 1))
-
-        vuv_vector = np.zeros((data.size, 1), dtype=np.float32)
-        vuv_vector[data > 0.0] = 1.0
-        vuv_vector[data <= 0.0] = 0.0
-
-        ip_data = data
-
-        frame_number = data.size
-        last_value = 0.0
-        for i in range(frame_number):
-            if data[i] <= 0.0:
-                j = i + 1
-                for j in range(i + 1, frame_number):
-                    if data[j] > 0.0:
-                        break
-                if j < frame_number - 1:
-                    if last_value > 0.0:
-                        step = (data[j] - data[i - 1]) / float(j - i)
-                        for k in range(i, j):
-                            ip_data[k] = data[i - 1] + step * (k - i + 1)
-                    else:
-                        for k in range(i, j):
-                            ip_data[k] = data[j]
-                else:
-                    for k in range(i, frame_number):
-                        ip_data[k] = last_value
-            else:
-                ip_data[i] = data[i]  # 这里可能存在一个没有必要的拷贝
-                last_value = data[i]
-
-        return ip_data[:, 0], vuv_vector[:, 0]
-
-    def resize_f0(self, x, target_len):
-        source = np.array(x)
-        source[source < 0.001] = np.nan
-        target = np.interp(
-            np.arange(0, len(source) * target_len, len(source)) / target_len,
-            np.arange(0, len(source)),
-            source,
-        )
-        res = np.nan_to_num(target)
-        return res
-
-    def compute_f0(self, wav, p_len=None):
+    def compute_f0(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None):
         if p_len is None:
             p_len = wav.shape[0] // self.hop_length
         f0, t = pyworld.harvest(
@@ -71,9 +20,9 @@ class HarvestF0Predictor(F0Predictor):
             frame_period=1000 * self.hop_length / self.sampling_rate,
         )
-        f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.fs)
-        return self.interpolate_f0(self.resize_f0(f0, p_len))[0]
+        f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.sampling_rate)
+        return self._interpolate_f0(self._resize_f0(f0, p_len))[0]
 
-    def compute_f0_uv(self, wav, p_len=None):
+    def compute_f0_uv(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None):
         if p_len is None:
             p_len = wav.shape[0] // self.hop_length
         f0, t = pyworld.harvest(
@@ -84,4 +33,4 @@ class HarvestF0Predictor(F0Predictor):
             frame_period=1000 * self.hop_length / self.sampling_rate,
         )
         f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.sampling_rate)
-        return self.interpolate_f0(self.resize_f0(f0, p_len))
+        return self._interpolate_f0(self._resize_f0(f0, p_len))
diff --git a/rvc/onnx/f0predictor/pm.py b/rvc/onnx/f0predictor/pm.py
index 915e606..e37216e 100644
--- a/rvc/onnx/f0predictor/pm.py
+++ b/rvc/onnx/f0predictor/pm.py
@@ -1,55 +1,15 @@
 import numpy as np
 import parselmouth
+import typing
 
 from .f0 import F0Predictor
 
 
 class PMF0Predictor(F0Predictor):
     def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100):
-        self.hop_length = hop_length
-        self.f0_min = f0_min
-        self.f0_max = f0_max
-        self.sampling_rate = sampling_rate
+        super().__init__(hop_length, f0_min, f0_max, sampling_rate)
 
-    def interpolate_f0(self, f0):
-        """
-        对F0进行插值处理
-        """
-
-        data = np.reshape(f0, (f0.size, 1))
-
-        vuv_vector = np.zeros((data.size, 1), dtype=np.float32)
-        vuv_vector[data > 0.0] = 1.0
-        vuv_vector[data <= 0.0] = 0.0
-
-        ip_data = data
-
-        frame_number = data.size
-        last_value = 0.0
-        for i in range(frame_number):
-            if data[i] <= 0.0:
-                j = i + 1
-                for j in range(i + 1, frame_number):
-                    if data[j] > 0.0:
-                        break
-                if j < frame_number - 1:
-                    if last_value > 0.0:
-                        step = (data[j] - data[i - 1]) / float(j - i)
-                        for k in range(i, j):
-                            ip_data[k] = data[i - 1] + step * (k - i + 1)
-                    else:
-                        for k in range(i, j):
-                            ip_data[k] = data[j]
-                else:
-                    for k in range(i, frame_number):
-                        ip_data[k] = last_value
-            else:
-                ip_data[i] = data[i]  # 这里可能存在一个没有必要的拷贝
-                last_value = data[i]
-
-        return ip_data[:, 0], vuv_vector[:, 0]
-
-    def compute_f0(self, wav, p_len=None):
+    def compute_f0(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None):
         x = wav
         if p_len is None:
             p_len = x.shape[0] // self.hop_length
@@ -70,10 +30,10 @@ class PMF0Predictor(F0Predictor):
         pad_size = (p_len - len(f0) + 1) // 2
         if pad_size > 0 or p_len - len(f0) - pad_size > 0:
             f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant")
-        f0, uv = self.interpolate_f0(f0)
+        f0, uv = self._interpolate_f0(f0)
         return f0
 
-    def compute_f0_uv(self, wav, p_len=None):
+    def compute_f0_uv(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None):
         x = wav
         if p_len is None:
             p_len = x.shape[0] // self.hop_length
@@ -94,5 +54,5 @@ class PMF0Predictor(F0Predictor):
         pad_size = (p_len - len(f0) + 1) // 2
         if pad_size > 0 or p_len - len(f0) - pad_size > 0:
             f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant")
-        f0, uv = self.interpolate_f0(f0)
+        f0, uv = self._interpolate_f0(f0)
         return f0, uv
diff --git a/rvc/onnx/infer.py b/rvc/onnx/infer.py
index 68f6f12..258f85d 100644
--- a/rvc/onnx/infer.py
+++ b/rvc/onnx/infer.py
@@ -1,15 +1,15 @@
 import librosa
 import numpy as np
 import onnxruntime
+import typing
+import os
 
-from onnx.f0predictor import PMF0Predictor
-from onnx.f0predictor import HarvestF0Predictor
-from onnx.f0predictor import DioF0Predictor
+from onnx.f0predictor import PMF0Predictor, HarvestF0Predictor, DioF0Predictor, F0Predictor
 
 
-class ContentVec:
-    def __init__(self, vec_path: str, device=None):
-        if device == "cpu" or device is None:
+class Model:
+    def __init__(self, path: str | bytes | os.PathLike, device: typing.Literal["cpu", "cuda", "dml"]="cpu"):
+        if device == "cpu":
             providers = ["CPUExecutionProvider"]
         elif device == "cuda":
             providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
@@ -17,12 +17,16 @@ class ContentVec:
             providers = ["DmlExecutionProvider"]
         else:
             raise RuntimeError("Unsportted Device")
-        self.model = onnxruntime.InferenceSession(vec_path, providers=providers)
+        self.model = onnxruntime.InferenceSession(path, providers=providers)
 
-    def __call__(self, wav):
+class ContentVec(Model):
+    def __init__(self, vec_path: str | bytes | os.PathLike, device: typing.Literal["cpu", "cuda", "dml"]="cpu"):
+        super().__init__(vec_path, device)
+
+    def __call__(self, wav: np.ndarray[typing.Any, np.dtype]):
         return self.forward(wav)
 
-    def forward(self, wav):
+    def forward(self, wav: np.ndarray[typing.Any, np.dtype]):
         if wav.ndim == 2:  # double channels
             wav = wav.mean(-1)
         assert wav.ndim == 1, wav.ndim
@@ -32,58 +36,39 @@ class ContentVec:
         return logits.transpose(0, 2, 1)
 
 
-predicters = {
+predictors: typing.Dict[str, typing.Type[F0Predictor]] = {
     "pm": PMF0Predictor,
     "harvest": HarvestF0Predictor,
     "dio": DioF0Predictor,
 }
 
 
-def get_f0_predictor(f0_method, hop_length, sampling_rate):
-    return predicters[f0_method](hop_length=hop_length, sampling_rate=sampling_rate)
+def get_f0_predictor(f0_method: str, hop_length: int, sampling_rate: int) -> F0Predictor:
+    return predictors[f0_method](hop_length=hop_length, sampling_rate=sampling_rate)
 
 
-class RVC:
+class RVC(Model):
     def __init__(
         self,
-        model_path,
+        model_path: str | bytes | os.PathLike,
         sr=40000,
         hop_size=512,
-        vec_path="vec-768-layer-12.onnx",
-        device="cpu",
+        vec_path: str | bytes | os.PathLike = "vec-768-layer-12.onnx",
+        device: typing.Literal["cpu", "cuda", "dml"] = "cpu",
     ):
+        super().__init__(model_path, device)
         self.vec_model = ContentVec(vec_path, device)
-        if device == "cpu" or device is None:
-            providers = ["CPUExecutionProvider"]
-        elif device == "cuda":
-            providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
-        elif device == "dml":
-            providers = ["DmlExecutionProvider"]
-        else:
-            raise RuntimeError("Unsportted Device")
-        self.model = onnxruntime.InferenceSession(model_path, providers=providers)
         self.sampling_rate = sr
         self.hop_size = hop_size
 
-    def forward(self, hubert, hubert_length, pitch, pitchf, ds, rnd):
-        onnx_input = {
-            self.model.get_inputs()[0].name: hubert,
-            self.model.get_inputs()[1].name: hubert_length,
-            self.model.get_inputs()[2].name: pitch,
-            self.model.get_inputs()[3].name: pitchf,
-            self.model.get_inputs()[4].name: ds,
-            self.model.get_inputs()[5].name: rnd,
-        }
-        return (self.model.run(None, onnx_input)[0] * 32767).astype(np.int16)
-
     def inference(
         self,
-        wav,
-        sr,
-        sid,
+        wav: np.ndarray[typing.Any, np.dtype],
+        sr: int,
+        sid: int,
         f0_method="dio",
         f0_up_key=0,
-    ):
+    ) -> np.ndarray[typing.Any, np.dtype[np.int16]]:
         f0_min = 50
         f0_max = 1100
         f0_mel_min = 1127 * np.log(1 + f0_min / 700)
@@ -122,6 +107,25 @@ class RVC:
         rnd = np.random.randn(1, 192, hubert_length).astype(np.float32)
         hubert_length = np.array([hubert_length]).astype(np.int64)
 
-        out_wav = self.forward(hubert, hubert_length, pitch, pitchf, ds, rnd).squeeze()
+        out_wav = self._forward(hubert, hubert_length, pitch, pitchf, ds, rnd).squeeze()
         out_wav = np.pad(out_wav, (0, 2 * self.hop_size), "constant")
         return out_wav[0:org_length]
+
+    def _forward(
+        self,
+        hubert: np.ndarray[typing.Any, np.dtype[np.float32]],
+        hubert_length: np.ndarray[typing.Any, np.dtype[np.int64]],
+        pitch: np.ndarray[typing.Any, np.dtype[np.int64]],
+        pitchf: np.ndarray[typing.Any, np.dtype[np.float32]],
+        ds: np.ndarray[typing.Any, np.dtype[np.int64]],
+        rnd: np.ndarray[typing.Any, np.dtype[np.float32]],
+    ) -> np.ndarray[typing.Any, np.dtype[np.int16]]:
+        onnx_input = {
+            self.model.get_inputs()[0].name: hubert,
+            self.model.get_inputs()[1].name: hubert_length,
+            self.model.get_inputs()[2].name: pitch,
+            self.model.get_inputs()[3].name: pitchf,
+            self.model.get_inputs()[4].name: ds,
+            self.model.get_inputs()[5].name: rnd,
+        }
+        return (self.model.run(None, onnx_input)[0] * 32767).astype(np.int16)