1
0
mirror of https://github.com/fumiama/Retrieval-based-Voice-Conversion-WebUI.git synced 2026-06-05 01:10:22 +08:00

optimize(rvc.onnx): add types defs

This commit is contained in:
源文雨
2024-06-06 01:01:59 +09:00
parent f60bebe89c
commit 6e8feb9028
6 changed files with 123 additions and 214 deletions

View File

@@ -1,3 +1,4 @@
from .dio import DioF0Predictor
from .harvest import HarvestF0Predictor
from .pm import PMF0Predictor
from .f0 import F0Predictor

View File

@@ -1,66 +1,15 @@
import numpy as np
import pyworld
import typing
from .f0 import F0Predictor
class DioF0Predictor(F0Predictor):
def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100):
self.hop_length = hop_length
self.f0_min = f0_min
self.f0_max = f0_max
self.sampling_rate = sampling_rate
super().__init__(hop_length, f0_min, f0_max, sampling_rate)
def interpolate_f0(self, f0):
"""
对F0进行插值处理
"""
data = np.reshape(f0, (f0.size, 1))
vuv_vector = np.zeros((data.size, 1), dtype=np.float32)
vuv_vector[data > 0.0] = 1.0
vuv_vector[data <= 0.0] = 0.0
ip_data = data
frame_number = data.size
last_value = 0.0
for i in range(frame_number):
if data[i] <= 0.0:
j = i + 1
for j in range(i + 1, frame_number):
if data[j] > 0.0:
break
if j < frame_number - 1:
if last_value > 0.0:
step = (data[j] - data[i - 1]) / float(j - i)
for k in range(i, j):
ip_data[k] = data[i - 1] + step * (k - i + 1)
else:
for k in range(i, j):
ip_data[k] = data[j]
else:
for k in range(i, frame_number):
ip_data[k] = last_value
else:
ip_data[i] = data[i] # 这里可能存在一个没有必要的拷贝
last_value = data[i]
return ip_data[:, 0], vuv_vector[:, 0]
def resize_f0(self, x, target_len):
source = np.array(x)
source[source < 0.001] = np.nan
target = np.interp(
np.arange(0, len(source) * target_len, len(source)) / target_len,
np.arange(0, len(source)),
source,
)
res = np.nan_to_num(target)
return res
def compute_f0(self, wav, p_len=None):
def compute_f0(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None):
if p_len is None:
p_len = wav.shape[0] // self.hop_length
f0, t = pyworld.dio(
@@ -73,9 +22,9 @@ class DioF0Predictor(F0Predictor):
f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.sampling_rate)
for index, pitch in enumerate(f0):
f0[index] = round(pitch, 1)
return self.interpolate_f0(self.resize_f0(f0, p_len))[0]
return self.__interpolate_f0(self.__resize_f0(f0, p_len))[0]
def compute_f0_uv(self, wav, p_len=None):
def compute_f0_uv(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None):
if p_len is None:
p_len = wav.shape[0] // self.hop_length
f0, t = pyworld.dio(
@@ -88,4 +37,4 @@ class DioF0Predictor(F0Predictor):
f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.sampling_rate)
for index, pitch in enumerate(f0):
f0[index] = round(pitch, 1)
return self.interpolate_f0(self.resize_f0(f0, p_len))
return self.__interpolate_f0(self.__resize_f0(f0, p_len))

View File

@@ -1,16 +1,62 @@
class F0Predictor(object):
def compute_f0(self, wav, p_len):
"""
input: wav:[signal_length]
p_len:int
output: f0:[signal_length//hop_length]
"""
pass
import numpy as np
import typing
def compute_f0_uv(self, wav, p_len):
class F0Predictor(object):
def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100):
self.hop_length = hop_length
self.f0_min = f0_min
self.f0_max = f0_max
self.sampling_rate = sampling_rate
def compute_f0(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None): ...
def compute_f0_uv(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None): ...
def __interpolate_f0(self, f0: np.ndarray[typing.Any, np.dtype]):
"""
input: wav:[signal_length]
p_len:int
output: f0:[signal_length//hop_length],uv:[signal_length//hop_length]
对F0进行插值处理
"""
pass
data = np.reshape(f0, (f0.size, 1))
vuv_vector = np.zeros((data.size, 1), dtype=np.float32)
vuv_vector[data > 0.0] = 1.0
vuv_vector[data <= 0.0] = 0.0
ip_data = data
frame_number = data.size
last_value = 0.0
for i in range(frame_number):
if data[i] <= 0.0:
j = i + 1
for j in range(i + 1, frame_number):
if data[j] > 0.0:
break
if j < frame_number - 1:
if last_value > 0.0:
step = (data[j] - data[i - 1]) / float(j - i)
for k in range(i, j):
ip_data[k] = data[i - 1] + step * (k - i + 1)
else:
for k in range(i, j):
ip_data[k] = data[j]
else:
for k in range(i, frame_number):
ip_data[k] = last_value
else:
ip_data[i] = data[i] # 这里可能存在一个没有必要的拷贝
last_value = data[i]
return ip_data[:, 0], vuv_vector[:, 0]
def __resize_f0(self, x: np.ndarray[typing.Any, np.dtype], target_len: int):
source = np.array(x)
source[source < 0.001] = np.nan
target = np.interp(
np.arange(0, len(source) * target_len, len(source)) / target_len,
np.arange(0, len(source)),
source,
)
res = np.nan_to_num(target)
return res

