1
0
mirror of https://github.com/fumiama/Retrieval-based-Voice-Conversion-WebUI.git synced 2026-06-05 01:10:22 +08:00

chore(format): run black on dev (#94)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
This commit is contained in:
github-actions[bot]
2024-11-28 03:21:10 +09:00
committed by GitHub
parent a8783c6639
commit d3add81469
10 changed files with 126 additions and 47 deletions

View File

@@ -28,6 +28,7 @@ def float_to_int16(audio: np.ndarray) -> np.ndarray:
am = 32767 * 32768 // am
return np.multiply(audio, am).astype(np.int16)
def float_np_array_to_wav_buf(wav: np.ndarray, sr: int, f32=False) -> BytesIO:
buf = BytesIO()
if f32:
@@ -41,10 +42,12 @@ def float_np_array_to_wav_buf(wav: np.ndarray, sr: int, f32=False) -> BytesIO:
buf.seek(0, 0)
return buf
def save_audio(path: str, audio: np.ndarray, sr: int, f32=False):
with open(path, "wb") as f:
f.write(float_np_array_to_wav_buf(audio, sr, f32).getbuffer())
def wav2(i: BytesIO, o: BufferedWriter, format: str):
inp = av.open(i, "r")
format = video_format_dict.get(format, format)
@@ -65,12 +68,14 @@ def wav2(i: BytesIO, o: BufferedWriter, format: str):
def load_audio(
file: Union[str, BytesIO, Path],
sr: Optional[int]=None,
format: Optional[str]=None,
mono=True
) -> Union[np.ndarray, Tuple[np.ndarray, int]]:
if (isinstance(file, str) and not Path(file).exists()) or (isinstance(file, Path) and not file.exists()):
file: Union[str, BytesIO, Path],
sr: Optional[int] = None,
format: Optional[str] = None,
mono=True,
) -> Union[np.ndarray, Tuple[np.ndarray, int]]:
if (isinstance(file, str) and not Path(file).exists()) or (
isinstance(file, Path) and not file.exists()
):
raise FileNotFoundError(f"File not found: {file}")
rate = 0
@@ -78,12 +83,25 @@ def load_audio(
audio_stream = next(s for s in container.streams if s.type == "audio")
channels = 1 if audio_stream.layout == "mono" else 2
container.seek(0)
resampler = AudioResampler(format="fltp", layout=audio_stream.layout, rate=sr) if sr is not None else None
resampler = (
AudioResampler(format="fltp", layout=audio_stream.layout, rate=sr)
if sr is not None
else None
)
# Estimated maximum total number of samples to pre-allocate the array
# AV stores length in microseconds by default
estimated_total_samples = int(container.duration * sr // 1_000_000) if sr is not None else 48000
decoded_audio = np.zeros(estimated_total_samples + 1 if channels == 1 else (channels, estimated_total_samples + 1), dtype=np.float32)
estimated_total_samples = (
int(container.duration * sr // 1_000_000) if sr is not None else 48000
)
decoded_audio = np.zeros(
(
estimated_total_samples + 1
if channels == 1
else (channels, estimated_total_samples + 1)
),
dtype=np.float32,
)
offset = 0
@@ -92,7 +110,9 @@ def load_audio(
rate = 0
for frame in packet:
frame.pts = None # 清除时间戳,避免重新采样问题
resampled_frames = resampler.resample(frame) if resampler is not None else [frame]
resampled_frames = (
resampler.resample(frame) if resampler is not None else [frame]
)
for resampled_frame in resampled_frames:
frame_data = resampled_frame.to_ndarray()
rate = resampled_frame.rate
@@ -104,13 +124,16 @@ def load_audio(
yield p.decode()
for r, frames_data in map(process_packet, frame_iter(container)):
if not rate: rate = r
if not rate:
rate = r
for frame_data in frames_data:
end_index = offset + len(frame_data[0])
# 检查 decoded_audio 是否有足够的空间,并在必要时调整大小
if end_index > decoded_audio.shape[1]:
decoded_audio = np.resize(decoded_audio, (decoded_audio.shape[0], end_index*4))
decoded_audio = np.resize(
decoded_audio, (decoded_audio.shape[0], end_index * 4)
)
np.copyto(decoded_audio[..., offset:end_index], frame_data)
offset += len(frame_data[0])
@@ -126,7 +149,9 @@ def load_audio(
return decoded_audio, rate
def downsample_audio(input_path: str, output_path: str, format: str, br=128_000) -> None:
def downsample_audio(
input_path: str, output_path: str, format: str, br=128_000
) -> None:
"""
default to 128kb/s (equivalent to -q:a 2)
"""

View File

@@ -244,11 +244,15 @@ def main():
for i, chunk in enumerate(chunks):
if len(chunk.shape) > 1:
chunk = chunk.T
save_audio(os.path.join(
out,
f"%s_%d.wav"
% (os.path.basename(args.audio).rsplit(".", maxsplit=1)[0], i),
), chunk, sr)
save_audio(
os.path.join(
out,
f"%s_%d.wav"
% (os.path.basename(args.audio).rsplit(".", maxsplit=1)[0], i),
),
chunk,
sr,
)
if __name__ == "__main__":