首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >纳尼(°ロ°)!,听说有人想对几天的时序数据做 FFT?(YUNSWJ 实现版)

纳尼(°ロ°)!,听说有人想对几天的时序数据做 FFT?(YUNSWJ 实现版)

作者头像
云深无际
发布2026-01-07 14:20:48
发布2026-01-07 14:20:48
880
举报
文章被收录于专栏:云深之无迹云深之无迹

纳尼(°ロ°)!,听说有人想对几天的时序数据做 FFT?(YUNSWJ 分析版)

我大概问了一下需求,可以写成这样的一段话:采集电压噪声;采样率1M;感兴趣的频段是在300k以下,低频在几k以上;时间一个月。文章用「分段 + Welch PSD + 长时间统计」的思路,把 1 Msps / 1 个月的数据拆成很多小块来算。

先看一眼规模有多恐怖

采样率:

时间:1 个月 ≈ 30 天

总样本数:

如果全部存成 float32:

不可能整段存着慢慢算,必须:要么边采边算(在线 PSD),只存结果;要么离线,但按“块”从硬盘流式读取处理。

频谱分析目标再明确一下

采样率:1 MHz

感兴趣频段:几 kHz ~ 300 kHz(低于几 kHz 的超低频可以忽略(对这个任务来说))

需要看的是这段时间内的平均噪声谱(比如 dBV/√Hz);噪声谱随时间有没有漂移 / 突发异常(比如某天某时分频段噪声抬高)。

这意味着:

  1. 频率分辨率不用特别变态,几十 Hz 甚至 100 Hz 就够用了
  2. 真正需要的是:0–300 kHz 的 PSD 曲线(长时间平均);一些关键频段的集成噪声 vs 时间,分辨率按“分钟级”已经够看变化

分段 FFT / Welch 方案

分段长度和分辨率

选一个比较折中的段长:段长 点

时间长度:

频率分辨率:

61 Hz 的分辨率对于“几 kHz ~ 300 kHz”的噪声分析已经绰绰有余。

要更细一点(比如 10 Hz)就把 调大到 1e5 级;但那样单次 FFT 开销变大、对内存不友好,这个 16k 是比较合适的起点。

窗函数 & 重叠

窗函数:Hann(汉宁窗)好处是旁瓣够低,频谱泄漏控制得不错。

