在日常开发中,我们常常需要编写和优化 Python 脚本以处理数据、自动化任务或构建工具。本文记录了我使用 DeepSeek 辅助优化一个实际数据处理脚本的过程,涵盖工具使用、性能分析和并发优化等关键技术点。
最近我需要处理一批传感器采集的时序数据,原始实现是一个简单的 Python 脚本,主要功能包括:
初始版本在处理 10 个文件(约 2GB 数据)时需要 15 分钟,明显存在优化空间。
import pandas as pd
import numpy as np
import os
def process_file(file_path):
"""处理单个文件"""
df = pd.read_csv(file_path)
# 数据清洗
df = df[df['value'] > 0] # 去除负值
df = df.dropna() # 去除空值
# 计算统计量
stats = {
'mean': df['value'].mean(),
'std': df['value'].std(),
'max': df['value'].max(),
'min': df['value'].min()
}
# 输出结果
output_path = f"processed_{os.path.basename(file_path)}"
df.to_csv(output_path, index=False)
return stats
def main():
file_dir = "data/"
all_stats = []
for file_name in os.listdir(file_dir):
if file_name.endswith('.csv'):
file_path = os.path.join(file_dir, file_name)
stats = process_file(file_path)
all_stats.append(stats)
# 保存汇总统计
pd.DataFrame(all_stats).to_csv("summary_stats.csv", index=False)
if __name__ == "__main__":
main()
通过 DeepSeek 分析,我得到了几个关键优化建议:
# DeepSeek 建议的性能分析代码
import cProfile
import pstats
def profile_main():
profiler = cProfile.Profile()
profiler.enable()
main()
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumtime').print_stats(10)
# profile_main() # 取消注释运行性能分析
分析结果显示主要瓶颈在:
def process_file_optimized(file_path):
"""优化后的单文件处理"""
# 指定数据类型避免自动推断
dtypes = {
'timestamp': 'str',
'value': 'float32', # 使用 float32 减少内存
'sensor_id': 'int32'
}
# 使用更高效的参数
df = pd.read_csv(
file_path,
dtype=dtypes,
usecols=['timestamp', 'value', 'sensor_id'], # 只读取需要的列
parse_dates=['timestamp'],
engine='c' # 使用C引擎加速
)
# 使用 query() 方法加速过滤
df = df.query('value > 0').dropna()
# 使用更高效的内存操作
stats = {
'mean': df['value'].mean(),
'std': df['value'].std(ddof=0), # 更快的标准差计算
'max': df['value'].max(),
'min': df['value'].min()
}
# 使用更快的输出格式
output_path = f"processed_{os.path.basename(file_path)}"
df.to_parquet(output_path, index=False) # Parquet 比 CSV 更快更小
return stats
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
def parallel_process_files():
"""并行处理文件"""
file_dir = "data/"
csv_files = [
os.path.join(file_dir, f)
for f in os.listdir(file_dir)
if f.endswith('.csv')
]
# 根据CPU核心数调整进程数
num_workers = min(multiprocessing.cpu_count(), len(csv_files))
with ProcessPoolExecutor(max_workers=num_workers) as executor:
results = list(executor.map(process_file_optimized, csv_files))
return results
def process_file_memory_optimized(file_path):
"""内存优化的处理函数"""
# 使用分块读取处理大文件
chunk_size = 100000
chunks = []
for chunk in pd.read_csv(
file_path,
chunksize=chunk_size,
dtype={'value': 'float32'},
usecols=['timestamp', 'value']
):
# 立即过滤减少内存占用
chunk = chunk[chunk['value'] > 0]
chunk = chunk.dropna()
chunks.append(chunk)
# 合并结果
if chunks:
df = pd.concat(chunks, ignore_index=True)
else:
df = pd.DataFrame()
# 后续处理...
return df
通过上述优化,性能得到显著提升:
优化阶段 | 处理时间 | 内存峰值 | 改进措施 |
---|---|---|---|
原始版本 | 15分钟 | 4.2GB | - |
类型优化 | 9分钟 | 2.1GB | 指定数据类型,使用更高效的方法 |
并行处理 | 3分钟 | 2.5GB | 多进程并行处理 |
内存优化 | 2分钟 | 1.3GB | 分块读取处理 |
dtype
可减少内存使用和提高读取速度在实际优化过程中,DeepSeek 提供了几个有价值的功能:
# DeepSeek 建议的高效代码模式示例
# 原始代码
result = []
for item in large_list:
if condition(item):
result.append(transform(item))
# 优化建议:使用生成器表达式
result = (transform(item) for item in large_list if condition(item))
通过这次优化实践,我不仅提升了脚本性能,还深化了对 Python 性能优化技术的理解。DeepSeek 在这样的技术探索过程中发挥了"智能助手"的作用,帮助我快速找到优化方向并验证方案效果。
注意事项:所有优化都应该基于实际性能分析,避免过早优化。建议使用 cProfile
、memory_profiler
等工具准确识别瓶颈后再实施针对性优化。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。