
Nodirc PPK2 是很早就出现的低功耗仪器,甚至还有源表的功能,而且价格也很美丽,我之前还写过关于使用它进行后处理数据的文章。但是有个问题就是,很多时候我想和其它设备联动,比如我给ADC一个转换指令,马上开始进行测量,那么就算转换时的功耗,那这样就方便了。
但是PPK2官方只有一个上位机,好用倒是好用,但是前端我不懂啊!我就阅读了一下源码,发现其实是USB-CDC,说人话就是串口!所以我基于官方的源码设计了这个工具,我测试下来还是很完美的,现在可以自动化控制了;而且短时间官方也不会修改固件什么的,所以不需要担心失效的问题。东西取自开源,作为受益者我也把这个开源了,文档都是按照专业的项目样式写的,希望大家用的开心!
简单来说就是这个设备现在可以脱离官方的控制软件来使用,也可以进行离线测量,都是不在话下的,下次可以给树莓派写点程序把它们丢深山老林里面测试一下(hhhh)。

测试一景
本手册提供了绕过官方nRF Connect PPK应用,直接通过串口控制Nordic Power Profiler Kit II (PPK2) 设备的完整技术方案。基于对PPK2官方项目源码的深入分析,本方案实现了设备的完全自主控制。

好看
参数 | 规格 | 说明 |
|---|---|---|
接口类型 | USB转串口 | PPK2内置USB-to-Serial芯片 |
波特率 | 115200 bps | 固定波特率,不可更改 |
数据位 | 8位 | 标准配置 |
停止位 | 1位 | 标准配置 |
校验位 | 无 | 无奇偶校验 |
流控制 | 无 | 无硬件流控 |
PPK2通过USB连接后在不同操作系统中的设备标识:
COM3, COM4, COM5 等/dev/ttyUSB0, /dev/ttyACM0 等/dev/cu.usbmodem*, /dev/tty.usbmodem* 等应用层 │ PPK命令协议 (本文档定义)
数据链路层 │ 串行数据帧
物理层 │ USB转串口 (115200 bps)
所有命令采用字节数组格式:
在PPK2串口命令协议中,每个命令都有明确的固件支持状态标识。理解这些状态对于正确使用PPK2设备至关重要。
"支持"状态:
"不支持"状态:
对应用开发的影响:
推荐开发策略:
根据实际测试结果,当前PPK2固件版本的命令支持情况:
意外支持的命令(原预期不支持,实际支持):
TRIGGER_INTERVAL_SET (0x04) - 设置触发间隔AVG_NUM_SET (0x02) - 设置平均采样数RANGE_SET (0x08) - 设置测量范围意外不支持的命令(原预期支持,实际不支持):
SWITCH_POINT_UP (0x0F) - 上切换点设置SET_USER_GAINS (0x25) - 设置用户增益RES_USER_SET (0x12) - 用户电阻设置SPIKE_FILTERING_ON (0x15) - 开启尖峰滤波SPIKE_FILTERING_OFF (0x16) - 关闭尖峰滤波预测准确率:61.9%(13/21个命令预测正确)
PPK2的所有串口命令在官方源代码中的定义位置为:
src/constants.tsconst PPKCommands = {
TriggerSet: 0x01, // 设置触发参数
AvgNumSet: 0x02, // 设置平均采样数 (no-firmware)
TriggerWindowSet: 0x03, // 设置触发窗口
TriggerIntervalSet: 0x04, // 设置触发间隔 (no-firmware)
TriggerSingleSet: 0x05, // 单次触发设置
AverageStart: 0x06, // 开始平均采样
AverageStop: 0x07, // 停止平均采样
RangeSet: 0x08, // 设置测量范围 (no-firmware)
LCDSet: 0x09, // LCD显示设置 (no-firmware)
DeviceRunningSet: 0x0c, // 设置设备运行状态
RegulatorSet: 0x0d, // 设置稳压器参数
SwitchPointDown: 0x0e, // 下切换点设置
SwitchPointUp: 0x0f, // 上切换点设置
TriggerExtToggle: 0x11, // 外部触发切换
SetPowerMode: 0x11, // 设置电源模式 (与0x11重复)
ResUserSet: 0x12, // 用户电阻设置
SpikeFilteringOn: 0x15, // 开启尖峰滤波
SpikeFilteringOff: 0x16, // 关闭尖峰滤波
GetMetadata: 0x19, // 获取设备元数据
Reset: 0x20, // 设备复位
SetUserGains: 0x25, // 设置用户增益
};
PPK2源码中的命令实现遵循四层架构:
命令类别 | 源码定义 | 实现方法 | 测试结果 | 一致性 |
|---|---|---|---|---|
有完整实现 | 15个 | 15个 | 13个通过 | 86.7% |
仅有定义 | 7个 | 0个 | 2个意外支持 | - |
标记(no-firmware) | 4个 | 0个 | 3个实际支持 | 25% |
1. 命令码重复问题
TriggerExtToggle 和 SetPowerMode 都使用 0x112. 固件注释不准确
(no-firmware) 的命令中有75%实际支持3. 实现缺失命令
SwitchPointUp、ResUserSet 等有定义但无实现方法SpikeFilteringOn/Off 仅有软件滤波实现,无串口命令命令名称 | 十六进制码 | 参数 | 功能描述 | 固件支持 | 源码实现 |
|---|---|---|---|---|---|
TRIGGER_SET | 0x01 | [level] | 设置触发参数 | 支持 | 完整实现 |
TRIGGER_WINDOW_SET | 0x03 | [window] | 设置触发窗口 | 支持 | 完整实现 |
TRIGGER_INTERVAL_SET | 0x04 | [interval] | 设置触发间隔 | 支持 | 标记no-firmware |
TRIGGER_SINGLE_SET | 0x05 | [mode] | 单次触发设置 | 支持 | 完整实现 |
TRIGGER_EXT_TOGGLE | 0x11 | [state] | 外部触发切换 | 支持 | 完整实现 |
命令名称 | 十六进制码 | 参数 | 功能描述 | 固件支持 | 源码实现 |
|---|---|---|---|---|---|
AVERAGE_START | 0x06 | 无 | 开始平均采样 | 支持 | 完整实现 |
AVERAGE_STOP | 0x07 | 无 | 停止平均采样 | 支持 | 完整实现 |
AVG_NUM_SET | 0x02 | [count] | 设置平均采样数 | 支持 | 标记no-firmware |
命令名称 | 十六进制码 | 参数 | 功能描述 | 固件支持 | 源码实现 |
|---|---|---|---|---|---|
DEVICE_RUNNING_SET | 0x0c | [0/1] | 设置设备运行状态 | 支持 | 完整实现 |
SET_POWER_MODE | 0x11 | [1/2] | 设置电源模式 | 支持 | 完整实现 |
REGULATOR_SET | 0x0d | [H,L] | 设置稳压器参数 | 支持 | 完整实现 |
RESET | 0x20 | 无 | 设备复位 | 支持 | 完整实现 |
命令名称 | 十六进制码 | 参数 | 功能描述 | 固件支持 | 源码实现 |
|---|---|---|---|---|---|
RANGE_SET | 0x08 | [range] | 设置测量范围 | 支持 | 标记no-firmware |
SWITCH_POINT_DOWN | 0x0e | [point] | 下切换点设置 | 支持 | 完整实现 |
SWITCH_POINT_UP | 0x0f | [point] | 上切换点设置 | 不支持 | 仅有定义 |
SET_USER_GAINS | 0x25 | [range,gain] | 设置用户增益 | 不支持 | 完整实现 |
RES_USER_SET | 0x12 | [resistance] | 用户电阻设置 | 不支持 | 仅有定义 |
命令名称 | 十六进制码 | 参数 | 功能描述 | 固件支持 | 源码实现 |
|---|---|---|---|---|---|
SPIKE_FILTERING_ON | 0x15 | 无 | 开启尖峰滤波 | 不支持 | 软件滤波 |
SPIKE_FILTERING_OFF | 0x16 | 无 | 关闭尖峰滤波 | 不支持 | 软件滤波 |
命令名称 | 十六进制码 | 参数 | 功能描述 | 固件支持 | 源码实现 |
|---|---|---|---|---|---|
GET_METADATA | 0x19 | 无 | 获取设备元数据 | 支持 | 完整实现 |
LCD_SET | 0x09 | [display] | LCD显示设置 | 不支持 | 标记no-firmware |
每个采样点包含6字节数据:
字节 0-3: 电流数据 (32位,小端序)
字节 4-5: 数字通道数据 (16位,小端序)
位域 | 位数 | 描述 |
|---|---|---|
ADC值 | 0-13 (14位) | 原始ADC采样值 |
量程 | 14-16 (3位) | 当前测量量程 (0-4) |
计数器 | 18-23 (6位) | 数据包计数器 |
数字通道 | 24-31 (8位) | 8个数字通道状态 |
量程索引 | 电阻值 (Ω) | 测量范围 |
|---|---|---|
0 | 1031.64 | 最小电流 |
1 | 101.65 | 小电流 |
2 | 10.15 | 中等电流 |
3 | 0.94 | 大电流 |
4 | 0.043 | 最大电流 |
电压值 = ADC值 × (1.8V / 163840)
电流值(μA) = (电压值 / 电阻值) × 1,000,000
import serial
import struct
import time
import threading
from typing import Callable, Optional, Dict, Tuple
class PPK2Controller:
"""PPK2设备串口控制器"""
# PPK命令常量
COMMANDS = {
'TRIGGER_SET': 0x01,
'AVG_NUM_SET': 0x02,
'TRIGGER_WINDOW_SET': 0x03,
'TRIGGER_INTERVAL_SET': 0x04,
'TRIGGER_SINGLE_SET': 0x05,
'AVERAGE_START': 0x06,
'AVERAGE_STOP': 0x07,
'RANGE_SET': 0x08,
'LCD_SET': 0x09,
'DEVICE_RUNNING_SET': 0x0c,
'REGULATOR_SET': 0x0d,
'SWITCH_POINT_DOWN': 0x0e,
'SWITCH_POINT_UP': 0x0f,
'TRIGGER_EXT_TOGGLE': 0x11,
'SET_POWER_MODE': 0x11,
'RES_USER_SET': 0x12,
'SPIKE_FILTERING_ON': 0x15,
'SPIKE_FILTERING_OFF': 0x16,
'GET_METADATA': 0x19,
'RESET': 0x20,
'SET_USER_GAINS': 0x25,
}
def __init__(self, port: str, baudrate: int = 115200):
"""初始化PPK2控制器
Args:
port: 串口设备路径
baudrate: 波特率,默认115200
"""
self.port = port
self.baudrate = baudrate
self.serial_conn: Optional[serial.Serial] = None
self.data_callback: Optional[Callable] = None
self.running = False
self.read_thread: Optional[threading.Thread] = None
# 设备参数
self.adc_mult = 1.8 / 163840
self.resistors = [1031.64, 101.65, 10.15, 0.94, 0.043]
self.user_gains = [1.0, 1.0, 1.0, 1.0, 1.0]
def connect(self) -> bool:
"""连接PPK2设备
Returns:
bool: 连接是否成功
"""
try:
self.serial_conn = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=8,
parity=serial.PARITY_NONE,
stopbits=1,
timeout=1
)
print(f"PPK2设备连接成功: {self.port}")
return True
except Exception as e:
print(f"PPK2设备连接失败: {e}")
return False
def disconnect(self):
"""断开PPK2设备连接"""
self.stop_sampling()
if self.serial_conn and self.serial_conn.is_open:
self.serial_conn.close()
print("PPK2设备已断开连接")
def send_command(self, command: list) -> bool:
"""发送命令到PPK2设备
Args:
command: 命令字节列表
Returns:
bool: 命令发送是否成功
"""
if not self.serial_conn or not self.serial_conn.is_open:
print("设备未连接")
return False
try:
cmd_bytes = bytes(command)
self.serial_conn.write(cmd_bytes)
return True
except Exception as e:
print(f"命令发送失败: {e}")
return False
def initialize_device(self) -> Optional[Dict]:
"""初始化PPK2设备
Returns:
Dict: 设备元数据,失败时返回None
"""
print("正在初始化PPK2设备...")
# 1. 获取设备元数据
metadata = self.get_metadata()
if not metadata:
print("获取设备元数据失败")
return None
print(f"设备元数据: {metadata}")
# 2. 设置稳压器电压
vdd = metadata.get('vdd', 3300)
self.set_regulator(vdd)
# 3. 设置用户增益
for i in range(5):
self.set_user_gains(i, self.user_gains[i])
# 4. 设置为安培计模式
self.set_power_mode(ampere_mode=True)
# 5. 启用设备运行
self.set_device_running(True)
print("PPK2设备初始化完成")
return metadata
def get_metadata(self) -> Optional[Dict]:
"""获取PPK2设备元数据
Returns:
Dict: 解析后的元数据字典
"""
if not self.send_command([self.COMMANDS['GET_METADATA']]):
return None
metadata_str = ""
start_time = time.time()
# 等待元数据响应
while time.time() - start_time < 5: # 5秒超时
if self.serial_conn.in_waiting > 0:
data = self.serial_conn.read(self.serial_conn.in_waiting)
try:
metadata_str += data.decode('utf-8', errors='ignore')
except:
continue
if 'END' in metadata_str:
break
time.sleep(0.01)
return self._parse_metadata(metadata_str)
def _parse_metadata(self, metadata_str: str) -> Dict:
"""解析元数据字符串
Args:
metadata_str: 原始元数据字符串
Returns:
Dict: 解析后的元数据字典
"""
try:
import json
# 清理和格式化元数据字符串
cleaned = metadata_str.replace('END', '').strip().lower()
cleaned = cleaned.replace('-nan', 'null')
cleaned = cleaned.replace('\n', ',\n"')
cleaned = cleaned.replace(': ', '": ')
json_str = '{"' + cleaned + '}'
return json.loads(json_str)
except Exception as e:
print(f"元数据解析失败: {e}")
return {}
def set_power_mode(self, ampere_mode: bool = True):
"""设置电源模式
Args:
ampere_mode: True=安培计模式, False=源表模式
"""
mode = 1 if ampere_mode else 2
self.send_command([self.COMMANDS['SET_POWER_MODE'], mode])
print(f"电源模式设置为: {'安培计' if ampere_mode else '源表'}")
def set_regulator(self, voltage_mv: int):
"""设置稳压器电压
Args:
voltage_mv: 电压值(毫伏)
"""
high_byte = (voltage_mv >> 8) & 0xFF
low_byte = voltage_mv & 0xFF
self.send_command([self.COMMANDS['REGULATOR_SET'], high_byte, low_byte])
print(f"稳压器电压设置为: {voltage_mv} mV")
def set_user_gains(self, range_index: int, gain: float):
"""设置用户增益
Args:
range_index: 量程索引 (0-4)
gain: 增益值
"""
if 0 <= range_index < 5:
self.user_gains[range_index] = gain
gain_bytes = list(struct.pack('<f', gain))
command = [self.COMMANDS['SET_USER_GAINS'], range_index] + gain_bytes
self.send_command(command)
print(f"量程{range_index}增益设置为: {gain}")
def set_device_running(self, running: bool):
"""设置设备运行状态
Args:
running: True=运行, False=停止
"""
state = 1 if running else 0
self.send_command([self.COMMANDS['DEVICE_RUNNING_SET'], state])
print(f"设备运行状态: {'运行' if running else '停止'}")
def start_sampling(self):
"""开始数据采样"""
if not self.send_command([self.COMMANDS['AVERAGE_START']]):
return False
self.running = True
# 启动数据读取线程
self.read_thread = threading.Thread(target=self._read_data_loop, daemon=True)
self.read_thread.start()
print("开始数据采样")
return True
def stop_sampling(self):
"""停止数据采样"""
self.running = False
self.send_command([self.COMMANDS['AVERAGE_STOP']])
if self.read_thread:
self.read_thread.join(timeout=1)
print("停止数据采样")
def _read_data_loop(self):
"""数据读取循环(在独立线程中运行)"""
buffer = b''
while self.running and self.serial_conn and self.serial_conn.is_open:
try:
if self.serial_conn.in_waiting > 0:
data = self.serial_conn.read(self.serial_conn.in_waiting)
buffer += data
# 处理完整的数据包(每个采样点6字节)
while len(buffer) >= 6:
sample_data = buffer[:6]
buffer = buffer[6:]
# 解析采样数据
current, digital_channels = self._parse_sample_data(sample_data)
# 调用数据回调函数
if self.data_callback:
self.data_callback(current, digital_channels)
time.sleep(0.001) # 1ms延迟
except Exception as e:
print(f"数据读取错误: {e}")
break
def _parse_sample_data(self, data: bytes) -> Tuple[float, int]:
"""解析6字节采样数据
Args:
data: 6字节原始数据
Returns:
Tuple[float, int]: (电流值(μA), 数字通道状态)
"""
if len(data) != 6:
return 0.0, 0
try:
# 解析32位数据
raw_value = struct.unpack('<I', data[:4])[0]
digital_data = struct.unpack('<H', data[4:6])[0]
# 提取各个字段
adc_value = raw_value & 0x3FFF # 14位ADC值
range_value = (raw_value >> 14) & 0x7 # 3位量程
counter = (raw_value >> 18) & 0x3F # 6位计数器
digital_channels = (raw_value >> 24) & 0xFF # 8位数字通道
# 转换为实际电流值
current_ua = self._convert_to_current(adc_value, range_value)
return current_ua, digital_channels
except Exception as e:
print(f"数据解析错误: {e}")
return 0.0, 0
def _convert_to_current(self, adc_value: int, range_idx: int) -> float:
"""将ADC值转换为电流值(微安)
Args:
adc_value: ADC采样值
range_idx: 量程索引
Returns:
float: 电流值(微安)
"""
if range_idx >= len(self.resistors):
return 0.0
try:
voltage = adc_value * self.adc_mult
resistance = self.resistors[range_idx]
gain = self.user_gains[range_idx]
# 计算电流值(微安)
current_ua = (voltage / resistance) * gain * 1000000
return current_ua
except Exception:
return 0.0
def set_data_callback(self, callback: Callable[[float, int], None]):
"""设置数据回调函数
Args:
callback: 回调函数,参数为(电流值, 数字通道状态)
"""
self.data_callback = callback
class PPK2Trigger:
"""PPK2触发器控制类"""
def __init__(self, ppk2_controller: PPK2Controller):
self.ppk2 = ppk2_controller
self.analog_trigger_level = 0.0
self.analog_trigger_rising = True
self.analog_trigger_active = False
self.digital_trigger_mask = 0x00
self.digital_trigger_active = False
self.trigger_callback = None
def set_analog_trigger(self, level_ua: float, rising_edge: bool = True):
"""设置模拟触发器
Args:
level_ua: 触发电流阈值(微安)
rising_edge: True=上升沿触发, False=下降沿触发
"""
self.analog_trigger_level = level_ua
self.analog_trigger_rising = rising_edge
self.analog_trigger_active = True
print(f"模拟触发器设置: {level_ua}μA, {'上升沿' if rising_edge else '下降沿'}")
def set_digital_trigger(self, channel_mask: int):
"""设置数字触发器
Args:
channel_mask: 8位通道掩码,1表示高电平触发
"""
self.digital_trigger_mask = channel_mask & 0xFF
self.digital_trigger_active = True
print(f"数字触发器设置: 0b{channel_mask:08b}")
def check_trigger(self, current_ua: float, digital_channels: int) -> bool:
"""检查触发条件
Args:
current_ua: 当前电流值
digital_channels: 数字通道状态
Returns:
bool: 是否触发
"""
triggered = False
# 检查模拟触发
if self.analog_trigger_active:
if self.analog_trigger_rising:
triggered = current_ua > self.analog_trigger_level
else:
triggered = current_ua < self.analog_trigger_level
# 检查数字触发
if self.digital_trigger_active:
triggered = triggered or (digital_channels & self.digital_trigger_mask) != 0
if triggered and self.trigger_callback:
self.trigger_callback(current_ua, digital_channels)
return triggered
def set_trigger_callback(self, callback: Callable[[float, int], None]):
"""设置触发回调函数"""
self.trigger_callback = callback
import csv
from datetime import datetime
class PPK2DataLogger:
"""PPK2数据记录器"""
def __init__(self, filename: str = None):
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"ppk2_data_{timestamp}.csv"
self.filename = filename
self.file = None
self.writer = None
self.sample_count = 0
self.start_time = None
def start_logging(self):
"""开始数据记录"""
try:
self.file = open(self.filename, 'w', newline='', encoding='utf-8')
self.writer = csv.writer(self.file)
# 写入CSV头部
self.writer.writerow([
'timestamp', 'elapsed_ms', 'current_ua',
'digital_channels', 'channel_0', 'channel_1',
'channel_2', 'channel_3', 'channel_4',
'channel_5', 'channel_6', 'channel_7'
])
self.start_time = time.time()
self.sample_count = 0
print(f"开始数据记录: {self.filename}")
return True
except Exception as e:
print(f"数据记录启动失败: {e}")
return False
def log_sample(self, current_ua: float, digital_channels: int):
"""记录采样数据
Args:
current_ua: 电流值(微安)
digital_channels: 数字通道状态
"""
if not self.writer:
return
try:
current_time = time.time()
elapsed_ms = (current_time - self.start_time) * 1000
# 解析数字通道
channels = [(digital_channels >> i) & 1 for i in range(8)]
# 写入数据行
self.writer.writerow([
current_time, elapsed_ms, current_ua,
f"0b{digital_channels:08b}"
] + channels)
self.sample_count += 1
except Exception as e:
print(f"数据记录错误: {e}")
def stop_logging(self):
"""停止数据记录"""
if self.file:
self.file.close()
duration = time.time() - self.start_time if self.start_time else 0
print(f"数据记录完成: {self.sample_count} 个采样点, 持续时间: {duration:.2f}秒")
print(f"文件保存为: {self.filename}")
def basic_usage_example():
"""基础使用示例"""
def data_handler(current_ua, digital_channels):
"""数据处理回调函数"""
print(f"电流: {current_ua:8.2f} μA, 数字通道: {digital_channels:08b}")
# 1. 创建PPK2控制器
ppk2 = PPK2Controller("COM3") # 根据实际端口修改
try:
# 2. 连接设备
if not ppk2.connect():
return
# 3. 初始化设备
metadata = ppk2.initialize_device()
if not metadata:
return
# 4. 设置数据回调
ppk2.set_data_callback(data_handler)
# 5. 开始采样
if ppk2.start_sampling():
print("采样进行中,按Ctrl+C停止...")
# 6. 运行采样
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
print("\n用户中断采样")
finally:
# 7. 清理资源
ppk2.stop_sampling()
ppk2.disconnect()
if __name__ == "__main__":
basic_usage_example()
def advanced_usage_example():
"""高级功能使用示例"""
# 创建控制器和功能模块
ppk2 = PPK2Controller("COM3")
trigger = PPK2Trigger(ppk2)
logger = PPK2DataLogger("advanced_measurement.csv")
# 统计变量
sample_count = 0
trigger_count = 0
def data_handler(current_ua, digital_channels):
"""数据处理函数"""
nonlocal sample_count
sample_count += 1
# 记录数据
logger.log_sample(current_ua, digital_channels)
# 检查触发条件
if trigger.check_trigger(current_ua, digital_channels):
nonlocal trigger_count
trigger_count += 1
# 每1000个采样点显示一次统计
if sample_count % 1000 == 0:
print(f"采样: {sample_count}, 触发: {trigger_count}, "
f"当前电流: {current_ua:.2f}μA")
def trigger_handler(current_ua, digital_channels):
"""触发事件处理函数"""
print(f"*** 触发事件 *** 电流: {current_ua:.2f}μA, "
f"数字通道: {digital_channels:08b}")
try:
# 连接和初始化
if not ppk2.connect():
return
metadata = ppk2.initialize_device()
if not metadata:
return
# 配置触发器
trigger.set_analog_trigger(1000.0, rising_edge=True) # 1mA上升沿触发
trigger.set_digital_trigger(0b00000001) # 通道0高电平触发
trigger.set_trigger_callback(trigger_handler)
# 设置回调和开始记录
ppk2.set_data_callback(data_handler)
logger.start_logging()
# 开始采样
if ppk2.start_sampling():
print("高级测量开始,按Ctrl+C停止...")
print("触发条件: 电流>1000μA 或 数字通道0=高")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n测量结束")
finally:
# 清理资源
ppk2.stop_sampling()
logger.stop_logging()
ppk2.disconnect()
print(f"\n测量统计:")
print(f"总采样数: {sample_count}")
print(f"触发次数: {trigger_count}")
print(f"触发率: {trigger_count/sample_count*100:.2f}%" if sample_count > 0 else "N/A")
if __name__ == "__main__":
advanced_usage_example()
症状: PPK2设备连接失败: [Errno 2] could not open port
解决方案:
# 串口设备检测代码
import serial.tools.list_ports
def list_serial_ports():
"""列出所有可用串口"""
ports = serial.tools.list_ports.comports()
for port in ports:
print(f"端口: {port.device}, 描述: {port.description}")
list_serial_ports()
症状: 获取设备元数据失败
解决方案:
def reset_device(self):
"""复位PPK2设备"""
self.send_command([self.COMMANDS['RESET']])
time.sleep(2) # 等待设备复位完成
症状: 数据解析错误: struct.error
解决方案:
class OptimizedPPK2Controller(PPK2Controller):
"""优化版PPK2控制器"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.buffer_size = 1024 # 增大缓冲区
self.batch_size = 100 # 批量处理大小
def _read_data_loop(self):
"""优化的数据读取循环"""
buffer = b''
sample_batch = []
while self.running and self.serial_conn and self.serial_conn.is_open:
try:
# 批量读取数据
if self.serial_conn.in_waiting >= self.buffer_size:
data = self.serial_conn.read(self.buffer_size)
buffer += data
# 批量处理采样点
while len(buffer) >= 6 * self.batch_size:
for i in range(self.batch_size):
sample_data = buffer[i*6:(i+1)*6]
current, digital = self._parse_sample_data(sample_data)
sample_batch.append((current, digital))
buffer = buffer[6 * self.batch_size:]
# 批量回调
if self.data_callback:
for current, digital in sample_batch:
self.data_callback(current, digital)
sample_batch.clear()
time.sleep(0.001)
except Exception as e:
print(f"数据读取错误: {e}")
break
class MemoryEfficientLogger:
"""内存高效的数据记录器"""
def __init__(self, filename: str, buffer_size: int = 10000):
self.filename = filename
self.buffer_size = buffer_size
self.buffer = []
self.file = None
self.writer = None
def log_sample(self, current_ua: float, digital_channels: int):
"""缓冲式数据记录"""
self.buffer.append((time.time(), current_ua, digital_channels))
# 缓冲区满时批量写入
if len(self.buffer) >= self.buffer_size:
self._flush_buffer()
def _flush_buffer(self):
"""刷新缓冲区到文件"""
if self.writer and self.buffer:
for timestamp, current, digital in self.buffer:
self.writer.writerow([timestamp, current, digital])
self.buffer.clear()
self.file.flush() # 强制写入磁盘
项目 | 规格 |
|---|---|
通信接口 | USB 2.0 (CDC-ACM) |
波特率 | 115200 bps |
采样率 | 最高 1 MHz |
电流测量范围 | 0.2 μA - 70 mA |
电压输出范围 | 0.8V - 5.0V |
数字通道 | 8个输入通道 |
测量精度 | ±(0.2% + 200 nA) |
项目 | 要求 |
|---|---|
Python版本 | ≥ 3.7 |
必需库 | pyserial ≥ 3.4 |
可选库 | numpy, matplotlib, pandas |
操作系统 | Windows 7+, Linux, macOS |
内存需求 | ≥ 100 MB |
存储空间 | ≥ 1 GB (用于数据记录) |
指标 | 典型值 | 最大值 |
|---|---|---|
数据吞吐量 | 6 MB/s | 10 MB/s |
延迟 | < 1 ms | < 5 ms |
CPU使用率 | < 5% | < 15% |
内存使用 | 50 MB | 200 MB |
数据丢失率 | < 0.01% | < 0.1% |
# 快速命令参考
COMMANDS = {
# 基础控制
'START_SAMPLING': [0x06],
'STOP_SAMPLING': [0x07],
'DEVICE_ON': [0x0c, 1],
'DEVICE_OFF': [0x0c, 0],
'RESET': [0x20],
# 电源模式
'AMPERE_MODE': [0x11, 1],
'SOURCE_MODE': [0x11, 2],
# 电压设置 (3.3V)
'SET_3V3': [0x0d, 0x0c, 0xe4],
'SET_5V0': [0x0d, 0x13, 0x88],
# 滤波控制
'FILTER_ON': [0x15],
'FILTER_OFF': [0x16],
}
错误代码 | 描述 | 解决方案 |
|---|---|---|
E001 | 串口连接失败 | 检查设备连接和驱动 |
E002 | 命令发送失败 | 检查串口状态 |
E003 | 元数据解析失败 | 检查固件版本 |
E004 | 数据包格式错误 | 重新同步数据流 |
E005 | 缓冲区溢出 | 增加处理速度或缓冲区大小 |
PPK2固件版本 | 支持的命令 | 兼容性 |
|---|---|---|
1.2.4+ | 全部命令 | 完全兼容 |
1.2.0-1.2.3 | 部分命令 | 部分兼容 |
< 1.2.0 | 基础命令 | 不推荐 |
以下是一个完整的PPK2最小连接演示程序,展示了如何使用pySerial库直接控制PPK2设备:
文件名: ppk2_minimal_demo.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
PPK2 最小连接演示
这是一个最简单的PPK2串口连接示例,演示如何:
1. 连接PPK2设备
2. 初始化设备
3. 开始数据采集
4. 读取和显示数据
使用方法:
1. 确保PPK2设备已连接到电脑
2. 修改SERIAL_PORT为实际的串口设备路径
3. 运行脚本:python ppk2_minimal_demo.py
"""
import serial
import struct
import time
import threading
from typing import Optional
# ============================================================================
# 配置参数
# ============================================================================
# 串口配置
SERIAL_PORT = "COM4" # Windows示例,Linux使用 "/dev/ttyUSB0" 或 "/dev/ttyACM0"
BAUD_RATE = 115200
TIMEOUT = 1.0
# PPK2命令常量
PPK_COMMANDS = {
'GET_METADATA': 0x19,
'AVERAGE_START': 0x06,
'AVERAGE_STOP': 0x07,
'DEVICE_RUNNING_SET': 0x0c,
'SET_POWER_MODE': 0x11,
'REGULATOR_SET': 0x0d,
'RESET': 0x20,
}
# 数据格式常量
FRAME_SIZE = 6 # 每个数据点6字节(4字节电流 + 2字节数字通道)
# ============================================================================
# PPK2最小控制类
# ============================================================================
class PPK2MinimalController:
"""PPK2最小控制器
提供最基本的PPK2连接和数据读取功能
"""
def __init__(self, port: str, baudrate: int = BAUD_RATE):
"""初始化控制器
Args:
port: 串口设备路径
baudrate: 波特率
"""
self.port = port
self.baudrate = baudrate
self.serial_conn: Optional[serial.Serial] = None
self.running = False
self.data_count = 0
def connect(self) -> bool:
"""连接PPK2设备
Returns:
bool: 连接是否成功
"""
try:
print(f"正在连接PPK2设备: {self.port}")
self.serial_conn = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=8,
parity=serial.PARITY_NONE,
stopbits=1,
timeout=TIMEOUT
)
print(f"✓ PPK2设备连接成功")
return True
except Exception as e:
print(f"✗ PPK2设备连接失败: {e}")
return False
def disconnect(self):
"""断开PPK2设备连接"""
self.stop_sampling()
if self.serial_conn and self.serial_conn.is_open:
self.serial_conn.close()
print("PPK2设备已断开连接")
def send_command(self, command: list) -> bool:
"""发送命令到PPK2
Args:
command: 命令字节列表
Returns:
bool: 命令发送是否成功
"""
if not self.serial_conn or not self.serial_conn.is_open:
print("设备未连接")
return False
try:
cmd_bytes = bytes(command)
self.serial_conn.write(cmd_bytes)
return True
except Exception as e:
print(f"命令发送失败: {e}")
return False
def get_metadata(self) -> dict:
"""获取PPK2设备元数据
Returns:
dict: 设备元数据
"""
print("正在获取设备元数据...")
if not self.send_command([PPK_COMMANDS['GET_METADATA']]):
return {}
metadata_str = ""
start_time = time.time()
# 等待元数据响应(最多5秒)
while time.time() - start_time < 5:
if self.serial_conn.in_waiting > 0:
try:
data = self.serial_conn.read(self.serial_conn.in_waiting)
metadata_str += data.decode('utf-8', errors='ignore')
if 'END' in metadata_str:
break
except Exception as e:
print(f"读取元数据时出错: {e}")
break
time.sleep(0.01)
if 'END' in metadata_str:
print("✓ 设备元数据获取成功")
# 简单解析元数据(实际项目中需要更完善的解析)
lines = metadata_str.replace('END', '').strip().split('\n')
metadata = {}
for line in lines:
if ':' in line:
key, value = line.split(':', 1)
metadata[key.strip().lower()] = value.strip()
return metadata
else:
print("✗ 设备元数据获取失败")
return {}
def initialize_device(self) -> bool:
"""初始化PPK2设备
Returns:
bool: 初始化是否成功
"""
print("正在初始化PPK2设备...")
# 1. 获取设备元数据
metadata = self.get_metadata()
if not metadata:
print("✗ 设备初始化失败:无法获取元数据")
return False
print(f"设备信息: {metadata.get('hw', 'Unknown')}")
# 2. 设置为安培计模式(模式1)
if not self.send_command([PPK_COMMANDS['SET_POWER_MODE'], 1]):
print("✗ 设置电源模式失败")
return False
# 3. 启用设备运行
if not self.send_command([PPK_COMMANDS['DEVICE_RUNNING_SET'], 1]):
print("✗ 启用设备运行失败")
return False
print("✓ PPK2设备初始化完成")
return True
def start_sampling(self) -> bool:
"""开始数据采样
Returns:
bool: 采样启动是否成功
"""
if not self.send_command([PPK_COMMANDS['AVERAGE_START']]):
print("✗ 启动采样失败")
return False
self.running = True
self.data_count = 0
print("✓ 数据采样已开始")
return True
def stop_sampling(self):
"""停止数据采样"""
self.running = False
if self.serial_conn and self.serial_conn.is_open:
self.send_command([PPK_COMMANDS['AVERAGE_STOP']])
print("数据采样已停止")
def parse_sample_data(self, data: bytes) -> tuple:
"""解析6字节采样数据
Args:
data: 6字节原始数据
Returns:
tuple: (电流值(μA), 数字通道状态)
"""
if len(data) != FRAME_SIZE:
return 0.0, 0
try:
# 解析32位数据(小端序)
raw_value = struct.unpack('<I', data[:4])[0]
digital_data = struct.unpack('<H', data[4:6])[0]
# 提取ADC值(14位)
adc_value = raw_value & 0x3FFF
# 简化的电流转换(实际需要校准参数)
# 这里使用一个近似的转换公式
voltage = adc_value * (1.8 / 16384) # ADC转电压
current_ua = voltage * 1000 # 简化的电流计算
return current_ua, digital_data
except Exception as e:
print(f"数据解析错误: {e}")
return 0.0, 0
def read_data_loop(self, duration: float = 10.0):
"""数据读取循环
Args:
duration: 读取持续时间(秒)
"""
print(f"开始读取数据,持续时间: {duration}秒")
print("数据格式: [时间] 电流(μA) 数字通道")
print("-" * 50)
buffer = b''
start_time = time.time()
last_display_time = start_time
while self.running and (time.time() - start_time) < duration:
try:
if self.serial_conn.in_waiting > 0:
# 读取可用数据
new_data = self.serial_conn.read(self.serial_conn.in_waiting)
buffer += new_data
# 处理完整的6字节数据包
while len(buffer) >= FRAME_SIZE:
sample_data = buffer[:FRAME_SIZE]
buffer = buffer[FRAME_SIZE:]
# 解析数据
current_ua, digital_channels = self.parse_sample_data(sample_data)
self.data_count += 1
# 每秒显示一次数据(避免输出过多)
current_time = time.time()
if current_time - last_display_time >= 1.0:
elapsed = current_time - start_time
print(f"[{elapsed:6.1f}s] {current_ua:8.2f} μA 0b{digital_channels:08b}")
last_display_time = current_time
time.sleep(0.001) # 1ms延迟
except Exception as e:
print(f"数据读取错误: {e}")
break
print("-" * 50)
print(f"数据读取完成,共处理 {self.data_count} 个采样点")
# ============================================================================
# 主程序
# ============================================================================
def main():
"""主程序入口"""
print("PPK2 最小连接演示")
print("=" * 50)
# 首先检查可用串口
print("\n检查可用串口设备:")
list_serial_ports()
print()
# 尝试自动检测PPK2设备
detected_port = auto_detect_ppk2()
if detected_port:
print(f"自动检测到PPK2设备: {detected_port}")
port_to_use = detected_port
else:
print(f"未自动检测到PPK2设备,使用默认端口: {SERIAL_PORT}")
port_to_use = SERIAL_PORT
# 创建PPK2控制器
ppk2 = PPK2MinimalController(port_to_use)
try:
# 1. 连接设备
if not ppk2.connect():
print("连接失败,请检查:")
print("1. PPK2设备是否已连接到电脑")
print("2. 串口路径是否正确")
print("3. 设备驱动是否已安装")
print("\n可用的串口设备:")
list_serial_ports()
return
# 2. 初始化设备
if not ppk2.initialize_device():
print("设备初始化失败")
return
# 3. 开始采样
if not ppk2.start_sampling():
print("启动采样失败")
return
# 4. 读取数据(10秒)
try:
ppk2.read_data_loop(duration=10.0)
except KeyboardInterrupt:
print("\n用户中断采样")
except Exception as e:
print(f"程序运行出错: {e}")
finally:
# 5. 清理资源
ppk2.disconnect()
print("程序结束")
def list_serial_ports():
"""列出可用的串口设备"""
try:
import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
if not ports:
print("未找到任何串口设备")
return []
for i, port in enumerate(ports, 1):
print(f"{i}. {port.device} - {port.description}")
if 'PPK' in port.description.upper() or 'NORDIC' in port.description.upper():
print(f" *** 可能是PPK2设备 ***")
return ports
except ImportError:
print("需要安装pyserial库: pip install pyserial")
return []
except Exception as e:
print(f"列出串口设备时出错: {e}")
return []
def auto_detect_ppk2():
"""自动检测PPK2设备
Returns:
str: PPK2设备的串口路径,如果未找到则返回None
"""
try:
import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
# 查找可能的PPK2设备
for port in ports:
description = port.description.upper()
# 检查描述中是否包含PPK或Nordic相关关键词
if any(keyword in description for keyword in ['PPK', 'NORDIC', 'SEGGER', 'J-LINK']):
return port.device
return None
except Exception as e:
print(f"自动检测PPK2设备时出错: {e}")
return None
if __name__ == "__main__":
main()
核心功能:
技术亮点:
以下是程序的实际运行日志,展示了完整的PPK2连接和数据采集过程:
PPK2 最小连接演示
==================================================
检查可用串口设备:
1. COM3 - USB 串行设备 (COM3)
2. COM4 - nRF Connect USB CDC ACM (COM4)
未自动检测到PPK2设备,使用默认端口: COM4
正在连接PPK2设备: COM4
✓ PPK2设备连接成功
正在初始化PPK2设备...
正在获取设备元数据...
✓ 设备元数据获取成功
设备信息: 61320
✓ PPK2设备初始化完成
✓ 数据采样已开始
开始读取数据,持续时间: 10.0秒
数据格式: [时间] 电流(μA) 数字通道
--------------------------------------------------
[ 1.0s] 3.19 μA 0b00011110
[ 2.0s] 27.69 μA 0b00000000
[ 3.0s] 27.69 μA 0b00000000
[ 4.0s] 3.19 μA 0b00011110
[ 5.0s] 27.69 μA 0b00000000
[ 6.0s] 27.69 μA 0b00000000
[ 7.1s] 3.52 μA 0b00011001
[ 8.1s] 19.78 μA 0b10111000
[ 9.1s] 3.08 μA 0b00011110
--------------------------------------------------
数据读取完成,共处理 664917 个采样点
数据采样已停止
PPK2设备已断开连接
程序结束
1. 设备检测阶段
检查可用串口设备:
1. COM3 - USB 串行设备 (COM3)
2. COM4 - nRF Connect USB CDC ACM (COM4)
2. 连接和初始化阶段
正在连接PPK2设备: COM4
✓ PPK2设备连接成功
正在初始化PPK2设备...
正在获取设备元数据...
✓ 设备元数据获取成功
设备信息: 61320
✓ PPK2设备初始化完成
3. 数据采集阶段
✓ 数据采样已开始
开始读取数据,持续时间: 10.0秒
数据格式: [时间] 电流(μA) 数字通道
--------------------------------------------------
[ 1.0s] 3.19 μA 0b00011110
[ 2.0s] 27.69 μA 0b00000000
[ 3.0s] 27.69 μA 0b00000000
4. 性能统计
数据读取完成,共处理 664917 个采样点
电流值变化分析:
数字通道状态分析:
0b00011110:通道1,2,3,4为高电平0b00000000:所有通道为低电平0b10111000:通道3,4,5,7为高电平# 安装Python依赖
pip install pyserial>=3.4
# 可选:安装额外的分析工具
pip install numpy matplotlib pandas
连接硬件
PPK2设备 ──USB线──> 电脑USB端口
安装依赖
pip install pyserial
运行程序
python ppk2_minimal_demo.py
查看结果
基于这个最小演示,可以轻松扩展出更多功能:
# 添加CSV数据记录功能
import csv
from datetime import datetime
class DataLogger:
def __init__(self):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.filename = f"ppk2_data_{timestamp}.csv"
self.file = open(self.filename, 'w', newline='')
self.writer = csv.writer(self.file)
self.writer.writerow(['timestamp', 'current_ua', 'digital_channels'])
def log_data(self, current_ua, digital_channels):
self.writer.writerow([time.time(), current_ua, digital_channels])
# 添加matplotlib实时图表
import matplotlib.pyplot as plt
from collections import deque
class RealTimePlotter:
def __init__(self, max_points=1000):
self.times = deque(maxlen=max_points)
self.currents = deque(maxlen=max_points)
plt.ion()
self.fig, self.ax = plt.subplots()
self.line, = self.ax.plot([], [])
def update_plot(self, current_ua):
self.times.append(time.time())
self.currents.append(current_ua)
self.line.set_data(self.times, self.currents)
self.ax.relim()
self.ax.autoscale_view()
plt.pause(0.01)
# 添加电流阈值触发功能
class CurrentTrigger:
def __init__(self, threshold_ua=10.0):
self.threshold = threshold_ua
self.triggered = False
def check_trigger(self, current_ua):
if current_ua > self.threshold and not self.triggered:
print(f"触发!电流超过阈值: {current_ua:.2f} μA > {self.threshold} μA")
self.triggered = True
return True
elif current_ua <= self.threshold:
self.triggered = False
return False
本技术手册提供了完整的PPK2串口控制解决方案,涵盖了从基础连接到高级功能的所有方面。通过本方案,开发者可以:
通过最小演示程序的实际运行,我们验证了:
建议在实际应用中根据具体需求对代码进行优化和扩展。如有技术问题,请参考故障排除章节或联系技术支持。
技术负责人: YUNSWJ专业领域: PPK2设备控制、串口通信协议、嵌入式系统功耗分析技术支持: 提供PPK2串口控制方案的技术咨询和问题解答
GitHub Issues: 请通过项目GitHub页面提交技术问题技术文档: 本手册提供完整的技术实现方案代码示例: 包含完整的Python实现和使用示例
如遇到以下问题,请及时反馈: PPK2设备连接和通信问题 串口命令协议相关疑问 数据解析和处理技术难题 性能优化和故障排除需求
文档版本: 0.9 最后更新: 2025年 技术联系人: YUNSWJ 技术支持: 请通过GitHub Issues提交问题 实际验证: 已通过PPK2设备实际测试验证