
Q: How to write function output to a .csv file with multithreading/multiprocessing? (using a string array as input)

Stack Overflow user
Asked on 2017-05-23 10:17:26
1 answer · 768 views · 0 followers · 0 votes

I am writing a small web scraper and want to implement multiprocessing/multithreading.

I have written my function webScraper(), which takes a string with a website URL as input, scrapes some domain data, and writes that data to a CSV file, one line per domain.

The input data with all the URLs is stored in a string array, like this: urls = ["google.com", "yahoo.com", "bing.com"]. (I am considering switching to importing the URLs from a CSV file.)
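To illustrate the shape of the function, here is a stripped-down placeholder version of webScraper; the requests dependency, the scraped fields, and the list return format are simplifications for illustration, since the actual scraping logic is not shown in this post:

import requests

def webScraper(url):
    """Fetch one domain and return its data as a list (one CSV row)."""
    response = requests.get("http://" + url, timeout=10)
    # Placeholder "domain data": the URL, the HTTP status, and the page size.
    return [url, response.status_code, len(response.text)]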

How can I use multiprocessing and write the function output to a CSV file without inconsistencies and index-out-of-bounds errors? I found a nice script that seems to be exactly what I need. Unfortunately, I switched from Java to Python only a few days ago and don't know exactly what needs to be modified.

So basically, I just want to change the script below so that it calls my function webScraper(url) for every URL in the string array urls or in an input CSV file. The script should then write the function output for each array item to my CSV line by line (if I understand the code correctly); a sketch of the needed changes follows the script.

This is the code I am working with (thanks to hbar for the nice code!):

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print("Done", i)
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
        """Parses the input CSV and yields tuples with the index of the row
        as the first element, and the integers of the row as the second
        element.

        The index is zero-index based.

        The data is then sent over inqueue for the workers to do their
        thing.  At the end the input process sends a 'STOP' message for each
        worker.
        """
        for i, row in enumerate(self.in_csvfile):
            row = [int(entry) for entry in row]
            self.inq.put((i, row))

        for i in range(self.numprocs):
            self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        for i, row in iter(self.inq.get, "STOP"):
            self.outq.put((i, sum(row)))
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers
        Since I chose to make sure output was synchronized to the input there
        is some extra goodies to do that.

        Obviously your input has the original row number so this is not
        required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across processes so open/close
        # and use it all in the same process or else you'll have the last
        # several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        #Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    # if yes, write it out and make sure no waiting rows exist
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")

    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])
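To adapt the script above to the scraping use case, only two pieces would need to change: the input stage feeds URLs instead of integer rows, and the worker calls webScraper instead of summing. A sketch of those two replacement methods (hypothetical names, assuming webScraper(url) returns a list of fields; write_output_csv would then write [i] + val rather than [i, val], and the infile/in_csvfile handling in __init__ could be dropped):

    def parse_input_urls(self):
        """Feed each (index, url) pair to the workers instead of integer rows."""
        for i, url in enumerate(urls):  # urls = ["google.com", "yahoo.com", ...]
            self.inq.put((i, url))
        # One STOP sentinel per worker, as in the original script.
        for _ in range(self.numprocs):
            self.inq.put("STOP")

    def scrape_url(self):
        """Worker: consume URLs from inq, put (index, scraped_row) on outq."""
        for i, url in iter(self.inq.get, "STOP"):
            self.outq.put((i, webScraper(url)))
        self.outq.put("STOP")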

The whole thing would be no problem for me if the writing of the CSV file were not part of the multiprocessing. I have already tried different solutions with Python's Pool.map (link), but without success. I think there are inconsistencies between the pool workers, and that is what causes the errors.
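From what I read, these failures typically come from several pool workers sharing one file handle or csv.writer. One workaround I have seen is to serialize the writes with a multiprocessing.Lock; a minimal sketch, assuming webScraper returns a list of fields and the fork start method (where worker processes inherit the module-level lock):

import csv
import multiprocessing

lock = multiprocessing.Lock()  # shared with the workers via fork

def scrape_and_write(url):
    row = webScraper(url)  # assumed to return a list of fields
    with lock:
        # Only one process appends at a time, so rows never interleave.
        with open('out.csv', 'a', newline='') as f:
            csv.writer(f).writerow(row)

# pool.map(scrape_and_write, urls) would then need no separate writer process.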

Thanks for your ideas!


1 Answer

Stack Overflow user

Answered on 2017-05-25 07:42:26

The way I would handle this is to do the web scraping with multiprocessing and then write to the csv from a single process. I would bet that the scraping is the time-consuming part and that the I/O is fast. Here is a snippet of code that multiprocesses the function using Pool.map.

import multiprocessing as mp
import csv

pool = mp.Pool(processes=mp.cpu_count())
# or however many processes you can support

scraped_data = pool.map(webScraper, urls)

# Open the file for writing ('w'); newline='' avoids blank rows on Windows.
with open('out.csv', 'w', newline='') as outfile:
    wr = csv.writer(outfile)
    wr.writerows(scraped_data)  # one row per scraped URL, in input order
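Note that Pool.map returns results in the same order as urls, so the output stays aligned with the input. If you would rather write each row as soon as it is scraped instead of collecting everything first, pool.imap_unordered is one variation (a sketch, not part of my original snippet):

import multiprocessing as mp
import csv

with mp.Pool(processes=mp.cpu_count()) as pool, \
        open('out.csv', 'w', newline='') as outfile:
    wr = csv.writer(outfile)
    # Each result is written as soon as a worker finishes
    # (completion order, not input order).
    for row in pool.imap_unordered(webScraper, urls):
        wr.writerow(row)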
Votes: 0
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/44124738
