前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据分析利器 pandas 系列教程(六):合并上百万个 csv 文件,如何提速上百倍

数据分析利器 pandas 系列教程(六):合并上百万个 csv 文件,如何提速上百倍

作者头像
月小水长
发布2023-11-03 16:15:34
3970
发布2023-11-03 16:15:34
举报
文章被收录于专栏:月小水长月小水长

如开篇初衷,这个系列教程对于基础知识的引导,不求细致而大全,但求细致而实用,

过完基础知识以后就是实战 tricks 的集锦,这些都是笔者在实际工作中用到的解决方案,求小而精,抛砖引玉。

所以后续的更新本来就应该是可遇不可求的,但是我不想以此作为拖更的借口,因为事实上,这大半年我是一直有更新的。

这一年半在我的 BuyiXiao Blog 上更新了差不多 10 篇(标签是 pandas,地址如下),但是几乎都没有发布在公众号上。

https://buyixiao.github.io/tags/pandas/

还是那个原因,代码工程永远是追求最佳实践的,或者更准确的来说应该是更佳实践,因为我觉得脱离了时间背景,没有最佳实践。

所以即使是一个讲解功能点的教程,需要频繁地对一篇教程进行反复修改,不然就是以讹传讹了,公众号只能修改一次太差强人意,所以就都发布在博客上,不定期搬运到公众号上。

所以可以把上面这个链接加入收藏夹吗?

回到今天的正题,加速 pandas 合并 csv ~

在上一篇的教程 数据分析利器 pandas 系列教程(五):合并相同结构的 csv 分享了合并的思路和代码,

代码语言:javascript
复制
# -*- coding: utf-8 -*-
# author:           inspurer(月小水长)
# create_time:      2022/4/13 10:33
# 运行环境           Python3.6+
# github            https://github.com/inspurer
# website           https://buyixiao.github.io/
# 微信公众号         月小水长

import os
import pandas as pd

result_csv = 'all.csv'
all_cols = []
for file in os.listdir('.'):
    if file.endswith('.csv') and not file == result_csv:
        df = pd.read_csv(file)
        all_cols = df.columns.values.tolist()
if len(all_cols) == 0:
    raise Exception("当前目录下没有要合并的 csv 文件")
all_cols.insert(0, 'origin_file_name')
all_df = pd.DataFrame({col: [] for col in all_cols})

for file in os.listdir('.'):
    if file.endswith('.csv') and not file == result_csv:
        df = pd.read_csv(file)
        df.insert(0, 'origin_file_name', [file for _ in range(df.shape[0])])
        all_df = all_df.append(df, ignore_index=True)

all_df.to_csv(result_csv, index=False, encoding='utf-8')

但是最近我遇到一个工程问题,需要合并超过 1000,000 (上百万)个 csv 文件,最大的 10M 左右,最小的 5KB 左右,最开始用的上面这现成的代码,运行了一天之后,我觉得照目前这速度,差不多得合并到元旦

所以探索更佳实践使得我逐行分析了代码耗时,发现大量或者说 99.99% 的耗时集中在下面这行代码上:

代码语言:javascript
复制
all_df = all_df.append(df, ignore_index=True)

pandas 官方已经不推荐使用 append 来连接 dataframe 了,转而使用 concat,即 all_df = pd.concat([all_df,df], ignore_index=True) 但是这不是今天讨论的重点

最开始我为什么要设计成 for 循环中读一个 csv 就合并一次呢,因为我觉得读取全部文件到内存中再合并非常吃内存,设计成这样保存每次只有一个两个 dataframe dfall_df 驻留在内存中。

最开始几百个几千个文件合并的时候这份代码运行没有问题,时间也非常短,但是几十上百万个文件合并时,问题就暴露出来了。

问题在于,append 或者 concat每执行一次,都需要复制一份当前结果dataframe的副本,上百个文件复制尚可,上百万个文件,到后面每复制一次当前已合并的结果 dataframe,耗时可想而知。

