首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用数据流读取CSV文件,但在将行吞并到Google BigQuery之前增加两个列op_type和op_time

使用数据流读取CSV文件,但在将行吞并到Google BigQuery之前增加两个列op_type和op_time
EN

Stack Overflow用户
提问于 2022-02-03 08:46:43
回答 2查看 310关注 0票数 0

我有一个数据流代码,它从gs://中的存储桶读取CSV文件,并将该CSV文件摄取到BigQuery表中。BigQuery表已经创建。下面的代码工作正常。

代码语言:javascript
复制
class DataIngestion:
    """A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""


    def parse_method(self, string_input):

        values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(
            zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                values))
        return row

def run(argv=None):
    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())


    (p
    | 'Create PCollection' >> beam.Create(source_file)
    | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)  ## ignore the csv header
    | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s)) # s is each of the String elements read in the beam.io.ReadAllFromText transform, and we apply a lambda
    | 'Write to BigQuery' >> beam.io.Write(
    beam.io.WriteToBigQuery(
    'DUMMY',
    dataset='test',
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))

    result = p.run()
    result.wait_until_finish()

但是,我需要为CSV文件中的每一行添加两个额外的列,即op_type和op_time。下面是BigQuery表定义中的内容。

代码语言:javascript
复制
Field name
Type
Mode
Policy tags
Description
ID  FLOAT   REQUIRED    
CLUSTERED   FLOAT   NULLABLE    
SCATTERED   FLOAT   NULLABLE    
RANDOMISED  FLOAT   NULLABLE    
RANDOM_STRING   STRING  NULLABLE    
SMALL_VC    STRING  NULLABLE    
PADDING STRING  NULLABLE    
op_type INTEGER REQUIRED    
op_time TIMESTAMP   REQUIRED    

在PySpark中,我可以通过向dataframe添加两列来实现这一点,如下所示:

代码语言:javascript
复制
df= self.spark.createDataFrame(rdd, schema = Schema)
df = df. \
             withColumn("op_type", lit(1)). \
             withColumn("op_time", current_timestamp())

因此,op_type设置为1,这意味着插入和op_time需要是current_timestamp()

如何通过数据流实现这一目标?这两列都是添加的列,所以“String To BigQuery Row”应该反映这一点吗?

谢谢

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-02-03 22:15:24

这起作用了

代码语言:javascript
复制
from datetime import datetime

    def parse_method(self, string_input):

        values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(
            zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                values))
        Timestamp = datetime.now()
        static_cols = {'op_type': 1, 'op_time': Timestamp}
        row.update(static_cols)
        return row

注意Python时间戳的定义->时间戳= datetime.now(),以正确映射到BigQuery中时间戳类型的列

票数 0
EN

Stack Overflow用户

发布于 2022-02-03 09:59:41

在返回之前更新该数据集。

代码语言:javascript
复制
def parse_method(self, string_input):

        values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(
            zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                values))
        static_cols = {'op_time':'some_time','Op_type':'som_type'}
        row.update(static_cols)
        return row
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70968192

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档