diff --git a/infer/lib/infer_pack/models.py b/infer/lib/infer_pack/models.py index b7d7b9b..f85e356 100644 --- a/infer/lib/infer_pack/models.py +++ b/infer/lib/infer_pack/models.py @@ -9,6 +9,7 @@ from torch.nn.utils import remove_weight_norm, spectral_norm, weight_norm from rvc import residuals from rvc.norms import WN +from rvc.residuals import ResidualCouplingBlock from rvc.utils import ( get_padding, call_weight_data_normal_if_Conv, @@ -21,92 +22,6 @@ from rvc.encoders import TextEncoder has_xpu = bool(hasattr(torch, "xpu") and torch.xpu.is_available()) -class ResidualCouplingBlock(nn.Module): - class Flip(nn.Module): - """ - torch.jit.script() Compiled functions - can't take variable number of arguments or - use keyword-only arguments with defaults - """ - def forward( - self, - x: torch.Tensor, - x_mask: torch.Tensor, - g: Optional[torch.Tensor] = None, - reverse: bool = False, - ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]: - x = torch.flip(x, [1]) - if not reverse: - logdet = torch.zeros(x.size(0)).to(dtype=x.dtype, device=x.device) - return x, logdet - else: - return x, torch.zeros([1], device=x.device) - - def __init__( - self, - channels, - hidden_channels, - kernel_size, - dilation_rate, - n_layers, - n_flows=4, - gin_channels=0, - ): - super(ResidualCouplingBlock, self).__init__() - self.channels = channels - self.hidden_channels = hidden_channels - self.kernel_size = kernel_size - self.dilation_rate = dilation_rate - self.n_layers = n_layers - self.n_flows = n_flows - self.gin_channels = gin_channels - - self.flows = nn.ModuleList() - for i in range(n_flows): - self.flows.append( - residuals.ResidualCouplingLayer( - channels, - hidden_channels, - kernel_size, - dilation_rate, - n_layers, - gin_channels=gin_channels, - mean_only=True, - ) - ) - self.flows.append(self.Flip()) - - def forward( - self, - x: torch.Tensor, - x_mask: torch.Tensor, - g: Optional[torch.Tensor] = None, - reverse: bool = False, - ): - if not reverse: - for flow in 
self.flows: - x, _ = flow(x, x_mask, g=g, reverse=reverse) - else: - for flow in reversed(self.flows): - x, _ = flow.forward(x, x_mask, g=g, reverse=reverse) - return x - - def remove_weight_norm(self): - for i in range(self.n_flows): - self.flows[i * 2].remove_weight_norm() - - def __prepare_scriptable__(self): - for i in range(self.n_flows): - for hook in self.flows[i * 2]._forward_pre_hooks.values(): - if ( - hook.__module__ == "torch.nn.utils.weight_norm" - and hook.__class__.__name__ == "WeightNorm" - ): - torch.nn.utils.remove_weight_norm(self.flows[i * 2]) - - return self - - class PosteriorEncoder(nn.Module): def __init__( self, @@ -425,15 +340,15 @@ class SourceModuleHnNSF(torch.nn.Module): class GeneratorNSF(torch.nn.Module): def __init__( self, - initial_channel, - resblock, - resblock_kernel_sizes, - resblock_dilation_sizes, - upsample_rates, - upsample_initial_channel, - upsample_kernel_sizes, - gin_channels, - sr, + initial_channel: int, + resblock: str, + resblock_kernel_sizes: List[int], + resblock_dilation_sizes: List[List[int]], + upsample_rates: List[int], + upsample_initial_channel: int, + upsample_kernel_sizes: List[int], + gin_channels: int, + sr: int, ): super(GeneratorNSF, self).__init__() self.num_kernels = len(resblock_kernel_sizes) @@ -479,7 +394,7 @@ class GeneratorNSF(torch.nn.Module): self.resblocks = nn.ModuleList() for i in range(len(self.ups)): - ch = upsample_initial_channel // (2 ** (i + 1)) + ch: int = upsample_initial_channel // (2 ** (i + 1)) for j, (k, d) in enumerate( zip(resblock_kernel_sizes, resblock_dilation_sizes) ): @@ -817,7 +732,7 @@ class SynthesizerTrnMs256NSFsid_nono(nn.Module): p_dropout, resblock: str, resblock_kernel_sizes, - resblock_dilation_sizes, + resblock_dilation_sizes: List[List[int]], upsample_rates, upsample_initial_channel, upsample_kernel_sizes, diff --git a/infer/lib/infer_pack/models_onnx.py b/infer/lib/infer_pack/models_onnx.py index 34b25de..9fbcc53 100644 --- 
a/infer/lib/infer_pack/models_onnx.py +++ b/infer/lib/infer_pack/models_onnx.py @@ -2,12 +2,12 @@ import torch from torch import nn from .models import ( - ResidualCouplingBlock, PosteriorEncoder, GeneratorNSF, ) from rvc.encoders import TextEncoder +from rvc.residuals import ResidualCouplingBlock class SynthesizerTrnMsNSFsidM(nn.Module): diff --git a/rvc/attentions.py b/rvc/attentions.py index b6db6b6..fc79828 100644 --- a/rvc/attentions.py +++ b/rvc/attentions.py @@ -13,9 +13,9 @@ class MultiHeadAttention(nn.Module): out_channels: int, n_heads: int, p_dropout: float = 0.0, - window_size: int | None = None, + window_size: Optional[int] = None, heads_share: bool = True, - block_length: int | None = None, + block_length: Optional[int] = None, proximal_bias: bool = False, proximal_init: bool = False, ): @@ -233,7 +233,7 @@ class FFN(nn.Module): filter_channels: int, kernel_size: int, p_dropout: float = 0.0, - activation: str | None = None, + activation: Optional[str] = None, causal: bool = False, ): super(FFN, self).__init__() diff --git a/rvc/onnx/f0predictors/dio.py b/rvc/onnx/f0predictors/dio.py index 438426f..29b1f0f 100644 --- a/rvc/onnx/f0predictors/dio.py +++ b/rvc/onnx/f0predictors/dio.py @@ -1,6 +1,7 @@ +from typing import Any, Optional + import numpy as np import pyworld -import typing from .f0 import F0Predictor @@ -10,7 +11,7 @@ class DioF0Predictor(F0Predictor): super().__init__(hop_length, f0_min, f0_max, sampling_rate) def compute_f0( - self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None + self, wav: np.ndarray[Any, np.dtype], p_len: Optional[int] = None ): if p_len is None: p_len = wav.shape[0] // self.hop_length @@ -27,7 +28,7 @@ class DioF0Predictor(F0Predictor): return self.__interpolate_f0(self.__resize_f0(f0, p_len))[0] def compute_f0_uv( - self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None + self, wav: np.ndarray[Any, np.dtype], p_len: Optional[int] = None ): if p_len is None: p_len = wav.shape[0] // 
self.hop_length diff --git a/rvc/onnx/f0predictors/f0.py b/rvc/onnx/f0predictors/f0.py index 8c96337..ae20a79 100644 --- a/rvc/onnx/f0predictors/f0.py +++ b/rvc/onnx/f0predictors/f0.py @@ -1,5 +1,6 @@ +from typing import Any, Optional + import numpy as np -import typing class F0Predictor(object): @@ -10,14 +11,14 @@ class F0Predictor(object): self.sampling_rate = sampling_rate def compute_f0( - self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None + self, wav: np.ndarray[Any, np.dtype], p_len: Optional[int] = None ): ... def compute_f0_uv( - self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None + self, wav: np.ndarray[Any, np.dtype], p_len: Optional[int] = None ): ... - def __interpolate_f0(self, f0: np.ndarray[typing.Any, np.dtype]): + def __interpolate_f0(self, f0: np.ndarray[Any, np.dtype]): """ 对F0进行插值处理 """ @@ -55,7 +56,7 @@ class F0Predictor(object): return ip_data[:, 0], vuv_vector[:, 0] - def __resize_f0(self, x: np.ndarray[typing.Any, np.dtype], target_len: int): + def __resize_f0(self, x: np.ndarray[Any, np.dtype], target_len: int): source = np.array(x) source[source < 0.001] = np.nan target = np.interp( diff --git a/rvc/onnx/f0predictors/harvest.py b/rvc/onnx/f0predictors/harvest.py index 3d51ec9..bec8a5d 100644 --- a/rvc/onnx/f0predictors/harvest.py +++ b/rvc/onnx/f0predictors/harvest.py @@ -1,6 +1,7 @@ +from typing import Any, Optional + import numpy as np import pyworld -import typing from .f0 import F0Predictor @@ -10,7 +11,7 @@ class HarvestF0Predictor(F0Predictor): super().__init__(hop_length, f0_min, f0_max, sampling_rate) def compute_f0( - self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None + self, wav: np.ndarray[Any, np.dtype], p_len: Optional[int] = None ): if p_len is None: p_len = wav.shape[0] // self.hop_length @@ -25,7 +26,7 @@ class HarvestF0Predictor(F0Predictor): return self.__interpolate_f0(self.__resize_f0(f0, p_len))[0] def compute_f0_uv( - self, wav: np.ndarray[typing.Any, np.dtype], 
p_len: int | None = None + self, wav: np.ndarray[Any, np.dtype], p_len: Optional[int] = None ): if p_len is None: p_len = wav.shape[0] // self.hop_length diff --git a/rvc/onnx/f0predictors/pm.py b/rvc/onnx/f0predictors/pm.py index 7101b91..ca2790d 100644 --- a/rvc/onnx/f0predictors/pm.py +++ b/rvc/onnx/f0predictors/pm.py @@ -1,6 +1,7 @@ +from typing import Any, Optional + import numpy as np import parselmouth -import typing from .f0 import F0Predictor @@ -10,7 +11,7 @@ class PMF0Predictor(F0Predictor): super().__init__(hop_length, f0_min, f0_max, sampling_rate) def compute_f0( - self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None + self, wav: np.ndarray[Any, np.dtype], p_len: Optional[int] = None ): x = wav if p_len is None: @@ -36,7 +37,7 @@ class PMF0Predictor(F0Predictor): return f0 def compute_f0_uv( - self, wav: np.ndarray[typing.Any, np.dtype], p_len: int | None = None + self, wav: np.ndarray[Any, np.dtype], p_len: Optional[int] = None ): x = wav if p_len is None: diff --git a/rvc/residuals.py b/rvc/residuals.py index 09d6d02..c0f2752 100644 --- a/rvc/residuals.py +++ b/rvc/residuals.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, List, Tuple import torch from torch import nn @@ -15,46 +15,33 @@ from .utils import ( LRELU_SLOPE = 0.1 class ResBlock1(torch.nn.Module): - def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)): + def __init__( + self, + channels: int, + kernel_size: int = 3, + dilation: List[int] = (1, 3, 5), + ): super(ResBlock1, self).__init__() - self.convs1 = nn.ModuleList( - [ + + self.convs1 = nn.ModuleList() + for d in dilation: + self.convs1.append( weight_norm( Conv1d( channels, channels, kernel_size, 1, - dilation=dilation[0], - padding=get_padding(kernel_size, dilation[0]), + dilation=d, + padding=get_padding(kernel_size, d), ) ), - weight_norm( - Conv1d( - channels, - channels, - kernel_size, - 1, - dilation=dilation[1], - padding=get_padding(kernel_size, dilation[1]), - ) - ), 
- weight_norm( - Conv1d( - channels, - channels, - kernel_size, - 1, - dilation=dilation[2], - padding=get_padding(kernel_size, dilation[2]), - ) - ), - ] - ) + ) self.convs1.apply(call_weight_data_normal_if_Conv) - self.convs2 = nn.ModuleList( - [ + self.convs2 = nn.ModuleList() + for _ in dilation: + self.convs2.append( weight_norm( Conv1d( channels, @@ -65,32 +52,22 @@ class ResBlock1(torch.nn.Module): padding=get_padding(kernel_size, 1), ) ), - weight_norm( - Conv1d( - channels, - channels, - kernel_size, - 1, - dilation=1, - padding=get_padding(kernel_size, 1), - ) - ), - weight_norm( - Conv1d( - channels, - channels, - kernel_size, - 1, - dilation=1, - padding=get_padding(kernel_size, 1), - ) - ), - ] - ) + ) self.convs2.apply(call_weight_data_normal_if_Conv) self.lrelu_slope = LRELU_SLOPE - def forward(self, x: torch.Tensor, x_mask: Optional[torch.Tensor] = None): + def __call__( + self, + x: torch.Tensor, + x_mask: Optional[torch.Tensor] = None, + ) -> torch.Tensor: + return super().__call__(x, x_mask=x_mask) + + def forward( + self, + x: torch.Tensor, + x_mask: Optional[torch.Tensor] = None, + ) -> torch.Tensor: for c1, c2 in zip(self.convs1, self.convs2): xt = F.leaky_relu(x, self.lrelu_slope) if x_mask is not None: @@ -130,36 +107,46 @@ class ResBlock1(torch.nn.Module): class ResBlock2(torch.nn.Module): - def __init__(self, channels, kernel_size=3, dilation=(1, 3)): + """ + Actually this module is not used currently + because all configs specified "resblock": "1" + """ + def __init__( + self, + channels: int, + kernel_size=3, + dilation: List[int] = (1, 3), + ): super(ResBlock2, self).__init__() - self.convs = nn.ModuleList( - [ + self.convs = nn.ModuleList() + for d in dilation: + self.convs.append( weight_norm( Conv1d( channels, channels, kernel_size, 1, - dilation=dilation[0], - padding=get_padding(kernel_size, dilation[0]), + dilation=d, + padding=get_padding(kernel_size, d), ) ), - weight_norm( - Conv1d( - channels, - channels, - kernel_size, - 1, -
dilation=dilation[1], - padding=get_padding(kernel_size, dilation[1]), - ) - ), - ] - ) + ) self.convs.apply(call_weight_data_normal_if_Conv) self.lrelu_slope = LRELU_SLOPE - def forward(self, x, x_mask: Optional[torch.Tensor] = None): + def __call__( + self, + x: torch.Tensor, + x_mask: Optional[torch.Tensor] = None, + ) -> torch.Tensor: + return super().__call__(x, x_mask=x_mask) + + def forward( + self, + x: torch.Tensor, + x_mask: Optional[torch.Tensor] = None, + ) -> torch.Tensor: for c in self.convs: xt = F.leaky_relu(x, self.lrelu_slope) if x_mask is not None: @@ -188,14 +175,14 @@ class ResBlock2(torch.nn.Module): class ResidualCouplingLayer(nn.Module): def __init__( self, - channels, - hidden_channels, - kernel_size, - dilation_rate, - n_layers, - p_dropout=0, - gin_channels=0, - mean_only=False, + channels: int, + hidden_channels: int, + kernel_size: int, + dilation_rate: int, + n_layers: int, + p_dropout: int = 0, + gin_channels: int = 0, + mean_only: bool = False, ): assert channels % 2 == 0, "channels should be divisible by 2" super(ResidualCouplingLayer, self).__init__() @@ -220,13 +207,22 @@ class ResidualCouplingLayer(nn.Module): self.post.weight.data.zero_() self.post.bias.data.zero_() + def __call__( + self, + x: torch.Tensor, + x_mask: torch.Tensor, + g: Optional[torch.Tensor] = None, + reverse: bool = False, + ) -> Tuple[torch.Tensor, torch.Tensor]: + return super().__call__(x, x_mask, g=g, reverse=reverse) + def forward( self, x: torch.Tensor, x_mask: torch.Tensor, g: Optional[torch.Tensor] = None, reverse: bool = False, - ): + ) -> Tuple[torch.Tensor, torch.Tensor]: x0, x1 = torch.split(x, [self.half_channels] * 2, 1) h = self.pre(x0) * x_mask h = self.enc(h, x_mask, g=g) @@ -242,10 +238,10 @@ class ResidualCouplingLayer(nn.Module): x = torch.cat([x0, x1], 1) logdet = torch.sum(logs, [1, 2]) return x, logdet - else: - x1 = (x1 - m) * torch.exp(-logs) * x_mask - x = torch.cat([x0, x1], 1) - return x, torch.zeros([1]) + + x1 = (x1 - m) * 
torch.exp(-logs) * x_mask + x = torch.cat([x0, x1], 1) + return x, torch.zeros([1]) def remove_weight_norm(self): self.enc.remove_weight_norm() @@ -258,3 +254,96 @@ class ResidualCouplingLayer(nn.Module): ): torch.nn.utils.remove_weight_norm(self.enc) return self + +class ResidualCouplingBlock(nn.Module): + class Flip(nn.Module): + """ + torch.jit.script() Compiled functions + can't take variable number of arguments or + use keyword-only arguments with defaults + """ + def forward( + self, + x: torch.Tensor, + x_mask: torch.Tensor, + g: Optional[torch.Tensor] = None, + reverse: bool = False, + ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]: + x = torch.flip(x, [1]) + if not reverse: + logdet = torch.zeros(x.size(0)).to(dtype=x.dtype, device=x.device) + return x, logdet + else: + return x, torch.zeros([1], device=x.device) + + def __init__( + self, + channels: int, + hidden_channels: int, + kernel_size: int, + dilation_rate: int, + n_layers: int, + n_flows: int = 4, + gin_channels: int = 0, + ): + super(ResidualCouplingBlock, self).__init__() + self.channels = channels + self.hidden_channels = hidden_channels + self.kernel_size = kernel_size + self.dilation_rate = dilation_rate + self.n_layers = n_layers + self.n_flows = n_flows + self.gin_channels = gin_channels + + self.flows = nn.ModuleList() + for _ in range(n_flows): + self.flows.append( + ResidualCouplingLayer( + channels, + hidden_channels, + kernel_size, + dilation_rate, + n_layers, + gin_channels=gin_channels, + mean_only=True, + ) + ) + self.flows.append(self.Flip()) + + def __call__( + self, + x: torch.Tensor, + x_mask: torch.Tensor, + g: Optional[torch.Tensor] = None, + reverse: bool = False, + ) -> torch.Tensor: + return super().__call__(x, x_mask, g=g, reverse=reverse) + + def forward( + self, + x: torch.Tensor, + x_mask: torch.Tensor, + g: Optional[torch.Tensor] = None, + reverse: bool = False, + ) -> torch.Tensor: + if not reverse: + for flow in self.flows: + x, _ = flow(x, x_mask, g=g, 
reverse=reverse) + else: + for flow in reversed(self.flows): + x, _ = flow.forward(x, x_mask, g=g, reverse=reverse) + return x + + def remove_weight_norm(self): + for i in range(self.n_flows): + self.flows[i * 2].remove_weight_norm() + + def __prepare_scriptable__(self): + for i in range(self.n_flows): + for hook in self.flows[i * 2]._forward_pre_hooks.values(): + if ( + hook.__module__ == "torch.nn.utils.weight_norm" + and hook.__class__.__name__ == "WeightNorm" + ): + torch.nn.utils.remove_weight_norm(self.flows[i * 2]) + return self diff --git a/rvc/transforms.py b/rvc/transforms.py index 679882f..2cb2a36 100644 --- a/rvc/transforms.py +++ b/rvc/transforms.py @@ -1,3 +1,5 @@ +from typing import Optional + import numpy as np import torch from torch.nn import functional as F @@ -13,7 +15,7 @@ def piecewise_rational_quadratic_transform( unnormalized_heights: torch.Tensor, unnormalized_derivatives: torch.Tensor, inverse: bool = False, - tails: str | None = None, + tails: Optional[str] = None, tail_bound: float = 1.0, min_bin_width=DEFAULT_MIN_BIN_WIDTH, min_bin_height=DEFAULT_MIN_BIN_HEIGHT,