View File

@@ -1,66 +1,15 @@
import numpy as np
import pyworld
import typing
from .f0 import F0Predictor
class HarvestF0Predictor(F0Predictor):
def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100):
self.hop_length = hop_length
self.f0_min = f0_min
self.f0_max = f0_max
self.sampling_rate = sampling_rate
super().__init__(hop_length, f0_min, f0_max, sampling_rate)
def interpolate_f0(self, f0):
"""
对F0进行插值处理
"""
data = np.reshape(f0, (f0.size, 1))
vuv_vector = np.zeros((data.size, 1), dtype=np.float32)
vuv_vector[data > 0.0] = 1.0
vuv_vector[data <= 0.0] = 0.0
ip_data = data
frame_number = data.size
last_value = 0.0
for i in range(frame_number):
if data[i] <= 0.0:
j = i + 1
for j in range(i + 1, frame_number):
if data[j] > 0.0:
break
if j < frame_number - 1:
if last_value > 0.0:
step = (data[j] - data[i - 1]) / float(j - i)
for k in range(i, j):
ip_data[k] = data[i - 1] + step * (k - i + 1)
else:
for k in range(i, j):
ip_data[k] = data[j]
else:
for k in range(i, frame_number):
ip_data[k] = last_value
else:
ip_data[i] = data[i] # 这里可能存在一个没有必要的拷贝
last_value = data[i]
return ip_data[:, 0], vuv_vector[:, 0]
def resize_f0(self, x, target_len):
source = np.array(x)
source[source < 0.001] = np.nan
target = np.interp(
np.arange(0, len(source) * target_len, len(source)) / target_len,
np.arange(0, len(source)),
source,
)
res = np.nan_to_num(target)
return res
def compute_f0(self, wav, p_len=None):
def compute_f0(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None):
if p_len is None:
p_len = wav.shape[0] // self.hop_length
f0, t = pyworld.harvest(
@@ -71,9 +20,9 @@ class HarvestF0Predictor(F0Predictor):
frame_period=1000 * self.hop_length / self.sampling_rate,
)
f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.fs)
return self.interpolate_f0(self.resize_f0(f0, p_len))[0]
return self.__interpolate_f0(self.__resize_f0(f0, p_len))[0]
def compute_f0_uv(self, wav, p_len=None):
def compute_f0_uv(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None):
if p_len is None:
p_len = wav.shape[0] // self.hop_length
f0, t = pyworld.harvest(
@@ -84,4 +33,4 @@ class HarvestF0Predictor(F0Predictor):
frame_period=1000 * self.hop_length / self.sampling_rate,
)
f0 = pyworld.stonemask(wav.astype(np.double), f0, t, self.sampling_rate)
return self.interpolate_f0(self.resize_f0(f0, p_len))
return self.__interpolate_f0(self.__resize_f0(f0, p_len))

View File

