前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用多进程库计算科学数据时出现内存错误

使用多进程库计算科学数据时出现内存错误

原创
作者头像
华科云商小徐
发布2024-01-22 09:54:17
1120
发布2024-01-22 09:54:17
举报
文章被收录于专栏:小徐学爬虫小徐学爬虫

问题背景

我经常使用爬虫来做数据抓取,多线程爬虫方案是必不可少的,正如我在使用 Python 进行科学计算时,需要处理大量存储在 CSV 文件中的数据。由于每个处理过程需要很长时间才能完成,而您拥有多核处理器,所以您尝试使用多进程库中的 Pool 方法来提高计算效率。

您按照如下方式构建了多进程调用:

代码语言:javascript
复制
pool = Pool()
vector_components = []
for sample0 in range(samples):
    vector_field_x_i = vector_field_samples_x[sample]
    vector_field_y_i = vector_field_samples_y[sample]
    vector_component = pool.apply_async(vector_field_decomposer, args=(x_dim, y_dim, x_steps, y_steps,
                                                                       vector_field_x_i, vector_field_y_i))
    vector_components.append(vector_component)
pool.close()
pool.join()
​
vector_components = map(lambda k: k.get(), vector_components)
​
for vector_component in vector_components:
    CsvH.write_vector_field(vector_component, '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')

使用此代码,当您处理 500 个元素,每个元素大小为 100 x 100 的数据时,一切正常。但是,当您尝试处理 500 个元素,每个元素大小为 400 x 400 时,在调用 get() 时会收到内存错误。

解决方案

出现内存错误的原因是您的代码在内存中保留了多个列表,包括 vector_field_x、vector_field_y、vector_components,以及在 map() 调用期间创建的 vector_components 的副本。当您尝试处理较大的数据时,这些列表可能变得非常大,从而导致内存不足。

为了解决此问题,您需要避免在内存中保存完整的列表。您可以使用多进程库中的 imap() 方法来实现这一点。imap() 方法返回一个迭代器而不是完整的列表,因此您不必将所有结果都保存在内存中。

以下是使用 imap() 方法重写代码的示例:

代码语言:javascript
复制
import multiprocessing
from functools import partial
​
def vector_field_decomposer(x_dim, y_dim, x_steps, y_steps, vector_fields):
    vector_field_x_i = vector_fields[0]
    vector_field_y_i = vector_fields[1]
    # Do whatever is normally done here.
​
if __name__ == "__main__":
    num_workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(num_workers)
    # Calculate a good chunksize (based on implementation of pool.map)
    chunksize, extra = divmod(samples // 4 * num_workers)
    if extra:
        chunksize += 1
​
    # Use partial so many arguments can be passed to vector_field_decomposer
    func = partial(vector_field_decomposer, x_dim, y_dim, x_steps, y_steps)
    # We use a generator expression as an iterable, so we don't create a full list.
    results = pool.imap(func, 
                        ((vector_field_samples_x[s], vector_field_samples_y[s]) for s in xrange(samples)),
                        chunksize=chunksize)
    for vector in results:
        CsvH.write_vector_field(vector_component, 
                                '../CSV/RotationalFree/rotational_free_x_'+str(sample)+'.csv')
    pool.close()
    pool.join()

通过使用这种方法,您可以避免出现内存错误,并能够处理较大的数据。

请确保你的计算任务是可以并行化的,并且注意到在Windows系统上,mclapply可能不如在Unix-like系统(如Linux或Mac OS X)上有效。在Windows系统上,你可能需要使用parLapply函数来代替。如果有更多专业知识不懂得可以评论区一起讨论。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档