重叠:50%(即 step = N_seg // 2 = 8192,因为要保证分块数据)

那么一秒钟的数据(1e6 点)可以分成大约:

个段

(这里留个尾巴,晚些补)

为什么要重叠?

这里提到的“重叠 50%(step = N_seg//2)因为要保证分块数据”,背后其实有两层含义:

Welch 的 50% overlap:为了“同样的数据量,更多独立的平均”

Welch PSD 的流程是:分段 → 加窗 → FFT → 功率谱 → 多段平均。

如果 不重叠(overlap=0),每段长度 N_seg,那么一块数据里能取的段数少;如果 50% 重叠(step=N_seg/2),同样长度的数据能取到大约两倍的段数;多段平均的好处是:PSD 方差下降(更稳定、更平滑)。

所以 50% overlap 的“工程意义”是:

更多段数 → 更好的平均效果;在不增加 FFT 点数的情况下(Δf 不变),减少 PSD 估计的抖动

但 overlap 不是越大越好,因为:重叠越大,相邻段共享数据越多 → 段与段之间相关性变强,而且计算量也会上升。

对于 Hann(汉宁)窗,50% overlap 是一个经典折中:它能在泄漏控制、方差降低、计算量之间取得平衡(很多标准实现默认就是 Hann+50%)。

“保证分块数据”到底是什么意思?

在工程里通常是这么处理数据的:数据源连续不断来(1 Msps),不可能一次拿一个月,只能按块读取(比如每次 10 秒 / 0.5 秒),你希望“块 A 的尾部”和“块 B 的头部”拼起来后,分段切片仍然等价于“对整条连续数据切片”

关键问题:分段的边界

如果不做任何处理,会漏掉跨块的段

举例:

N_seg = 16384

step = 8192(50% overlap)

对块 A 做分段,最后一个段可能从 A[k]A[k+16383],下一段按 step 应该从 A[k+8192] 开始;但这个 “下一段”的前半部分在块 A 末尾、后半部分在块 B 开头; 如果你不“跨块拼接”,这段就丢失了。

丢失会带来两件事:段数减少(平均变差)

更关键:PSD 会出现“与块边界有关”的轻微偏差(不是真实信号特性)

所以要留 “carry”(尾巴)跨块拼起来

最常见做法就是看到的 carry:每处理完一个块,留下一段尾巴给下一块拼接,尾巴长度一般就是 N_seg - step

为什么是这个长度?因为:下一段的起点相对上一段前移 step,要让“下一块”开头能凑出完整 N_seg,你至少要保留上一块末尾那部分

公式:

当 overlap=50%:

step = N_seg/2

carry = N_seg - step = N_seg/2

也就是:只需要保留半段长度就够了,内存很省,逻辑也很简单。

长时间统计思路:真正需要存什么

绝对不需要“每一段的全频谱都存一个月”,否则:假设“每分钟存一条平均频谱”那一天就是1440 条;一个月:≈ 4.3 万条;每条频谱 0–300 kHz → 大约 个频点;总 float32 数量:,约 0.8 GB → 其实还能接受。

所以我建议分两段:

全局统计

在一个月的流式处理中,维护:

  1. 全局平均 PSD(累加后除以总段数)
  2. 全局最小 / 最大 PSD(每个频点的 min / max)

这三个向量大小大约 8k 点 × 3 × float64,几十 KB 级——非常小。

按照每分钟的 PSD 或“频段功率”时间序列

按“每分钟”作为一个时间单位:每分钟包含若干个 16.384 ms 的段(数量 ≈ 60 / 0.016 ≈ 3600 段);对这一分钟内的所有段的 PSD 做平均,得到一个“每分钟平均 PSD”。

然后这里还有两个选择:

存整条 PSD(5k 点):用来画 spectrogram(时间–频率图)和长时间变化,大约 0.8 GB/月,PC 完全能撑住。(倒不一定是 PC,硬盘吧~)

只存几个频段的积分功率(RMS):

比如:

Band1: 3–10 kHz

Band2: 10–100 kHz

Band3: 100–300 kHz

每分钟存 3 个值 → 30 天也就 3 × 4.3 万 ≈ 13 万个点,几百 KB。

设计方案中可以两者结合:对重要实验,存每分钟的全频谱 PSD → 后期随便切片分析;对长期部署系统,只存 band power 做监控。

Python 频谱分析流程代码

因为没有真实的数据,这里就做一些仿真数据;要做多分辨率频谱分析(3–20 kHz 用更细 Δf;20–300 kHz 用普通 Δf),以及在线触发 / 事件检测(频段 RMS 超阈值就“抓原始波形”),只用 5 s 的仿真数据(1 Msps),逻辑上完全一样,后面只要把“信号来源”换成真实采集流就行了。

采样率:fs = 1_000_000(1 Msps)

时长:duration_sec = 5(演示用,避免占太多内存)

信号组成:

宽带白噪声(模拟电压噪声底)

固定的两个窄带干扰:10 kHz、100 kHz 正弦

在某些时间段注入“事件”:

事件 1(低频带 3–20 kHz 抬高):1.5–2.5 s

事件 2(高频带 100–200 kHz 抬高):3.0–4.0 s

分块分析:每个块长度 chunk_duration = 0.5 schunk_size = 500_000 点;把整段数据拆成 10 个块,模拟“在线接收数据”

对每个块:

用标准 Welch 计算 PSD(N_seg = 16384,Δf≈61 Hz)

计算三个频段的带限噪声功率:

Band1: 3–20 kHz

Band2: 20–100 kHz

Band3: 100–300 kHz

用前几个块估计 baseline;后面的块如果某 band 的 RMS > baseline × 阈值 → 触发事件,并保存该块原始波形

多分辨率 PSD:

低频细分辨率:N_seg_low = 65536(Δf≈15 Hz),3–20 kHz

高频普通分辨率:N_seg_high = 16384(Δf≈61 Hz),20–300 kHz

仿真 + 分段 FFT + 多分辨率 + 事件检测

image-20251219165838149
image-20251219165838149
代码语言:javascript
复制
总点数: 5000000, 块数: 10
基线估计完成: {'B1_3_20k': np.float64(1.0034820612785113e-06), 'B2_20_100k': np.float64(1.6369128739172906e-07), 'B3_100_300k': np.float64(1.560139330210296e-07)}
事件触发: 块 4, 频段 B1_3_20k, 功率=1.962e-05, baseline=1.003e-06
事件触发: 块 4, 频段 B2_20_100k, 功率=7.974e-07, baseline=1.637e-07
事件触发: 块 6, 频段 B2_20_100k, 功率=6.971e-07, baseline=1.637e-07
事件触发: 块 6, 频段 B3_100_300k, 功率=2.956e-05, baseline=1.560e-07
事件触发: 块 7, 频段 B2_20_100k, 功率=6.977e-07, baseline=1.637e-07
事件触发: 块 7, 频段 B3_100_300k, 功率=2.955e-05, baseline=1.560e-07
检测到事件数量: 6
  事件: chunk=4, band=B1_3_20k, chunk_len=500000
  事件: chunk=4, band=B2_20_100k, chunk_len=500000
  事件: chunk=6, band=B2_20_100k, chunk_len=500000
  事件: chunk=6, band=B3_100_300k, chunk_len=500000
  事件: chunk=7, band=B2_20_100k, chunk_len=500000
  事件: chunk=7, band=B3_100_300k, chunk_len=500000
image-20251219165855660
image-20251219165855660

怎么迁移到“一个月实测数据”上?

思路完全一样,只是把“仿真信号 x”换成“实时采集的块数据”。

MCU/FPGA端处理思路

采集侧:以固定块(例如 0.5 s 或 1 s)为单位,从 ADC 拉数据 → 放进环形 buffer;每满一块就 DMA / streaming 到上位机,或者在本地做简单统计(带限积分可以做 IIR 滤波估算)。

用真实数据流替换仿真数据

detect_events_online 现在是对整段 x 切块:

代码语言:javascript
复制
for i in range(n_chunks):
    start = i * CHUNK_SIZE
    end = start + CHUNK_SIZE
    chunk = x[start:end]
    ...

在真实系统里,可以改成:

代码语言:javascript
复制
def process_new_chunk(chunk):
    # 这里 chunk 就是刚刚采满的一块原始数据
    # 把 detect_events_online 里的那段逻辑剪出来放进来
    ...

并把 band_powersbaselinesevents 等变量做成“全局状态”或类成员,就成了真正的“在线事件检测器”。

代码语言:javascript
复制
import numpy as np
import matplotlib.pyplot as plt
from typing import Tuple, List, Dict


# ======================
# 配置参数
# ======================
FS = 1_000_000           # 采样率 1 Msps
DURATION_SEC = 5.0       # 仿真总时长 5 s(演示用,可改大)
CHUNK_DURATION = 0.5     # 每块 0.5 s,用来模拟在线处理
CHUNK_SIZE = int(FS * CHUNK_DURATION)

# Welch 参数(高频段默认配置)
N_SEG_HIGH = 16_384
OVERLAP_HIGH = 0.5

# 多分辨率:低频采用更长段
N_SEG_LOW = 65_536
OVERLAP_LOW = 0.5

F_MAX = 300_000.0        # 只关注到 300 kHz


# ======================
# 1. 生成仿真信号
# ======================
def generate_band_noise(t: np.ndarray,
                        f_lo: float,
                        f_hi: float,
                        n_tones: int,
                        amplitude: float,
                        seed: int = None) -> np.ndarray:
    """
    通过叠加多条随机相位正弦,近似生成 [f_lo, f_hi] 带宽内的带限噪声。
    """
    rng = np.random.default_rng(seed)
    freqs = np.linspace(f_lo, f_hi, n_tones)
    phases = rng.uniform(0, 2 * np.pi, size=n_tones)

    # broadcast: (N,1) * (1,M) + (1,M)
    t_col = t[:, None]
    freqs_row = freqs[None, :]
    phases_row = phases[None, :]

    sig = amplitude * np.sin(2 * np.pi * freqs_row * t_col + phases_row)
    # sum over tones
    return sig.sum(axis=1)


def generate_synthetic_signal(fs: float = FS,
                              duration_sec: float = DURATION_SEC) -> Tuple[np.ndarray, np.ndarray]:
    """
    生成带有两个固定正弦 + 两段事件带限噪声的仿真信号。
    返回:
      t: 时间轴 (s)
      x: 信号 (V)
    """
    n_total = int(fs * duration_sec)
    t = np.arange(n_total) / fs

    rng = np.random.default_rng(42)

    # 基础白噪声(比如 0.5 mV_rms)
    base_noise = rng.normal(0.0, 0.5e-3, size=n_total)

    # 两个固定窄带干扰
    tone1 = 2e-3 * np.sin(2 * np.pi * 10_000 * t)   # 10 kHz
    tone2 = 1e-3 * np.sin(2 * np.pi * 100_000 * t)  # 100 kHz

    x = base_noise + tone1 + tone2

    # 事件 1:1.5~2.5 s,抬高 3–20 kHz 带宽
    t_event1 = (t >= 1.5) & (t < 2.5)
    x[t_event1] += generate_band_noise(
        t[t_event1], f_lo=3_000, f_hi=20_000,
        n_tones=20, amplitude=2e-3, seed=1
    )

    # 事件 2:3.0~4.0 s,抬高 100–200 kHz 带宽
    t_event2 = (t >= 3.0) & (t < 4.0)
    x[t_event2] += generate_band_noise(
        t[t_event2], f_lo=100_000, f_hi=200_000,
        n_tones=30, amplitude=2e-3, seed=2
    )

    return t, x


# ======================
# 2. 通用 Welch PSD(单块版)
# ======================
def welch_psd(data: np.ndarray,
              fs: float,
              n_seg: int,
              overlap: float) -> Tuple[np.ndarray, np.ndarray]:
    """
    对一大块 data 做 Welch PSD:
      - 切成多段
      - 去直流 + 加窗
      - rFFT
      - 平均得到单边 PSD (V^2/Hz)

    返回:
      freqs: 频率轴
      psd:   PSD (V^2/Hz)
    """
    data = np.asarray(data)
    n = data.size
    if n < n_seg:
        raise ValueError("数据长度小于 n_seg,无法做 Welch PSD")

    window = np.hanning(n_seg)
    U = (window ** 2).mean()
    step = int(n_seg * (1 - overlap))
    n_segs = 1 + (n - n_seg) // step

    n_fft = n_seg // 2 + 1
    psd_accum = np.zeros(n_fft, dtype=np.float64)
    count = 0

    for i in range(n_segs):
        start = i * step
        seg = data[start:start + n_seg]
        seg = seg - seg.mean()
        seg = seg * window
        X = np.fft.rfft(seg)
        Pxx = (np.abs(X) ** 2) / (fs * n_seg * U)
        psd_accum += Pxx
        count += 1

    psd = psd_accum / max(count, 1)
    freqs = np.fft.rfftfreq(n_seg, d=1.0 / fs)
    return freqs, psd


# ======================
# 3. 多分辨率 PSD
# ======================
def multi_resolution_psd(data: np.ndarray,
                         fs: float = FS) -> Dict[str, np.ndarray]:
    """
    多分辨率示例:
      - 低频 3–20 kHz 用 N_SEG_LOW = 65536 (Δf ~ 15 Hz)
      - 高频 20–300 kHz 用 N_SEG_HIGH = 16384 (Δf ~ 61 Hz)
    """
    # 低频高分辨率
    freqs_low, psd_low = welch_psd(data, fs, N_SEG_LOW, OVERLAP_LOW)
    # 高频普通分辨率
    freqs_high, psd_high = welch_psd(data, fs, N_SEG_HIGH, OVERLAP_HIGH)

    # 裁剪频段
    idx_low_band = (freqs_low >= 3_000) & (freqs_low <= 20_000)
    idx_high_band = (freqs_high >= 20_000) & (freqs_high <= 300_000)

    return {
        "freqs_low": freqs_low[idx_low_band],
        "psd_low": psd_low[idx_low_band],
        "freqs_high": freqs_high[idx_high_band],
        "psd_high": psd_high[idx_high_band]
    }


# ======================
# 4. 事件检测:按块在线处理
# ======================
def compute_band_power(freqs: np.ndarray,
                       psd: np.ndarray,
                       f_lo: float,
                       f_hi: float) -> float:
    """
    对 PSD 在 [f_lo, f_hi] 上积分,得到噪声功率 (V^2)
    """
    idx = (freqs >= f_lo) & (freqs <= f_hi)
    if not np.any(idx):
        return 0.0
    df = freqs[1] - freqs[0]
    return float(np.sum(psd[idx] * df))


def detect_events_online(x: np.ndarray,
                         fs: float = FS) -> Dict:
    """
    模拟在线处理:
      - 每 CHUNK_SIZE 点为一块
      - 对每块做 Welch PSD
      - 计算 3 个频段噪声功率
      - 用前 baseline_chunks 块估计基线
      - 后续块超过阈值则认定为事件,并保存原始块
    """
    n_total = x.size
    n_chunks = n_total // CHUNK_SIZE
    print(f"总点数: {n_total}, 块数: {n_chunks}")

    # 频段定义
    bands = [
        ("B1_3_20k", 3_000, 20_000),
        ("B2_20_100k", 20_000, 100_000),
        ("B3_100_300k", 100_000, 300_000),
    ]

    # 每块各频段功率
    band_powers = {name: [] for name, _, _ in bands}

    # 基线估计用前几块
    baseline_chunks = 3
    baselines = {name: None for name, _, _ in bands}

    # 触发阈值:比如 3 倍 baseline
    threshold_factor = 3.0

    events = []  # 存放 {"chunk_idx": i, "band": name, "chunk_data": array}

    for i in range(n_chunks):
        start = i * CHUNK_SIZE
        end = start + CHUNK_SIZE
        chunk = x[start:end]

        # 对这一块做高频段 Welch(统一配置)
        freqs, psd = welch_psd(chunk, fs, N_SEG_HIGH, OVERLAP_HIGH)
        # 裁剪到 F_MAX
        idx_fmax = freqs <= F_MAX
        freqs_c = freqs[idx_fmax]
        psd_c = psd[idx_fmax]

        # 计算各频段功率
        current_powers = {}
        for name, f_lo, f_hi in bands:
            p = compute_band_power(freqs_c, psd_c, f_lo, f_hi)
            band_powers[name].append(p)
            current_powers[name] = p

        # 更新基线/检测事件
        if i < baseline_chunks:
            # 暂存,最后一起算基线
            pass
        elif i == baseline_chunks:
            # 用前 baseline_chunks 块的值算一次基线
            for name in baselines.keys():
                vals = np.array(band_powers[name][:baseline_chunks])
                baselines[name] = np.median(vals)
            print("基线估计完成:", baselines)
        else:
            # 常规事件检测
            for name, _, _ in bands:
                baseline = baselines[name]
                if baseline is None or baseline == 0:
                    continue
                if current_powers[name] > baseline * threshold_factor:
                    print(f"事件触发: 块 {i}, 频段 {name}, "
                          f"功率={current_powers[name]:.3e}, baseline={baseline:.3e}")
                    events.append({
                        "chunk_idx": i,
                        "band": name,
                        "chunk_data": chunk.copy()
                    })

    # 转成 numpy 数组方便后处理
    for name in band_powers.keys():
        band_powers[name] = np.array(band_powers[name])

    return {
        "band_powers": band_powers,
        "events": events,
        "chunk_times": np.arange(n_chunks) * CHUNK_DURATION
    }


# ======================
# 5. 一些可视化函数
# ======================
def plot_multi_resolution(multi_res: Dict[str, np.ndarray]):
    freqs_low = multi_res["freqs_low"]
    psd_low = multi_res["psd_low"]
    freqs_high = multi_res["freqs_high"]
    psd_high = multi_res["psd_high"]

    psd_low_db = 10 * np.log10(psd_low + 1e-30)
    psd_high_db = 10 * np.log10(psd_high + 1e-30)

    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.plot(freqs_low, psd_low_db)
    plt.xlabel("Frequency [Hz]")
    plt.ylabel("PSD [dB V^2/Hz]")
    plt.title("Low band (3–20 kHz, high resolution)")
    plt.grid(True)

    plt.subplot(1, 2, 2)
    plt.plot(freqs_high, psd_high_db)
    plt.xlabel("Frequency [Hz]")
    plt.ylabel("PSD [dB V^2/Hz]")
    plt.title("High band (20–300 kHz, normal resolution)")
    plt.grid(True)

    plt.tight_layout()
    plt.show()


def plot_band_powers(result: Dict):
    band_powers = result["band_powers"]
    chunk_times = result["chunk_times"]

    plt.figure()
    for name, vals in band_powers.items():
        # RMS = sqrt(power)
        plt.plot(chunk_times, np.sqrt(vals), marker="o", label=name)

    plt.xlabel("Time [s] (chunk index * chunk_duration)")
    plt.ylabel("Band noise RMS [V]")
    plt.title("Band-limited noise vs time (per chunk)")
    plt.grid(True)
    plt.legend()
    plt.show()


# ======================
# 6. 主流程示例
# ======================
if __name__ == "__main__":
    # 1) 生成仿真数据
    t, x = generate_synthetic_signal(FS, DURATION_SEC)
    print("生成仿真数据完成:", x.shape)

    # 2) 随便拿一块数据做多分辨率 PSD(比如中间 0.5s)
    mid_start = int(2.0 * FS)
    mid_end = mid_start + CHUNK_SIZE
    segment = x[mid_start:mid_end]
    multi_res = multi_resolution_psd(segment, FS)
    plot_multi_resolution(multi_res)

    # 3) 在线事件检测(按 0.5s 一块)
    result = detect_events_online(x, FS)
    print("检测到事件数量:", len(result["events"]))
    for ev in result["events"]:
        print(f"  事件: chunk={ev['chunk_idx']}, band={ev['band']}, "
              f"chunk_len={ev['chunk_data'].size}")

    # 4) 画出各频段 RMS 随时间变化
    plot_band_powers(result)
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-12-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 云深之无迹 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 先看一眼规模有多恐怖
  • 频谱分析目标再明确一下
  • 分段 FFT / Welch 方案
    • 分段长度和分辨率
    • 窗函数 & 重叠
  • 为什么要重叠?
  • Welch 的 50% overlap:为了“同样的数据量,更多独立的平均”
  • “保证分块数据”到底是什么意思?
    • 如果不做任何处理,会漏掉跨块的段
    • 所以要留 “carry”(尾巴)跨块拼起来
  • 长时间统计思路:真正需要存什么
    • 全局统计
    • 按照每分钟的 PSD 或“频段功率”时间序列
  • Python 频谱分析流程代码
  • 仿真 + 分段 FFT + 多分辨率 + 事件检测
  • 怎么迁移到“一个月实测数据”上?
    • MCU/FPGA端处理思路
    • 用真实数据流替换仿真数据
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档