@@ -1,55 +1,15 @@
import numpy as np
import parselmouth
import typing
from .f0 import F0Predictor
class PMF0Predictor(F0Predictor):
def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100):
self.hop_length = hop_length
self.f0_min = f0_min
self.f0_max = f0_max
self.sampling_rate = sampling_rate
super().__init__(hop_length, f0_min, f0_max, sampling_rate)
def interpolate_f0(self, f0):
"""
对F0进行插值处理
"""
data = np.reshape(f0, (f0.size, 1))
vuv_vector = np.zeros((data.size, 1), dtype=np.float32)
vuv_vector[data > 0.0] = 1.0
vuv_vector[data <= 0.0] = 0.0
ip_data = data
frame_number = data.size
last_value = 0.0
for i in range(frame_number):
if data[i] <= 0.0:
j = i + 1
for j in range(i + 1, frame_number):
if data[j] > 0.0:
break
if j < frame_number - 1:
if last_value > 0.0:
step = (data[j] - data[i - 1]) / float(j - i)
for k in range(i, j):
ip_data[k] = data[i - 1] + step * (k - i + 1)
else:
for k in range(i, j):
ip_data[k] = data[j]
else:
for k in range(i, frame_number):
ip_data[k] = last_value
else:
ip_data[i] = data[i] # 这里可能存在一个没有必要的拷贝
last_value = data[i]
return ip_data[:, 0], vuv_vector[:, 0]
def compute_f0(self, wav, p_len=None):
def compute_f0(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None):
x = wav
if p_len is None:
p_len = x.shape[0] // self.hop_length
@@ -70,10 +30,10 @@ class PMF0Predictor(F0Predictor):
pad_size = (p_len - len(f0) + 1) // 2
if pad_size > 0 or p_len - len(f0) - pad_size > 0:
f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant")
f0, uv = self.interpolate_f0(f0)
f0, uv = self.__interpolate_f0(f0)
return f0
def compute_f0_uv(self, wav, p_len=None):
def compute_f0_uv(self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None):
x = wav
if p_len is None:
p_len = x.shape[0] // self.hop_length
@@ -94,5 +54,5 @@ class PMF0Predictor(F0Predictor):
pad_size = (p_len - len(f0) + 1) // 2
if pad_size > 0 or p_len - len(f0) - pad_size > 0:
f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant")
f0, uv = self.interpolate_f0(f0)
f0, uv = self.__interpolate_f0(f0)
return f0, uv

View File

@@ -1,15 +1,15 @@
import librosa
import numpy as np
import onnxruntime
import typing
import os
from onnx.f0predictor import PMF0Predictor
from onnx.f0predictor import HarvestF0Predictor
from onnx.f0predictor import DioF0Predictor
from onnx.f0predictor import PMF0Predictor, HarvestF0Predictor, DioF0Predictor, F0Predictor
class ContentVec:
def __init__(self, vec_path: str, device=None):
if device == "cpu" or device is None:
class Model:
def __init__(self, path: str | bytes | os.PathLike, device: typing.Literal["cpu", "cuda", "dml"]="cpu"):
if device == "cpu":
providers = ["CPUExecutionProvider"]
elif device == "cuda":
providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
@@ -17,12 +17,16 @@ class ContentVec:
providers = ["DmlExecutionProvider"]
else:
raise RuntimeError("Unsportted Device")
self.model = onnxruntime.InferenceSession(vec_path, providers=providers)
self.model = onnxruntime.InferenceSession(path, providers=providers)
def __call__(self, wav):
class ContentVec(Model):
def __init__(self, vec_path: str | bytes | os.PathLike, device: typing.Literal["cpu", "cuda", "dml"]="cpu"):
super().__init__(vec_path, device)
def __call__(self, wav: np.ndarray[typing.Any, np.dtype]):
return self.forward(wav)
def forward(self, wav):
def forward(self, wav: np.ndarray[typing.Any, np.dtype]):
if wav.ndim == 2: # double channels
wav = wav.mean(-1)
assert wav.ndim == 1, wav.ndim
@@ -32,58 +36,39 @@ class ContentVec:
return logits.transpose(0, 2, 1)
# Registry mapping an f0 method name to the predictor class implementing it.
# The values are the classes themselves (get_f0_predictor instantiates them),
# hence Type[F0Predictor] rather than F0Predictor.
predictors: typing.Dict[str, typing.Type[F0Predictor]] = {
    "pm": PMF0Predictor,
    "harvest": HarvestF0Predictor,
    "dio": DioF0Predictor,
}

# Former (misspelled) name of the registry, kept so existing references work.
predicters = predictors
def get_f0_predictor(f0_method: str, hop_length: int, sampling_rate: int) -> F0Predictor:
    """Create the F0 predictor registered under ``f0_method``.

    The class is looked up in the module-level ``predictors`` table and
    instantiated with the given ``hop_length`` and ``sampling_rate``; the
    remaining constructor arguments keep their defaults.

    Raises:
        KeyError: if ``f0_method`` is not a key of ``predictors``.
    """
    predictor_cls = predictors[f0_method]
    settings = {"hop_length": hop_length, "sampling_rate": sampling_rate}
    return predictor_cls(**settings)
