
纳尼(°ロ°)!,听说有人想对几天的时序数据做 FFT?(YUNSWJ 分析版)
我大概问了一下需求,可以写成这样的一段话:采集电压噪声;采样率1M;感兴趣的频段是在300k以下,低频在几k以上;时间一个月。文章用「分段 + Welch PSD + 长时间统计」的思路,把 1 Msps / 1 个月的数据拆成很多小块来算。
采样率:
时间:1 个月 ≈ 30 天
总样本数:
如果全部存成 float32:
不可能整段存着慢慢算,必须:要么边采边算(在线 PSD),只存结果;要么离线,但按“块”从硬盘流式读取处理。
采样率:1 MHz
感兴趣频段:几 kHz ~ 300 kHz(低于几 kHz 的超低频可以忽略(对这个任务来说))
需要看的是这段时间内的平均噪声谱(比如 dBV/√Hz);噪声谱随时间有没有漂移 / 突发异常(比如某天某时分频段噪声抬高)。
这意味着:
选一个比较折中的段长:段长 点
时间长度:
频率分辨率:
61 Hz 的分辨率对于“几 kHz ~ 300 kHz”的噪声分析已经绰绰有余。
要更细一点(比如 10 Hz)就把 调大到 1e5 级;但那样单次 FFT 开销变大、对内存不友好,这个 16k 是比较合适的起点。
窗函数:Hann(汉宁窗)好处是旁瓣够低,频谱泄漏控制得不错。
重叠:50%(即 step = N_seg // 2 = 8192,因为要保证分块数据)
那么一秒钟的数据(1e6 点)可以分成大约:
个段
(这里留个尾巴,晚些补)
这里提到的“重叠 50%(step = N_seg//2)因为要保证分块数据”,背后其实有两层含义:
Welch PSD 的流程是:分段 → 加窗 → FFT → 功率谱 → 多段平均。
如果 不重叠(overlap=0),每段长度 N_seg,那么一块数据里能取的段数少;如果 50% 重叠(step=N_seg/2),同样长度的数据能取到大约两倍的段数;多段平均的好处是:PSD 方差下降(更稳定、更平滑)。
所以 50% overlap 的“工程意义”是:
更多段数 → 更好的平均效果;在不增加 FFT 点数的情况下(Δf 不变),减少 PSD 估计的抖动
但 overlap 不是越大越好,因为:重叠越大,相邻段共享数据越多 → 段与段之间相关性变强,而且计算量也会上升。
对于 Hann(汉宁)窗,50% overlap 是一个经典折中:它能在泄漏控制、方差降低、计算量之间取得平衡(很多标准实现默认就是 Hann+50%)。
在工程里通常是这么处理数据的:数据源连续不断来(1 Msps),不可能一次拿一个月,只能按块读取(比如每次 10 秒 / 0.5 秒),你希望“块 A 的尾部”和“块 B 的头部”拼起来后,分段切片仍然等价于“对整条连续数据切片”
关键问题:分段的边界。
举例:
N_seg = 16384
step = 8192(50% overlap)
对块 A 做分段,最后一个段可能从 A[k] 到 A[k+16383],下一段按 step 应该从 A[k+8192] 开始;但这个 “下一段”的前半部分在块 A 末尾、后半部分在块 B 开头; 如果你不“跨块拼接”,这段就丢失了。
丢失会带来两件事:段数减少(平均变差)
更关键:PSD 会出现“与块边界有关”的轻微偏差(不是真实信号特性)
最常见做法就是看到的 carry:每处理完一个块,留下一段尾巴给下一块拼接,尾巴长度一般就是 N_seg - step
为什么是这个长度?因为:下一段的起点相对上一段前移 step,要让“下一块”开头能凑出完整 N_seg,你至少要保留上一块末尾那部分
公式:
当 overlap=50%:
step = N_seg/2
carry = N_seg - step = N_seg/2
也就是:只需要保留半段长度就够了,内存很省,逻辑也很简单。
绝对不需要“每一段的全频谱都存一个月”,否则:假设“每分钟存一条平均频谱”那一天就是1440 条;一个月:≈ 4.3 万条;每条频谱 0–300 kHz → 大约 个频点;总 float32 数量:,约 0.8 GB → 其实还能接受。
所以我建议分两段:
在一个月的流式处理中,维护:
这三个向量大小大约 8k 点 × 3 × float64,几十 KB 级——非常小。
按“每分钟”作为一个时间单位:每分钟包含若干个 16.384 ms 的段(数量 ≈ 60 / 0.016 ≈ 3600 段);对这一分钟内的所有段的 PSD 做平均,得到一个“每分钟平均 PSD”。
然后这里还有两个选择:
存整条 PSD(5k 点):用来画 spectrogram(时间–频率图)和长时间变化,大约 0.8 GB/月,PC 完全能撑住。(倒不一定是 PC,硬盘吧~)
只存几个频段的积分功率(RMS):
比如:
Band1: 3–10 kHz
Band2: 10–100 kHz
Band3: 100–300 kHz
每分钟存 3 个值 → 30 天也就 3 × 4.3 万 ≈ 13 万个点,几百 KB。
设计方案中可以两者结合:对重要实验,存每分钟的全频谱 PSD → 后期随便切片分析;对长期部署系统,只存 band power 做监控。
因为没有真实的数据,这里就做一些仿真数据;要做多分辨率频谱分析(3–20 kHz 用更细 Δf;20–300 kHz 用普通 Δf),以及在线触发 / 事件检测(频段 RMS 超阈值就“抓原始波形”),只用 5 s 的仿真数据(1 Msps),逻辑上完全一样,后面只要把“信号来源”换成真实采集流就行了。
采样率:fs = 1_000_000(1 Msps)
时长:duration_sec = 5(演示用,避免占太多内存)
信号组成:
宽带白噪声(模拟电压噪声底)
固定的两个窄带干扰:10 kHz、100 kHz 正弦
在某些时间段注入“事件”:
事件 1(低频带 3–20 kHz 抬高):1.5–2.5 s
事件 2(高频带 100–200 kHz 抬高):3.0–4.0 s
分块分析:每个块长度 chunk_duration = 0.5 s → chunk_size = 500_000 点;把整段数据拆成 10 个块,模拟“在线接收数据”
对每个块:
用标准 Welch 计算 PSD(N_seg = 16384,Δf≈61 Hz)
计算三个频段的带限噪声功率:
Band1: 3–20 kHz
Band2: 20–100 kHz
Band3: 100–300 kHz
用前几个块估计 baseline;后面的块如果某 band 的 RMS > baseline × 阈值 → 触发事件,并保存该块原始波形
多分辨率 PSD:
低频细分辨率:N_seg_low = 65536(Δf≈15 Hz),3–20 kHz
高频普通分辨率:N_seg_high = 16384(Δf≈61 Hz),20–300 kHz

总点数: 5000000, 块数: 10
基线估计完成: {'B1_3_20k': np.float64(1.0034820612785113e-06), 'B2_20_100k': np.float64(1.6369128739172906e-07), 'B3_100_300k': np.float64(1.560139330210296e-07)}
事件触发: 块 4, 频段 B1_3_20k, 功率=1.962e-05, baseline=1.003e-06
事件触发: 块 4, 频段 B2_20_100k, 功率=7.974e-07, baseline=1.637e-07
事件触发: 块 6, 频段 B2_20_100k, 功率=6.971e-07, baseline=1.637e-07
事件触发: 块 6, 频段 B3_100_300k, 功率=2.956e-05, baseline=1.560e-07
事件触发: 块 7, 频段 B2_20_100k, 功率=6.977e-07, baseline=1.637e-07
事件触发: 块 7, 频段 B3_100_300k, 功率=2.955e-05, baseline=1.560e-07
检测到事件数量: 6
事件: chunk=4, band=B1_3_20k, chunk_len=500000
事件: chunk=4, band=B2_20_100k, chunk_len=500000
事件: chunk=6, band=B2_20_100k, chunk_len=500000
事件: chunk=6, band=B3_100_300k, chunk_len=500000
事件: chunk=7, band=B2_20_100k, chunk_len=500000
事件: chunk=7, band=B3_100_300k, chunk_len=500000

思路完全一样,只是把“仿真信号 x”换成“实时采集的块数据”。
采集侧:以固定块(例如 0.5 s 或 1 s)为单位,从 ADC 拉数据 → 放进环形 buffer;每满一块就 DMA / streaming 到上位机,或者在本地做简单统计(带限积分可以做 IIR 滤波估算)。
detect_events_online 现在是对整段 x 切块:
for i in range(n_chunks):
start = i * CHUNK_SIZE
end = start + CHUNK_SIZE
chunk = x[start:end]
...
在真实系统里,可以改成:
def process_new_chunk(chunk):
# 这里 chunk 就是刚刚采满的一块原始数据
# 把 detect_events_online 里的那段逻辑剪出来放进来
...
并把 band_powers、baselines、events 等变量做成“全局状态”或类成员,就成了真正的“在线事件检测器”。
import numpy as np
import matplotlib.pyplot as plt
from typing import Tuple, List, Dict
# ======================
# 配置参数
# ======================
FS = 1_000_000 # 采样率 1 Msps
DURATION_SEC = 5.0 # 仿真总时长 5 s(演示用,可改大)
CHUNK_DURATION = 0.5 # 每块 0.5 s,用来模拟在线处理
CHUNK_SIZE = int(FS * CHUNK_DURATION)
# Welch 参数(高频段默认配置)
N_SEG_HIGH = 16_384
OVERLAP_HIGH = 0.5
# 多分辨率:低频采用更长段
N_SEG_LOW = 65_536
OVERLAP_LOW = 0.5
F_MAX = 300_000.0 # 只关注到 300 kHz
# ======================
# 1. 生成仿真信号
# ======================
def generate_band_noise(t: np.ndarray,
f_lo: float,
f_hi: float,
n_tones: int,
amplitude: float,
seed: int = None) -> np.ndarray:
"""
通过叠加多条随机相位正弦,近似生成 [f_lo, f_hi] 带宽内的带限噪声。
"""
rng = np.random.default_rng(seed)
freqs = np.linspace(f_lo, f_hi, n_tones)
phases = rng.uniform(0, 2 * np.pi, size=n_tones)
# broadcast: (N,1) * (1,M) + (1,M)
t_col = t[:, None]
freqs_row = freqs[None, :]
phases_row = phases[None, :]
sig = amplitude * np.sin(2 * np.pi * freqs_row * t_col + phases_row)
# sum over tones
return sig.sum(axis=1)
def generate_synthetic_signal(fs: float = FS,
duration_sec: float = DURATION_SEC) -> Tuple[np.ndarray, np.ndarray]:
"""
生成带有两个固定正弦 + 两段事件带限噪声的仿真信号。
返回:
t: 时间轴 (s)
x: 信号 (V)
"""
n_total = int(fs * duration_sec)
t = np.arange(n_total) / fs
rng = np.random.default_rng(42)
# 基础白噪声(比如 0.5 mV_rms)
base_noise = rng.normal(0.0, 0.5e-3, size=n_total)
# 两个固定窄带干扰
tone1 = 2e-3 * np.sin(2 * np.pi * 10_000 * t) # 10 kHz
tone2 = 1e-3 * np.sin(2 * np.pi * 100_000 * t) # 100 kHz
x = base_noise + tone1 + tone2
# 事件 1:1.5~2.5 s,抬高 3–20 kHz 带宽
t_event1 = (t >= 1.5) & (t < 2.5)
x[t_event1] += generate_band_noise(
t[t_event1], f_lo=3_000, f_hi=20_000,
n_tones=20, amplitude=2e-3, seed=1
)
# 事件 2:3.0~4.0 s,抬高 100–200 kHz 带宽
t_event2 = (t >= 3.0) & (t < 4.0)
x[t_event2] += generate_band_noise(
t[t_event2], f_lo=100_000, f_hi=200_000,
n_tones=30, amplitude=2e-3, seed=2
)
return t, x
# ======================
# 2. 通用 Welch PSD(单块版)
# ======================
def welch_psd(data: np.ndarray,
fs: float,
n_seg: int,
overlap: float) -> Tuple[np.ndarray, np.ndarray]:
"""
对一大块 data 做 Welch PSD:
- 切成多段
- 去直流 + 加窗
- rFFT
- 平均得到单边 PSD (V^2/Hz)
返回:
freqs: 频率轴
psd: PSD (V^2/Hz)
"""
data = np.asarray(data)
n = data.size
if n < n_seg:
raise ValueError("数据长度小于 n_seg,无法做 Welch PSD")
window = np.hanning(n_seg)
U = (window ** 2).mean()
step = int(n_seg * (1 - overlap))
n_segs = 1 + (n - n_seg) // step
n_fft = n_seg // 2 + 1
psd_accum = np.zeros(n_fft, dtype=np.float64)
count = 0
for i in range(n_segs):
start = i * step
seg = data[start:start + n_seg]
seg = seg - seg.mean()
seg = seg * window
X = np.fft.rfft(seg)
Pxx = (np.abs(X) ** 2) / (fs * n_seg * U)
psd_accum += Pxx
count += 1
psd = psd_accum / max(count, 1)
freqs = np.fft.rfftfreq(n_seg, d=1.0 / fs)
return freqs, psd
# ======================
# 3. 多分辨率 PSD
# ======================
def multi_resolution_psd(data: np.ndarray,
fs: float = FS) -> Dict[str, np.ndarray]:
"""
多分辨率示例:
- 低频 3–20 kHz 用 N_SEG_LOW = 65536 (Δf ~ 15 Hz)
- 高频 20–300 kHz 用 N_SEG_HIGH = 16384 (Δf ~ 61 Hz)
"""
# 低频高分辨率
freqs_low, psd_low = welch_psd(data, fs, N_SEG_LOW, OVERLAP_LOW)
# 高频普通分辨率
freqs_high, psd_high = welch_psd(data, fs, N_SEG_HIGH, OVERLAP_HIGH)
# 裁剪频段
idx_low_band = (freqs_low >= 3_000) & (freqs_low <= 20_000)
idx_high_band = (freqs_high >= 20_000) & (freqs_high <= 300_000)
return {
"freqs_low": freqs_low[idx_low_band],
"psd_low": psd_low[idx_low_band],
"freqs_high": freqs_high[idx_high_band],
"psd_high": psd_high[idx_high_band]
}
# ======================
# 4. 事件检测:按块在线处理
# ======================
def compute_band_power(freqs: np.ndarray,
psd: np.ndarray,
f_lo: float,
f_hi: float) -> float:
"""
对 PSD 在 [f_lo, f_hi] 上积分,得到噪声功率 (V^2)
"""
idx = (freqs >= f_lo) & (freqs <= f_hi)
if not np.any(idx):
return 0.0
df = freqs[1] - freqs[0]
return float(np.sum(psd[idx] * df))
def detect_events_online(x: np.ndarray,
fs: float = FS) -> Dict:
"""
模拟在线处理:
- 每 CHUNK_SIZE 点为一块
- 对每块做 Welch PSD
- 计算 3 个频段噪声功率
- 用前 baseline_chunks 块估计基线
- 后续块超过阈值则认定为事件,并保存原始块
"""
n_total = x.size
n_chunks = n_total // CHUNK_SIZE
print(f"总点数: {n_total}, 块数: {n_chunks}")
# 频段定义
bands = [
("B1_3_20k", 3_000, 20_000),
("B2_20_100k", 20_000, 100_000),
("B3_100_300k", 100_000, 300_000),
]
# 每块各频段功率
band_powers = {name: [] for name, _, _ in bands}
# 基线估计用前几块
baseline_chunks = 3
baselines = {name: None for name, _, _ in bands}
# 触发阈值:比如 3 倍 baseline
threshold_factor = 3.0
events = [] # 存放 {"chunk_idx": i, "band": name, "chunk_data": array}
for i in range(n_chunks):
start = i * CHUNK_SIZE
end = start + CHUNK_SIZE
chunk = x[start:end]
# 对这一块做高频段 Welch(统一配置)
freqs, psd = welch_psd(chunk, fs, N_SEG_HIGH, OVERLAP_HIGH)
# 裁剪到 F_MAX
idx_fmax = freqs <= F_MAX
freqs_c = freqs[idx_fmax]
psd_c = psd[idx_fmax]
# 计算各频段功率
current_powers = {}
for name, f_lo, f_hi in bands:
p = compute_band_power(freqs_c, psd_c, f_lo, f_hi)
band_powers[name].append(p)
current_powers[name] = p
# 更新基线/检测事件
if i < baseline_chunks:
# 暂存,最后一起算基线
pass
elif i == baseline_chunks:
# 用前 baseline_chunks 块的值算一次基线
for name in baselines.keys():
vals = np.array(band_powers[name][:baseline_chunks])
baselines[name] = np.median(vals)
print("基线估计完成:", baselines)
else:
# 常规事件检测
for name, _, _ in bands:
baseline = baselines[name]
if baseline is None or baseline == 0:
continue
if current_powers[name] > baseline * threshold_factor:
print(f"事件触发: 块 {i}, 频段 {name}, "
f"功率={current_powers[name]:.3e}, baseline={baseline:.3e}")
events.append({
"chunk_idx": i,
"band": name,
"chunk_data": chunk.copy()
})
# 转成 numpy 数组方便后处理
for name in band_powers.keys():
band_powers[name] = np.array(band_powers[name])
return {
"band_powers": band_powers,
"events": events,
"chunk_times": np.arange(n_chunks) * CHUNK_DURATION
}
# ======================
# 5. 一些可视化函数
# ======================
def plot_multi_resolution(multi_res: Dict[str, np.ndarray]):
freqs_low = multi_res["freqs_low"]
psd_low = multi_res["psd_low"]
freqs_high = multi_res["freqs_high"]
psd_high = multi_res["psd_high"]
psd_low_db = 10 * np.log10(psd_low + 1e-30)
psd_high_db = 10 * np.log10(psd_high + 1e-30)
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(freqs_low, psd_low_db)
plt.xlabel("Frequency [Hz]")
plt.ylabel("PSD [dB V^2/Hz]")
plt.title("Low band (3–20 kHz, high resolution)")
plt.grid(True)
plt.subplot(1, 2, 2)
plt.plot(freqs_high, psd_high_db)
plt.xlabel("Frequency [Hz]")
plt.ylabel("PSD [dB V^2/Hz]")
plt.title("High band (20–300 kHz, normal resolution)")
plt.grid(True)
plt.tight_layout()
plt.show()
def plot_band_powers(result: Dict):
band_powers = result["band_powers"]
chunk_times = result["chunk_times"]
plt.figure()
for name, vals in band_powers.items():
# RMS = sqrt(power)
plt.plot(chunk_times, np.sqrt(vals), marker="o", label=name)
plt.xlabel("Time [s] (chunk index * chunk_duration)")
plt.ylabel("Band noise RMS [V]")
plt.title("Band-limited noise vs time (per chunk)")
plt.grid(True)
plt.legend()
plt.show()
# ======================
# 6. 主流程示例
# ======================
if __name__ == "__main__":
# 1) 生成仿真数据
t, x = generate_synthetic_signal(FS, DURATION_SEC)
print("生成仿真数据完成:", x.shape)
# 2) 随便拿一块数据做多分辨率 PSD(比如中间 0.5s)
mid_start = int(2.0 * FS)
mid_end = mid_start + CHUNK_SIZE
segment = x[mid_start:mid_end]
multi_res = multi_resolution_psd(segment, FS)
plot_multi_resolution(multi_res)
# 3) 在线事件检测(按 0.5s 一块)
result = detect_events_online(x, FS)
print("检测到事件数量:", len(result["events"]))
for ev in result["events"]:
print(f" 事件: chunk={ev['chunk_idx']}, band={ev['band']}, "
f"chunk_len={ev['chunk_data'].size}")
# 4) 画出各频段 RMS 随时间变化
plot_band_powers(result)