找到问题所在,解决办法就很简单了,把 pandas 的连接放到 for 循环外只集中连接一次即可,这就意味着,需要加载完所有的 csv 文件后再连接,改良后合并原来那些上百万个 csv 文件只用不到一个下午测算过耗时减少超过 99%

concat 中有非常多的耗时处理,复制副本仅是比较重要其中一项,这里仅以复制代指这些过程。

定量分析下,假设合并第一个 csv 文件时耗时 1 个时间单位,合并第 N 个 csv 文件时耗时 N 个单位(第一次复制时只合并了 1 个 csv,第 N 次复制时已合并 N 个 csv,假定所有文件大小相同,concat 耗时仅和复制有关,复制仅和文件大小线性相关),那么执行 N 次合并耗时1+2+3+4+...+N-1+N = (N-1)*N/2 个时间单位;如果把连接放在 for 循环外,则只需要第 N 次的耗时 N 个时间单位即可,也就是说,改进后耗时仅是原来的 (N-1)*N/(2*N)=(N-1)/2 分之一,仅和文件总数 N 相关。

按照上面的分析,待合并的 csv 文件夹越多,也就是 N 越大,相比较把连接放在 for 循环,只连接一次的耗时减少得越多(N 很小的时候减少不明显),代码如下:

代码语言:javascript
复制
# -*- coding: utf-8 -*-
# author:           inspurer(月小水长)
# create_time:      2023/10/30 15:23
# 运行环境           Python3.6+
# github            https://github.com/inspurer
# website           https://buyixiao.github.io/
# 微信公众号         月小水长

import pandas as pd

import os


def do_merge(input_folder, output_file='all.csv', append_file_name_col=True, file_name_col='origin_file_name'):
    result_csv = output_file
    all_cols = []
    if not os.path.exists(input_folder):
        raise Exception(f"目录 {input_folder} 不存在")

    file_cnt = len(os.listdir(input_folder))
    for file in os.listdir(input_folder):
        if file.endswith('.csv') and not file == result_csv:
            df = pd.read_csv(os.path.join(input_folder, file))
            all_cols = df.columns.values.tolist()
            break
    if len(all_cols) == 0:
        raise Exception(f"当前目录 {os.path.abspath(input_folder)}下没有要合并的 csv 文件")

    save_cols = all_cols
    if append_file_name_col:
        all_cols.insert(0, file_name_col)
        save_cols.insert(0, file_name_col)

    df_list = []

    for index, file in enumerate(os.listdir(input_folder)):
        print(f'{index + 1}/ {file_cnt} {file}')
        if file.endswith('.csv') and not file == result_csv:
            file_name = file[:file.rindex('.')]
            df = pd.read_csv(os.path.join(input_folder, file), float_precision='high')

            if append_file_name_col:
                df.insert(0, file_name_col, [file_name for _ in range(df.shape[0])])

            df = df[save_cols]

            df_list.append(df)

    all_df = pd.concat(df_list, ignore_index=True)

    print(all_df.shape[0])
    # subset_ = ['unique id colums name of your dataframe']
    subset_ = []
    if append_file_name_col:
        subset_.append(file_name_col)
    all_df.drop_duplicates(subset=subset_, inplace=True, keep='first')
    print(all_df.shape[0])

    all_df.to_csv(result_csv, index=False, encoding='utf-8-sig')


if __name__ == '__main__':
    do_merge(input_folder='./')

但是这是非常吃内存的,假如需要合并的几十万上百万个文件累计有几十 G 大小,即使可能会有虚拟内存加持,还是建议手中持有 32G 或者 64G 内存电脑,方可与之一战

不是很久的以前,我还在学 Java 的时候,听闻江湖中流传着阿里人的 Java 内功心法:为什么阿里巴巴不建议在 for 循环中使用"+"进行字符串拼接

我觉得今天的推送和这个心法有异曲同工之妙,我愿改个标题:为什么BuyiXiao 不建议在 for 循环中使用 append 或者 concat 进行 dataframe 拼接

或者更干脆些:为什么 BuyiXiao 不建议在 for 循环中进行 dataframe 拼接

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-10-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 月小水长 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档