From e486649a91e8d4ec39a5193acdcd7424bfa6bce0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Wed, 12 Jun 2024 20:51:46 +0900 Subject: [PATCH] optimize(rmvpe): move deepunet&e2e into rvc --- infer/lib/audio.py | 11 +- infer/lib/rmvpe.py | 282 +-------------------------------------------- rvc/f0/deepunet.py | 217 ++++++++++++++++++++++++++++++++++ rvc/f0/e2e.py | 66 +++++++++++ 4 files changed, 289 insertions(+), 287 deletions(-) create mode 100644 rvc/f0/deepunet.py create mode 100644 rvc/f0/e2e.py diff --git a/infer/lib/audio.py b/infer/lib/audio.py index 05b2057..3bc0bfb 100644 --- a/infer/lib/audio.py +++ b/infer/lib/audio.py @@ -1,9 +1,10 @@ from io import BufferedWriter, BytesIO from pathlib import Path from typing import Dict, Tuple +import os + import numpy as np import av -import os from av.audio.resampler import AudioResampler video_format_dict: Dict[str, str] = { @@ -44,10 +45,8 @@ def load_audio(file: str, sr: int) -> np.ndarray: resampler = AudioResampler(format="fltp", layout="mono", rate=sr) # Estimated maximum total number of samples to pre-allocate the array - audio_duration_sec: float = ( - container.duration / 1_000_000 - ) # AV stores length in microseconds by default - estimated_total_samples = int(audio_duration_sec * sr + 0.5) + # AV stores length in microseconds by default + estimated_total_samples = int(container.duration * sr // 1_000_000) decoded_audio = np.zeros(estimated_total_samples + 1, dtype=np.float32) offset = 0 @@ -55,7 +54,7 @@ def load_audio(file: str, sr: int) -> np.ndarray: frame.pts = None # Clear presentation timestamp to avoid resampling issues resampled_frames = resampler.resample(frame) for resampled_frame in resampled_frames: - frame_data = np.array(resampled_frame.to_ndarray()).flatten() + frame_data = resampled_frame.to_ndarray()[0] end_index = offset + len(frame_data) # Check if decoded_audio has enough space, and resize if necessary diff --git a/infer/lib/rmvpe.py b/infer/lib/rmvpe.py index 1f924c1..d384d87 100644 --- a/infer/lib/rmvpe.py +++ b/infer/lib/rmvpe.py @@ -18,269 +18,13 @@ except Exception: # pylint: disable=broad-exception-caught pass import torch.nn as nn import torch.nn.functional as F -from librosa.util import normalize, pad_center, tiny -from scipy.signal import get_window import logging logger = logging.getLogger(__name__) from rvc.f0.mel import MelSpectrogram - -from time import time as ttime - - -class BiGRU(nn.Module): - def __init__(self, input_features, hidden_features, num_layers): - super(BiGRU, self).__init__() - self.gru = nn.GRU( - input_features, - hidden_features, - num_layers=num_layers, - batch_first=True, - bidirectional=True, - ) - - def forward(self, x): - return self.gru(x)[0] - - -class ConvBlockRes(nn.Module): - def __init__(self, in_channels, out_channels, momentum=0.01): - super(ConvBlockRes, self).__init__() - self.conv = nn.Sequential( - nn.Conv2d( - in_channels=in_channels, - out_channels=out_channels, - kernel_size=(3, 3), - stride=(1, 1), - padding=(1, 1), - bias=False, - ), - nn.BatchNorm2d(out_channels, momentum=momentum), - nn.ReLU(), - nn.Conv2d( - in_channels=out_channels, - out_channels=out_channels, - kernel_size=(3, 3), - stride=(1, 1), - padding=(1, 1), - bias=False, - ), - nn.BatchNorm2d(out_channels, momentum=momentum), - nn.ReLU(), - ) - # self.shortcut:Optional[nn.Module] = None - if in_channels != out_channels: - self.shortcut = nn.Conv2d(in_channels, out_channels, (1, 1)) - - def forward(self, x: torch.Tensor): - if not hasattr(self, "shortcut"): - return self.conv(x) + x - else: - return self.conv(x) + self.shortcut(x) - - -class Encoder(nn.Module): - def __init__( - self, - in_channels, - in_size, - n_encoders, - kernel_size, - n_blocks, - out_channels=16, - momentum=0.01, - ): - super(Encoder, self).__init__() - self.n_encoders = n_encoders - self.bn = nn.BatchNorm2d(in_channels, momentum=momentum) - self.layers = nn.ModuleList() - self.latent_channels = [] - for i in range(self.n_encoders): - self.layers.append( - ResEncoderBlock( - in_channels, out_channels, kernel_size, n_blocks, momentum=momentum - ) - ) - self.latent_channels.append([out_channels, in_size]) - in_channels = out_channels - out_channels *= 2 - in_size //= 2 - self.out_size = in_size - self.out_channel = out_channels - - def forward(self, x: torch.Tensor): - concat_tensors: List[torch.Tensor] = [] - x = self.bn(x) - for i, layer in enumerate(self.layers): - t, x = layer(x) - concat_tensors.append(t) - return x, concat_tensors - - -class ResEncoderBlock(nn.Module): - def __init__( - self, in_channels, out_channels, kernel_size, n_blocks=1, momentum=0.01 - ): - super(ResEncoderBlock, self).__init__() - self.n_blocks = n_blocks - self.conv = nn.ModuleList() - self.conv.append(ConvBlockRes(in_channels, out_channels, momentum)) - for i in range(n_blocks - 1): - self.conv.append(ConvBlockRes(out_channels, out_channels, momentum)) - self.kernel_size = kernel_size - if self.kernel_size is not None: - self.pool = nn.AvgPool2d(kernel_size=kernel_size) - - def forward(self, x): - for i, conv in enumerate(self.conv): - x = conv(x) - if self.kernel_size is not None: - return x, self.pool(x) - else: - return x - - -class Intermediate(nn.Module): # - def __init__(self, in_channels, out_channels, n_inters, n_blocks, momentum=0.01): - super(Intermediate, self).__init__() - self.n_inters = n_inters - self.layers = nn.ModuleList() - self.layers.append( - ResEncoderBlock(in_channels, out_channels, None, n_blocks, momentum) - ) - for i in range(self.n_inters - 1): - self.layers.append( - ResEncoderBlock(out_channels, out_channels, None, n_blocks, momentum) - ) - - def forward(self, x): - for i, layer in enumerate(self.layers): - x = layer(x) - return x - - -class ResDecoderBlock(nn.Module): - def __init__(self, in_channels, out_channels, stride, n_blocks=1, momentum=0.01): - super(ResDecoderBlock, self).__init__() - out_padding = (0, 1) if stride == (1, 2) else (1, 1) - self.n_blocks = n_blocks - self.conv1 = nn.Sequential( - nn.ConvTranspose2d( - in_channels=in_channels, - out_channels=out_channels, - kernel_size=(3, 3), - stride=stride, - padding=(1, 1), - output_padding=out_padding, - bias=False, - ), - nn.BatchNorm2d(out_channels, momentum=momentum), - nn.ReLU(), - ) - self.conv2 = nn.ModuleList() - self.conv2.append(ConvBlockRes(out_channels * 2, out_channels, momentum)) - for i in range(n_blocks - 1): - self.conv2.append(ConvBlockRes(out_channels, out_channels, momentum)) - - def forward(self, x, concat_tensor): - x = self.conv1(x) - x = torch.cat((x, concat_tensor), dim=1) - for i, conv2 in enumerate(self.conv2): - x = conv2(x) - return x - - -class Decoder(nn.Module): - def __init__(self, in_channels, n_decoders, stride, n_blocks, momentum=0.01): - super(Decoder, self).__init__() - self.layers = nn.ModuleList() - self.n_decoders = n_decoders - for i in range(self.n_decoders): - out_channels = in_channels // 2 - self.layers.append( - ResDecoderBlock(in_channels, out_channels, stride, n_blocks, momentum) - ) - in_channels = out_channels - - def forward(self, x: torch.Tensor, concat_tensors: List[torch.Tensor]): - for i, layer in enumerate(self.layers): - x = layer(x, concat_tensors[-1 - i]) - return x - - -class DeepUnet(nn.Module): - def __init__( - self, - kernel_size, - n_blocks, - en_de_layers=5, - inter_layers=4, - in_channels=1, - en_out_channels=16, - ): - super(DeepUnet, self).__init__() - self.encoder = Encoder( - in_channels, 128, en_de_layers, kernel_size, n_blocks, en_out_channels - ) - self.intermediate = Intermediate( - self.encoder.out_channel // 2, - self.encoder.out_channel, - inter_layers, - n_blocks, - ) - self.decoder = Decoder( - self.encoder.out_channel, en_de_layers, kernel_size, n_blocks - ) - - def forward(self, x: torch.Tensor) -> torch.Tensor: - x, concat_tensors = self.encoder(x) - x = self.intermediate(x) - x = self.decoder(x, concat_tensors) - return x - - -class E2E(nn.Module): - def __init__( - self, - n_blocks, - n_gru, - kernel_size, - en_de_layers=5, - inter_layers=4, - in_channels=1, - en_out_channels=16, - ): - super(E2E, self).__init__() - self.unet = DeepUnet( - kernel_size, - n_blocks, - en_de_layers, - inter_layers, - in_channels, - en_out_channels, - ) - self.cnn = nn.Conv2d(en_out_channels, 3, (3, 3), padding=(1, 1)) - if n_gru: - self.fc = nn.Sequential( - BiGRU(3 * 128, 256, n_gru), - nn.Linear(512, 360), - nn.Dropout(0.25), - nn.Sigmoid(), - ) - else: - self.fc = nn.Sequential( - nn.Linear(3 * nn.N_MELS, nn.N_CLASS), nn.Dropout(0.25), nn.Sigmoid() - ) - - def forward(self, mel): - # print(mel.shape) - mel = mel.transpose(-1, -2).unsqueeze(1) - x = self.cnn(self.unet(mel)).transpose(1, 2).flatten(-2) - x = self.fc(x) - # print(x.shape) - return x +from rvc.f0.e2e import E2E class RMVPE: @@ -442,27 +186,3 @@ class RMVPE: # t4 = ttime() # print("decode:%s\t%s\t%s\t%s" % (t1 - t0, t2 - t1, t3 - t2, t4 - t3)) return devided - - -if __name__ == "__main__": - import librosa - import soundfile as sf - - audio, sampling_rate = sf.read(r"C:\Users\liujing04\Desktop\Z\冬之花clip1.wav") - if len(audio.shape) > 1: - audio = librosa.to_mono(audio.transpose(1, 0)) - audio_bak = audio.copy() - if sampling_rate != 16000: - audio = librosa.resample(audio, orig_sr=sampling_rate, target_sr=16000) - model_path = r"D:\BaiduNetdiskDownload\RVC-beta-v2-0727AMD_realtime\rmvpe.pt" - thred = 0.03 # 0.01 - device = "cuda" if torch.cuda.is_available() else "cpu" - rmvpe = RMVPE(model_path, is_half=False, device=device) - t0 = ttime() - f0 = rmvpe.infer_from_audio(audio, thred=thred) - # f0 = rmvpe.infer_from_audio(audio, thred=thred) - # f0 = rmvpe.infer_from_audio(audio, thred=thred) - # f0 = rmvpe.infer_from_audio(audio, thred=thred) - # f0 = rmvpe.infer_from_audio(audio, thred=thred) - t1 = ttime() - logger.info("%s %.2f", f0.shape, t1 - t0) diff --git a/rvc/f0/deepunet.py b/rvc/f0/deepunet.py new file mode 100644 index 0000000..1a75178 --- /dev/null +++ b/rvc/f0/deepunet.py @@ -0,0 +1,217 @@ +from typing import List, Tuple, Union + +import torch +import torch.nn as nn + + +class ConvBlockRes(nn.Module): + def __init__( + self, + in_channels: int, + out_channels: int, + momentum: float = 0.01, + ): + super(ConvBlockRes, self).__init__() + self.conv = nn.Sequential( + nn.Conv2d( + in_channels=in_channels, + out_channels=out_channels, + kernel_size=(3, 3), + stride=(1, 1), + padding=(1, 1), + bias=False, + ), + nn.BatchNorm2d(out_channels, momentum=momentum), + nn.ReLU(), + nn.Conv2d( + in_channels=out_channels, + out_channels=out_channels, + kernel_size=(3, 3), + stride=(1, 1), + padding=(1, 1), + bias=False, + ), + nn.BatchNorm2d(out_channels, momentum=momentum), + nn.ReLU(), + ) + # self.shortcut:Optional[nn.Module] = None + if in_channels != out_channels: + self.shortcut = nn.Conv2d(in_channels, out_channels, (1, 1)) + + def forward(self, x: torch.Tensor): + if not hasattr(self, "shortcut"): + return self.conv(x) + x + else: + return self.conv(x) + self.shortcut(x) + + +class Encoder(nn.Module): + def __init__( + self, + in_channels: int, + in_size: int, + n_encoders: int, + kernel_size: Tuple[int, int], + n_blocks: int, + out_channels=16, + momentum=0.01, + ): + super(Encoder, self).__init__() + self.n_encoders = n_encoders + + self.bn = nn.BatchNorm2d(in_channels, momentum=momentum) + self.layers = nn.ModuleList() + for _ in range(self.n_encoders): + self.layers.append( + ResEncoderBlock( + in_channels, out_channels, kernel_size, n_blocks, momentum=momentum + ) + ) + in_channels = out_channels + out_channels *= 2 + in_size //= 2 + self.out_size = in_size + self.out_channel = out_channels + + def __call__(self, x: torch.Tensor) -> Tuple[torch.Tensor, List[torch.Tensor]]: + return super().__call__(x) + + def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, List[torch.Tensor]]: + concat_tensors: List[torch.Tensor] = [] + x = self.bn(x) + for layer in self.layers: + t, x = layer(x) + concat_tensors.append(t) + return x, concat_tensors + + +class ResEncoderBlock(nn.Module): + def __init__( + self, + in_channels: int, + out_channels: int, + kernel_size: Tuple[int, int], + n_blocks=1, + momentum=0.01, + ): + super(ResEncoderBlock, self).__init__() + self.n_blocks = n_blocks + self.kernel_size = kernel_size + + self.conv = nn.ModuleList() + self.conv.append(ConvBlockRes(in_channels, out_channels, momentum)) + for _ in range(n_blocks - 1): + self.conv.append(ConvBlockRes(out_channels, out_channels, momentum)) + + if self.kernel_size is not None: + self.pool = nn.AvgPool2d(kernel_size=kernel_size) + + def forward( + self, + x: torch.Tensor, + ) -> Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]: + for conv in self.conv: + x = conv(x) + if self.kernel_size is not None: + return x, self.pool(x) + return x + + +class Intermediate(nn.Module): + def __init__(self, in_channels, out_channels, n_inters, n_blocks, momentum=0.01): + super(Intermediate, self).__init__() + + self.layers = nn.ModuleList() + self.layers.append( + ResEncoderBlock(in_channels, out_channels, None, n_blocks, momentum) + ) + for _ in range(n_inters - 1): + self.layers.append( + ResEncoderBlock(out_channels, out_channels, None, n_blocks, momentum) + ) + + def forward(self, x): + for layer in self.layers: + x = layer(x) + return x + + +class ResDecoderBlock(nn.Module): + def __init__(self, in_channels, out_channels, stride, n_blocks=1, momentum=0.01): + super(ResDecoderBlock, self).__init__() + out_padding = (0, 1) if stride == (1, 2) else (1, 1) + + self.conv1 = nn.Sequential( + nn.ConvTranspose2d( + in_channels=in_channels, + out_channels=out_channels, + kernel_size=(3, 3), + stride=stride, + padding=(1, 1), + output_padding=out_padding, + bias=False, + ), + nn.BatchNorm2d(out_channels, momentum=momentum), + nn.ReLU(), + ) + self.conv2 = nn.ModuleList() + self.conv2.append(ConvBlockRes(out_channels * 2, out_channels, momentum)) + for _ in range(n_blocks - 1): + self.conv2.append(ConvBlockRes(out_channels, out_channels, momentum)) + + def forward(self, x, concat_tensor): + x = self.conv1(x) + x = torch.cat((x, concat_tensor), dim=1) + for conv2 in self.conv2: + x = conv2(x) + return x + + +class Decoder(nn.Module): + def __init__(self, in_channels, n_decoders, stride, n_blocks, momentum=0.01): + super(Decoder, self).__init__() + + self.layers = nn.ModuleList() + self.n_decoders = n_decoders + for _ in range(self.n_decoders): + out_channels = in_channels // 2 + self.layers.append( + ResDecoderBlock(in_channels, out_channels, stride, n_blocks, momentum) + ) + in_channels = out_channels + + def forward(self, x: torch.Tensor, concat_tensors: List[torch.Tensor]): + for i, layer in enumerate(self.layers): + x = layer(x, concat_tensors[-1 - i]) + return x + + +class DeepUnet(nn.Module): + def __init__( + self, + kernel_size: Tuple[int, int], + n_blocks: int, + en_de_layers=5, + inter_layers=4, + in_channels=1, + en_out_channels=16, + ): + super(DeepUnet, self).__init__() + self.encoder = Encoder( + in_channels, 128, en_de_layers, kernel_size, n_blocks, en_out_channels + ) + self.intermediate = Intermediate( + self.encoder.out_channel // 2, + self.encoder.out_channel, + inter_layers, + n_blocks, + ) + self.decoder = Decoder( + self.encoder.out_channel, en_de_layers, kernel_size, n_blocks + ) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + x, concat_tensors = self.encoder(x) + x = self.intermediate(x) + x = self.decoder(x, concat_tensors) + return x diff --git a/rvc/f0/e2e.py b/rvc/f0/e2e.py new file mode 100644 index 0000000..fbf65fd --- /dev/null +++ b/rvc/f0/e2e.py @@ -0,0 +1,66 @@ +from typing import Tuple + +import torch.nn as nn + +from .deepunet import DeepUnet + +class E2E(nn.Module): + def __init__( + self, + n_blocks: int, + n_gru: int, + kernel_size: Tuple[int, int], + en_de_layers=5, + inter_layers=4, + in_channels=1, + en_out_channels=16, + ): + super(E2E, self).__init__() + + self.unet = DeepUnet( + kernel_size, + n_blocks, + en_de_layers, + inter_layers, + in_channels, + en_out_channels, + ) + self.cnn = nn.Conv2d(en_out_channels, 3, (3, 3), padding=(1, 1)) + if n_gru: + self.fc = nn.Sequential( + self.BiGRU(3 * 128, 256, n_gru), + nn.Linear(512, 360), + nn.Dropout(0.25), + nn.Sigmoid(), + ) + else: + self.fc = nn.Sequential( + nn.Linear(3 * nn.N_MELS, nn.N_CLASS), + nn.Dropout(0.25), + nn.Sigmoid(), + ) + + def forward(self, mel): + mel = mel.transpose(-1, -2).unsqueeze(1) + x = self.cnn(self.unet(mel)).transpose(1, 2).flatten(-2) + x = self.fc(x) + return x + + class BiGRU(nn.Module): + def __init__( + self, + input_features: int, + hidden_features: int, + num_layers: int, + ): + super().__init__() + self.gru = nn.GRU( + input_features, + hidden_features, + num_layers=num_layers, + batch_first=True, + bidirectional=True, + ) + + def forward(self, x): + return self.gru(x)[0]