class RVC:
class RVC(Model):
def __init__(
    self,
    model_path: str | bytes | os.PathLike,
    sr=40000,
    hop_size=512,
    vec_path: str | bytes | os.PathLike = "vec-768-layer-12.onnx",
    device: typing.Optional[typing.Literal["cpu", "cuda", "dml"]] = "cpu",
):
    """Load the voice-conversion model and its ContentVec feature extractor.

    Args:
        model_path: ONNX model handed to ``Model.__init__``.
        sr: stored as ``self.sampling_rate``.
        hop_size: stored as ``self.hop_size``.
        vec_path: ONNX model handed to ``ContentVec``.
        device: ``"cpu"``, ``"cuda"`` or ``"dml"``; used for both sessions.
            ``None`` is accepted as an alias of ``"cpu"``.

    Raises:
        RuntimeError: raised by ``Model.__init__`` for any other device.
    """
    # This constructor historically treated None as "cpu"; the shared
    # Model.__init__ only recognises the explicit names, so normalise here
    # to keep existing callers working.
    if device is None:
        device = "cpu"
    super().__init__(model_path, device)
    self.vec_model = ContentVec(vec_path, device)
    self.sampling_rate = sr
    self.hop_size = hop_size
def forward(self, hubert, hubert_length, pitch, pitchf, ds, rnd):
onnx_input = {
self.model.get_inputs()[0].name: hubert,
self.model.get_inputs()[1].name: hubert_length,
self.model.get_inputs()[2].name: pitch,
self.model.get_inputs()[3].name: pitchf,
self.model.get_inputs()[4].name: ds,
self.model.get_inputs()[5].name: rnd,
}
return (self.model.run(None, onnx_input)[0] * 32767).astype(np.int16)
def inference(
self,
wav,
sr,
sid,
wav: np.ndarray[typing.Any, np.dtype],
sr: int,
sid: int,
f0_method="dio",
f0_up_key=0,
):
) -> np.ndarray[typing.Any, np.dtype[np.int16]]:
f0_min = 50
f0_max = 1100
f0_mel_min = 1127 * np.log(1 + f0_min / 700)
@@ -122,6 +107,25 @@ class RVC:
rnd = np.random.randn(1, 192, hubert_length).astype(np.float32)
hubert_length = np.array([hubert_length]).astype(np.int64)
out_wav = self.forward(hubert, hubert_length, pitch, pitchf, ds, rnd).squeeze()
out_wav = self.__forward(hubert, hubert_length, pitch, pitchf, ds, rnd).squeeze()
out_wav = np.pad(out_wav, (0, 2 * self.hop_size), "constant")
return out_wav[0:org_length]
def __forward(
    self,
    hubert: np.ndarray[typing.Any, np.dtype[np.float32]],
    hubert_length: np.ndarray[typing.Any, np.dtype[np.int64]],
    pitch: np.ndarray[typing.Any, np.dtype[np.int64]],
    pitchf: np.ndarray[typing.Any, np.dtype[np.float32]],
    ds: np.ndarray[typing.Any, np.dtype[np.int64]],
    rnd: np.ndarray[typing.Any, np.dtype[np.float32]],
) -> np.ndarray[typing.Any, np.dtype[np.int16]]:
    """Run the ONNX session and return its first output as int16.

    The six tensors are bound, in this order, to the first six inputs the
    session reports.  ``hubert_length`` is an int64 array (``inference``
    wraps the frame count with ``np.array([...]).astype(np.int64)``), not a
    plain ``int``.

    Returns:
        The first session output multiplied by 32767 and cast to ``int16``.

    Raises:
        IndexError: if the session exposes fewer than six inputs.
    """
    tensors = (hubert, hubert_length, pitch, pitchf, ds, rnd)
    # Query the session once instead of once per tensor.
    input_nodes = self.model.get_inputs()
    if len(input_nodes) < len(tensors):
        # Same exception type that indexing a missing input would produce.
        raise IndexError(
            f"model exposes {len(input_nodes)} inputs, expected at least {len(tensors)}"
        )
    onnx_input = {node.name: tensor for node, tensor in zip(input_nodes, tensors)}
    return (self.model.run(None, onnx_input)[0] * 32767).astype(np.int16)