由于 WAV 文件通常包含未压缩的数据,因此它们的体积可能很大。这可能会使它们的处理速度非常慢,甚至阻止您一次将整个文件放入内存中。
您可以使用滑动窗口技术在播放时可视化音频的一小部分,而不是绘制整个或部分 WAV 文件的静态波形。这将通过实时更新绘图来产生有趣的示波器效果:
在这里插入图片描述
plot_oscilloscope.py
from argparse import ArgumentParser
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from waveio import WAVReader
def slide_window(window_seconds, wav):
num_windows = round(wav.metadata.num_seconds / window_seconds)
for i in range(num_windows):
begin_seconds = i * window_seconds
end_seconds = begin_seconds + window_seconds
channels = wav.channels_sliced(begin_seconds, end_seconds)
yield np.mean(tuple(channels), axis=0)
def animate(filename, seconds, windows):
try:
plt.style.use("dark_background")
except OSError:
pass # Fall back to the default style
fig, ax = plt.subplots(figsize=(16, 9))
fig.canvas.manager.set_window_title(filename)
plt.tight_layout()
plt.box(False)
for window in windows:
plt.cla()
ax.set_xticks([])
ax.set_yticks([])
ax.set_ylim(-1.0, 1.0)
plt.plot(window)
plt.pause(seconds)
def main():
args = parse_args()
with WAVReader(args.path) as wav:
animate(
args.path.name,
args.seconds,
slide_window(args.seconds, wav),
)
def parse_args():
parser = ArgumentParser(description="Animate WAV file waveform")
parser.add_argument("path", type=Path, help="path to the WAV file")
parser.add_argument(
"-s",
"--seconds",
type=float,
default=0.05,
help="sliding window size in seconds",
)
return parser.parse_args()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Aborted")
现在,从 plot_oscilloscope.py
中复制整个源代码并将其粘贴到名为 plot_spectrogram.py
的新脚本中,您将修改该脚本以创建 WAV 文件的新可视化效果。
在这里插入图片描述
由于您将计算短音频段的 FFT,因此您需要重叠相邻的音频段,以最大程度地减少边缘突然不连续性导致的频谱泄漏。
# ...
def parse_args():
parser = ArgumentParser(description="Animate WAV file spectrogram")
parser.add_argument("path", type=Path, help="path to the WAV file")
parser.add_argument(
"-s",
"--seconds",
type=float,
default=0.0015,
help="sliding window size in seconds",
)
parser.add_argument(
"-o",
"--overlap",
choices=range(100),
default=50,
type=int,
help="sliding window overlap as a percentage",
)
return parser.parse_args()
# ...
--overlap
参数的值必须是介于 0 和 100 不含之间的整数,表示百分比。重叠越大,动画看起来越平滑。
现在,您可以修改 slide_window()
函数以接受该重叠百分比作为附加参数:
# ...
def slide_window(window_seconds, overlap_percentage, wav):
step_seconds = window_seconds * (1 - overlap_percentage / 100)
num_windows = round(wav.metadata.num_seconds / step_seconds)
for i in range(num_windows):
begin_seconds = i * step_seconds
end_seconds = begin_seconds + window_seconds
channels = wav.channels_sliced(begin_seconds, end_seconds)
yield np.mean(tuple(channels), axis=0)
# ...
您没有像以前那样按窗口的整个持续时间移动窗口,而是引入了一个更小的步骤,从而导致总共有更多的窗口。另一方面,当重叠百分比为零时,您将窗口彼此相邻排列,它们之间没有任何重叠。
您现在可以将命令行中请求的overlap传递给生成器函数以及 animate()
函数:
# ...
def main():
args = parse_args()
with WAVReader(args.path) as wav:
animate(
args.path.name,
args.seconds,
args.overlap,
fft(slide_window(args.seconds, args.overlap, wav), wav),
)
# ...
使用numpy的fft计算每个窗口的频率:
# ...
def fft(windows, wav):
sampling_period = 1 / wav.metadata.frames_per_second
for window in windows:
frequencies = np.fft.rfftfreq(window.size, sampling_period)
magnitudes = np.abs(
np.fft.rfft(
(window - np.mean(window)) * np.blackman(window.size)
)
)
yield frequencies, magnitudes
# ...
最后,您必须更新动画代码以在每个滑动窗口位置绘制频率条形图:
def animate(filename, seconds, overlap_percentage, windows):
try:
plt.style.use("dark_background")
except OSError:
pass # Fall back to the default style
fig, ax = plt.subplots(figsize=(16, 9))
fig.canvas.manager.set_window_title(filename)
plt.tight_layout()
plt.box(False)
bar_gap = 0.25
for frequencies, magnitudes in windows:
bar_width = (frequencies[-1] / frequencies.size) * (1 - bar_gap)
plt.cla()
ax.set_xticks([])
ax.set_yticks([])
ax.set_xlim(-bar_width / 2, frequencies[-1] - bar_width / 2)
ax.set_ylim(0, np.max(magnitudes))
ax.bar(frequencies, magnitudes, width=bar_width)
plt.pause(seconds * (1 - overlap_percentage / 100))
运行以下命令以启动频谱图的动画:
python plot_spectrogram.py file.wav --seconds 0.001 --overlap 95
到目前为止,您一直在使用 waveio 包来方便地读取和解码 WAV 文件,这使您可以专注于更高级别的任务。现在是时候添加拼图中缺失的部分并实现WAVReader
的对应物了。您将创建一个能够将音频数据块写入 WAV 文件的惰性写入器。
对于此任务,您将执行一个动手示例——将 Internet 广播电台流式传输到本地 WAV 文件。
为了简化连接到在线流的过程,您将使用一个微小的帮助程序类来实时获取音频帧。
注:这个需要网站di.fm,可能需要注册,比较麻烦。 建议只看writer相关的部分。
stream.py
import av
from waveio.encoding import PCMEncoding
from waveio.metadata import WAVMetadata
class RadioStream:
def __init__(self, stream_url):
self.container = av.open(stream_url)
self.metadata = get_metadata(self.container)
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self.container.close()
def __iter__(self):
for chunk in self.container.decode():
yield chunk.to_ndarray()
def get_metadata(container):
(audio_stream,) = container.streams.audio
num_channels = audio_stream.channels
bytes_per_sample = audio_stream.format.bytes // num_channels
return WAVMetadata(
encoding=PCMEncoding(bytes_per_sample),
frames_per_second=audio_stream.rate,
num_channels=num_channels,
)
现在,在 waveio 包中创建writer
模块,并使用以下代码实现将音频帧增量写入新 WAV 文件的功能:
waveio/writer.py
import wave
class WAVWriter:
def __init__(self, metadata, path):
self.metadata = metadata
self._wav_file = wave.open(str(path), mode="wb")
self._wav_file.setframerate(metadata.frames_per_second)
self._wav_file.setnchannels(metadata.num_channels)
self._wav_file.setsampwidth(metadata.encoding)
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self._wav_file.close()
def append_channels(self, channels):
self.append_amplitudes(channels.T.reshape(-1))
def append_amplitudes(self, amplitudes):
frames = self.metadata.encoding.encode(amplitudes)
self._wav_file.writeframes(frames)
该WAVWriter
类采用输出 WAV 文件的 WAVMetadata 实例和路径。然后,它打开文件以二进制模式写入,并使用元数据设置适当的标头值。请注意,在此阶段,音频帧数仍然是未知的,因此无需指定它,而是让 wave 模块稍后在文件关闭时更新它。
就像reader一样,writer对象遵循上下文管理器协议。当您使用 with 关键字输入新上下文时,新 WAVWriter 实例将返回自身。相反,退出上下文将确保即使发生错误,WAV 文件也能正确关闭。
创建 的 WAVWriter 实例后,您可以通过调用 .append_channels()
二维 NumPy 通道数组作为参数来将数据块添加到 WAV 文件中。该方法将通道重塑为振幅值的平面数组,并使用元数据中指定的格式对其进行编码。
在继续操作之前,请将WAVReader
添加到waveio __init__.py
:
from waveio.reader import WAVReader
from waveio.writer import WAVWriter
__all__ = ["WAVReader", "WAVWriter"]
最后,您可以连接这些点:
from argparse import ArgumentParser
from stream import RadioStream
from waveio import WAVWriter
def main():
args = parse_args()
with RadioStream(args.stream_url) as radio_stream:
with WAVWriter(radio_stream.metadata, args.output) as writer:
for channels_chunk in radio_stream:
writer.append_channels(channels_chunk)
def parse_args():
parser = ArgumentParser(description="Record an Internet radio stream")
parser.add_argument("stream_url", help="URL address of the stream")
parser.add_argument(
"-o",
"--output",
metavar="path",
required=True,
type=str,
help="path to the output WAV file",
)
return parser.parse_args()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Aborted")
下面是一个示例命令,演示如何录制 Classic EuroDance 频道:
RADIO_URL=http://prem2.di.fm:80/classiceurodance?your-secret-token
python record_stream.py "$RADIO_URL" --output ripped.wav
在本节中,您将同时从一个 WAV 文件中读取大量音频帧,并以惰性的方式将其修改后的版本写入另一个文件。为此,您需要通过添加以下方法来增强WAVReader
:
# ...
class WAVReader:
DEFAULT_MAX_FRAMES = 1024
# ...
@reshape("columns")
def channels_lazy(self, max_frames=DEFAULT_MAX_FRAMES):
self._wav_file.rewind()
while True:
chunk = self._read(max_frames)
if chunk.size == 0:
break
yield chunk
# ...
与此类中的大多数其他方法和属性一样, .channels_lazy()
装饰用于 @reshape
以更方便的方式排列解码的振幅。不幸的是,此装饰器作用于 NumPy 数组,而您的新方法返回一个生成器对象。若要使它们兼容,必须通过处理两种情况来更新装饰器的定义:
import inspect
import wave
from functools import cached_property, wraps
from waveio.encoding import PCMEncoding
from waveio.metadata import WAVMetadata
def reshape(shape):
if shape not in ("rows", "columns"):
raise ValueError("shape must be either 'rows' or 'columns'")
def decorator(method):
if inspect.isgeneratorfunction(method):
@wraps(method)
def wrapper(self, *args, **kwargs):
for values in method(self, *args, **kwargs):
reshaped = values.reshape(-1, self.metadata.num_channels)
yield reshaped if shape == "rows" else reshaped.T
else:
@wraps(method)
def wrapper(self, *args, **kwargs):
values = method(self, *args, **kwargs)
reshaped = values.reshape(-1, self.metadata.num_channels)
return reshaped if shape == "rows" else reshaped.T
return wrapper
return decorator
# ...
您可以使用inspect
模块来确定装饰器是包装常规方法还是生成器方法。两个包装器执行相同的操作,但生成器包装器在每次迭代中生成重新调整的值,而常规方法包装器返回它们。
最后,您可以添加属性stereo
,告诉您 WAV 文件是否为立体声文件:
class WAVReader:
# ...
@cached_property
def stereo(self):
return 2 == self.metadata.num_channels
通过这些更改,您可以分块读取 WAV 文件并开始应用各种声音效果。例如,您可以扩大或缩小音频文件的立体声场,以增强或减少空间感。 其中一种技术涉及将包含左右声道的传统立体声信号转换为中声道和侧声道。 中声道 (M) 包含两侧共有的单声道分量,而侧声道 (S) 捕获左声道 (L) 和右声道 (R) 之间的差异。您可以使用以下公式在两种表示形式之间进行转换:
在这里插入图片描述
当你分离出侧声道S后,你可以增强S再与M重新组合出左右声道。
创建一个名为 stereo_booster.py
的脚本,该脚本使用可选的强度参数将输入和输出 WAV 文件的路径作为参数:
from argparse import ArgumentParser
def main():
args = parse_args()
def parse_args():
parser = ArgumentParser(description="Widen the stereo field")
parser.add_argument(
"-i",
"--input",
dest="input_path",
required=True,
type=str,
help="path to the input WAV file",
)
parser.add_argument(
"-o",
"--output",
dest="output_path",
required=True,
type=str,
help="path to the output WAV file",
)
parser.add_argument(
"-s",
"--strength",
type=float,
default=1.0,
help="strength (defaults to 1)",
)
return parser.parse_args()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Aborted")
接下来,实现通道转换公式:
from argparse import ArgumentParser
def main():
args = parse_args()
# l,r --> mid, side
def convert_to_ms(left, right):
return (left + right) / 2, (left - right) / 2
# m,s --> l,r
def convert_to_lr(mid, side):
return mid + side, mid - side
# ...
最后,您可以打开立体声 WAV 文件进行读取,分块循环播放其通道,并应用mid-sid增强:
from argparse import ArgumentParser
from waveio import WAVReader, WAVWriter
def main():
args = parse_args()
with (
WAVReader(args.input_path) as source,
WAVWriter(source.metadata, args.output_path) as target,
):
if source.stereo:
for channels_chunk in source.channels_lazy():
mid, side = convert_to_ms(*channels_chunk)
left, right = convert_to_lr(mid, side * args.strength)
target.append_channels(left, right)
else:
print("Only stereo WAV files are supported")
# ...
请注意,您现在将修改后的频道附加为单独的参数,而您的广播录制脚本传递的是单个组合频道的 NumPy 数组。若要使.append_channels()
方法适用于这两种类型的调用,可以按如下方式更新 WAVWriter 类:
import wave
import numpy as np
class WAVWriter:
# ...
def append_channels(self, *channels):
match channels:
case [combined] if combined.ndim > 1:
self.append_amplitudes(combined.T.reshape(-1))
case _:
self.append_amplitudes(np.dstack(channels).reshape(-1))
def append_amplitudes(self, amplitudes):
frames = self.metadata.encoding.encode(amplitudes)
self._wav_file.writeframes(frames)
# ...
# ...
尝试将其中一个示例 WAV 文件(例如自行车铃声)提升五倍:
python stereo_booster.py -i Bicycle-bell.wav -o boosted.wav -s 5