首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >为 Delta 新增 Upsert(Merge)功能

为 Delta 新增 Upsert(Merge)功能

作者头像
用户2936994
发布于 2019-06-11 12:13:05
发布于 2019-06-11 12:13:05
1K00
代码可运行
举报
文章被收录于专栏:祝威廉祝威廉
运行总次数:0
代码可运行

前言

今天花了一早上以及午休时间,终于把delta的Upsert功能做完了。加上上周周四做的Delta Compaction支持,我想要的功能基本就都有了。

Delta的核心是DeltaLog,其实就是元数据管理。通过该套元数据管理,我们可以很容易的将Compaction,Update,Upsert,Delete等功能加上,因为本质上就是调用元数据管理API完成数据最后的提交。

代码使用方式

Upsert支持流式和批的方式进行更新。因为受限于Spark的SQL解析,大家可以使用Dataframe 或者 MLSQL的方式进行调用。

批使用方式:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val log = DeltaLog.forTable(spark, outputDir.getCanonicalPath)
val upsertTableInDelta = UpsertTableInDelta(data, Option(SaveMode.Append), None, log,
            new DeltaOptions(Map[String, String](), df.sparkSession.sessionState.conf),
            Seq(),
            Map("idCols" -> "key,value"))
val items = upsertTableInDelta.run(df.sparkSession)

唯一需要大家指定的就是 idCols, 也就是你的表的唯一主键组合是啥。比如我这里是key,value两个字段组成唯一主键。

流使用技巧是一模一样的,只需要做一点点修改:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 UpsertTableInDelta(data, None, Option(OutputMode.Append())

UpsertTableInDelta 根据你设置的是SaveMode还是OutputMode来看是不是流写入。

MLSQL 使用方式

写入数据到Kafka:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
set abc='''
{ "x": 100, "y": 201, "z": 204 ,"dataType":"A group"}
''';
load jsonStr.`abc` as table1;

select to_json(struct(*)) as value from table1 as table2;
save append table2 as kafka.`wow` where 
kafka.bootstrap.servers="127.0.0.1:9092";

使用流程序消费Kafka:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- the stream name, should be uniq.
set streamName="kafkaStreamExample";

!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;

-- convert table as stream source
load kafka.`wow` options 
kafka.bootstrap.servers="127.0.0.1:9092"
and failOnDataLoss="false"
as newkafkatable1;

-- aggregation 
select *  from newkafkatable1
as table21;

-- output the the result to console.
save append table21  
as rate.`/tmp/delta/wow-0` 
options mode="Append"
and idCols="x,y"
and duration="5"
and checkpointLocation="/tmp/s-cpl6";

同样的,我们设置了idCols,指定x,y为唯一主键。

然后查看对应的记录变化:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
load delta.`/tmp/delta/wow-0` as show_table1;
select * from show_table1 where x=100 and z=204 as output;

你会惊喜的发现数据可以更新了。

实现剖析

一共涉及到三个新文件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
org.apache.spark.sql.delta.commands.UpsertTableInDelta
org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource
org.apache.spark.sql.delta.sources.MLSQLDeltaSink

对应源码参看我fork的delta项目: mlsql-delta

第一个文件是实现核心的更新逻辑。第二个第三个支持Spark的datasource API来进行批和流的写入。

这篇文章我们主要介绍UpsertTableInDelta。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
case class UpsertTableInDelta(_data: Dataset[_],
                              saveMode: Option[SaveMode],
                              outputMode: Option[OutputMode],
                              deltaLog: DeltaLog,
                              options: DeltaOptions,
                              partitionColumns: Seq[String],
                              configuration: Map[String, String]
                             ) extends RunnableCommand
  with ImplicitMetadataOperation
  with DeltaCommand with DeltaCommandsFun {

UpsertTableInDelta 集成了delta一些必要的基础类,ImplicitMetadataOperation,DeltaCommand,主要是为了方便得到一些操作日志写入的方法。

saveMode 和 outputMode 主要是为了方便区分现在是流在写,还是批在写,以及写的模式是什么。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
assert(configuration.contains(UpsertTableInDelta.ID_COLS), "idCols is required ")

    if (outputMode.isDefined) {
      assert(outputMode.get == OutputMode.Append(), "append is required ")
    }

    if (saveMode.isDefined) {
      assert(saveMode.get == SaveMode.Append, "append is required ")
    }

限制条件是必须都是用Append模式,并且idCols是必须存在的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
saveMode match {
      case Some(mode) =>
        deltaLog.withNewTransaction { txn =>
          actions = upsert(txn, sparkSession)
          val operation = DeltaOperations.Write(SaveMode.Overwrite,
            Option(partitionColumns),
            options.replaceWhere)
          txn.commit(actions, operation)
        }
      case None => outputMode match {

如果是批写入,那么直接调用deltaLog开启一个新的事物,然后进行upsert操作。同时进行commit,然后就搞定了。

如果是流写入则麻烦一点,

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
case None => outputMode match {
        case Some(mode) =>
          val queryId = sparkSession.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
          assert(queryId != null)

          if (SchemaUtils.typeExistsRecursively(_data.schema)(_.isInstanceOf[NullType])) {
            throw DeltaErrors.streamWriteNullTypeException
          }

          val txn = deltaLog.startTransaction()
          // Streaming sinks can't blindly overwrite schema.
          // See Schema Management design doc for details
          updateMetadata(
            txn,
            _data,
            partitionColumns,
            configuration = Map.empty,
            false)

          val currentVersion = txn.txnVersion(queryId)
          val batchId = configuration(UpsertTableInDelta.BATCH_ID).toLong
          if (currentVersion >= batchId) {
            logInfo(s"Skipping already complete epoch $batchId, in query $queryId")
          } else {
            actions = upsert(txn, sparkSession)
            val setTxn = SetTransaction(queryId,
              batchId, Some(deltaLog.clock.getTimeMillis())) :: Nil
            val info = DeltaOperations.StreamingUpdate(outputMode.get, queryId, batchId)
            txn.commit(setTxn ++ actions, info)
          }
      }
    }

首选我们获取queryId,因为在delta里需要使用queryId获取事务ID(batchId),并且最后写完成之后的会额外写入一些数据到元数据里,也需要queryId。

updateMetadata 主要是为了检测schema信息,譬如如果stream 是complte模式,那么是直接覆盖的,而如果是其他模式,则需要做schema合并。

如果我们发现当前事务ID>batchId,说明这个已经运行过了,跳过。如果没有,则使用upsert进行实际的操作。最后设置一些额外的信息提交。

upsert 方法

upsert的基本逻辑是:

  1. 获取idCols是不是有分区字段,如果有,先根据分区字段过滤出所有的文件。
  2. 如果没有分区字段,则得到所有的文件
  3. 将这些文件转化为dataframe
  4. 和新写入的dataframe进行join操作,得到受影响的行(需要更新的行),然后得到这些行所在的文件。
  5. 获取这些文件里没有无需变更的记录,写成新文件。
  6. 删除这些文件
  7. 将新数据写成新文件

4,5两个步骤需要对数据进行join,但是在Spark里静态表并不能直接join流表,所以我们需要将流表转化为静态表。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def upsert(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {

    // if _data is stream dataframe, we should convert it to normal
    // dataframe and so we can join it later
    val data = if (_data.isStreaming) {
      class ConvertStreamDataFrame[T](encoder: ExpressionEncoder[T]) {

        def toBatch(data: Dataset[_]): Dataset[_] = {
          val resolvedEncoder = encoder.resolveAndBind(
            data.logicalPlan.output,
            data.sparkSession.sessionState.analyzer)
          val rdd = data.queryExecution.toRdd.map(resolvedEncoder.fromRow)(encoder.clsTag)
          val ds = data.sparkSession.createDataset(rdd)(encoder)
          ds
        }
      }
      new ConvertStreamDataFrame[Row](_data.asInstanceOf[Dataset[Row]].exprEnc).toBatch(_data)
    } else _data

上述代码就是将流表转化为普通静态表。接着我们需要拿到主键字段里满足分区字段的字段,然后获取他们的min/max值

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val minMaxColumns = partitionColumnsInIdCols.flatMap { column =>
        Seq(F.lit(column), F.min(column).as(s"${column}_min"), F.max(F.max(s"${column}_max")))
      }.toArray
      val minxMaxKeyValues = data.select(minMaxColumns: _*).collect()

最后得到过滤条件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// build our where statement
      val whereStatement = minxMaxKeyValues.map { row =>
        val column = row.getString(0)
        val minValue = row.get(1).toString
        val maxValue = row.get(2).toString

        if (isNumber(column)) {
          s"${column} >= ${minValue} and   ${maxValue} >= ${column}"
        } else {
          s"""${column} >= "${minValue}" and   "${maxValue}" >= ${column}"""
        }
      }
      logInfo(s"whereStatement: ${whereStatement.mkString(" and ")}")
      val predicates = parsePartitionPredicates(sparkSession, whereStatement.mkString(" and "))
      Some(predicates)

现在可以得到所有相关的文件了:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val filterFilesDataSet = partitionFilters match {
      case None =>
        snapshot.allFiles
      case Some(predicates) =>
        DeltaLog.filterFileList(
          metadata.partitionColumns, snapshot.allFiles.toDF(), predicates).as[AddFile]
    }

将这些文件转化为dataframe,并且将里面的每条记录都带上所属文件的路径:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Again, we collect all files to driver,
    // this may impact performance and even make the driver OOM when
    // the number of files are very huge.
    // So please make sure you have configured the partition columns or make compaction frequently

    val filterFiles = filterFilesDataSet.collect
    val dataInTableWeShouldProcess = deltaLog.createDataFrame(snapshot, filterFiles, false)

    val dataInTableWeShouldProcessWithFileName = dataInTableWeShouldProcess.
      withColumn(UpsertTableInDelta.FILE_NAME, F.input_file_name())

通过Join获取哪些文件里面的记录需要被更新:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// get all files that are affected by the new data(update)
    val filesAreAffected = dataInTableWeShouldProcessWithFileName.join(data,
      usingColumns = idColsList,
      joinType = "inner").select(UpsertTableInDelta.FILE_NAME).
      distinct().collect().map(f => f.getString(0))
val tmpFilePathSet = filesAreAffected.map(f => f.split("/").last).toSet

    val filesAreAffectedWithDeltaFormat = filterFiles.filter { file =>
      tmpFilePathSet.contains(file.path.split("/").last)
    }

    val deletedFiles = filesAreAffectedWithDeltaFormat.map(_.remove)

将需要删除的文件里没有改变的记录单独拿出来写成新文件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// we should get  not changed records in affected files and write them back again
    val affectedRecords = deltaLog.createDataFrame(snapshot, filesAreAffectedWithDeltaFormat, false)

    val notChangedRecords = affectedRecords.join(data,
      usingColumns = idColsList, joinType = "leftanti").
      drop(F.col(UpsertTableInDelta.FILE_NAME))
val notChangedRecordsNewFiles = txn.writeFiles(notChangedRecords, Some(options))

最后将我们新增数据写入:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val newFiles = txn.writeFiles(data, Some(options))

因为第一次写入的时候,schema还没有形成,所以不能走upsert逻辑,而是需要直接写入,这里我偷懒,没有把逻辑写在UpsertTableInDelta里,而是写在了MLSQLDeltaSink里:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val metadata = deltaLog.snapshot.metadata
    val readVersion = deltaLog.snapshot.version
    val isInitial = readVersion < 0
    if (!isInitial && parameters.contains(UpsertTableInDelta.ID_COLS)) {
      UpsertTableInDelta(data, None, Option(outputMode), deltaLog,
        new DeltaOptions(Map[String, String](), sqlContext.sparkSession.sessionState.conf),
        Seq(),
        Map(UpsertTableInDelta.ID_COLS -> parameters(UpsertTableInDelta.ID_COLS),
          UpsertTableInDelta.BATCH_ID -> batchId.toString
        )).run(sqlContext.sparkSession)

    } else {
      super.addBatch(batchId, data)
    }
  }

总结

Delta 具备了数据的增删改查能力,同时流批共享,并发修改控制,加上小文件compaction功能,基本解决了我们之前在使用流计算遇到的大部分问题。后续持续优化delta的查询功能,相信前景无限。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019.06.10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
用 Spark 优化亿级用户画像计算:Delta Lake 增量更新策略详解
在亿级用户规模的系统中,用户画像计算面临三大核心挑战:数据体量巨大(PB级)、更新频率高(每日千万级更新)、查询延迟敏感(亚秒级响应)。传统全量计算模式在每日ETL中消耗数小时集群资源,无法满足实时业务需求。
大熊计算机
2025/07/15
1100
用 Spark 优化亿级用户画像计算:Delta Lake 增量更新策略详解
Structured Streaming快速入门详解(8)
接着上一篇《Spark Streaming快速入门系列(7)》,这算是Spark的终结篇了,从Spark的入门到现在的Structured Streaming,相信很多人学完之后,应该对Spark摸索的差不多了,Spark是一个很重要的技术点,希望我的文章能给大家带来帮助。
刘浩的BigDataPath
2021/04/13
1.7K0
Structured Streaming快速入门详解(8)
数据湖(四):Hudi与Spark整合
默认Spark操作Hudi使用表类型为Copy On Write模式。Hudi与Spark整合时有很多参数配置,可以参照https://hudi.apache.org/docs/configurations.html配置项来查询,此外,整合时有几个需要注意的点,如下:
Lansonli
2022/05/30
3.5K2
数据湖(四):Hudi与Spark整合
Structured Streaming如何实现Parquet存储目录按时间分区
StreamingPro现在支持以SQL脚本的形式写Structured Streaming流式程序了: mlsql-stream。不过期间遇到个问题,我希望按天进行分区,但是这个分区比较特殊,就是是按接收时间来落地进行分区,而不是记录产生的时间。
用户2936994
2018/08/27
1.1K0
[Delta][SQL] Delta开源付费功能,最全分析ZOrder的源码实现流程
通常为提高数据处理的效率,计算引擎要实现谓词的下推,而存储引擎可以根据下推的过滤条件尽可能的跳过无关数据或文件。不管是Hudi、Iceberg还是Delta都实现了基于min-max索引的Data-skiping技术。它指的是在元数据中都记录这数据文件中的每一列的最小值和最大值,通过查询中列上的谓词来决定当前的数据文件是否可能包含满足谓词的任何records,是否可以跳过读取当前数据文件。
Tim在路上
2022/09/07
1.4K0
[Delta][SQL] Delta开源付费功能,最全分析ZOrder的源码实现流程
数据湖(五):Hudi与Hive集成
Hudi与Hive集成原理是通过代码方式将数据写入到HDFS目录中,那么同时映射Hive表,让Hive表映射的数据对应到此路径上,这时Hudi需要通过JDBC方式连接Hive进行元数据操作,这时需要配置HiveServer2。
Lansonli
2022/05/31
3.1K0
数据湖(五):Hudi与Hive集成
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
Structured Streaming 非常显式地提出了输入(Source)、执行(StreamExecution)、输出(Sink)的3个组件,并且在每个组件显式地做到fault-tolerant(容错),由此得到整个streaming程序的 end-to-end exactly-once guarantees。
Lansonli
2021/10/11
1.5K0
客快物流大数据项目(一百零一):实时OLAP开发
使用ClickHouse分析物流指标数据,必须将数据存储到ClickHouse中。
Lansonli
2022/12/29
1.4K0
客快物流大数据项目(一百零一):实时OLAP开发
Apache Hudi入门指南(含代码示例)
hudi详细介绍见hudi官网 http://hudi.apache.org/cn/docs/0.5.0-quick-start-guide.html
ApacheHudi
2021/04/13
3.6K0
2021年大数据Spark(四十七):Structured Streaming Sink 输出
在StructuredStreaming中定义好Result DataFrame/Dataset后,调用writeStream()返回DataStreamWriter对象,设置查询Query输出相关属性,启动流式应用运行,相关属性如下:
Lansonli
2021/10/11
1.2K0
8.deltalake的merge四个案例场景
实际上,线上业务很多时候数据源在上报数据的时候,由于各种原因可能会重复上报数据,这就会导致数据重复,使用merge函数可以避免插入重复的数据。具体操作方法如下:
Spark学习技巧
2021/03/05
9940
Structured Streaming教程(2) —— 常用输入与输出
Structured Streaming 提供了几种数据源的类型,可以方便的构造Steaming的DataFrame。默认提供下面几种类型:
用户1154259
2018/07/31
1.4K0
「Hudi系列」Apache Hudi入门指南 | SparkSQL+Hive+Presto集成
hive 查询hudi 数据主要是在hive中建立外部表数据路径指向hdfs 路径,同时hudi 重写了inputformat 和outpurtformat。因为hudi 在读的数据的时候会读元数据来决定我要加载那些parquet文件,而在写的时候会写入新的元数据信息到hdfs路径下。所以hive 要集成hudi 查询要把编译的jar 包放到HIVE-HOME/lib 下面。否则查询时找不到inputformat和outputformat的类。
王知无-import_bigdata
2022/03/11
2.8K0
「Hudi系列」Apache Hudi入门指南 | SparkSQL+Hive+Presto集成
Spark Structured Streaming + Kafka使用笔记
这篇博客将会记录Structured Streaming + Kafka的一些基本使用(Java 版)
大鹅
2021/06/16
2K0
Spark报错记录:Overloaded method foreachBatch with alternatives
Structured Streaming报错记录:Overloaded method foreachBatch with alternatives0. 写在前面1. 报错2. 代码及报错信息3. 原因及纠错4. 参考链接
WHYBIGDATA
2023/01/31
6920
Spark报错记录:Overloaded method foreachBatch with alternatives
看了这篇博客,你还敢说不会Structured Streaming?
本篇博客,博主为大家带来的是关于Structured Streaming从入门到实战的一个攻略,希望感兴趣的朋友多多点赞支持!!
大数据梦想家
2021/01/27
1.8K0
看了这篇博客,你还敢说不会Structured Streaming?
Spark源码系列之spark2.2的StructuredStreaming使用及源码介绍
一,概述 Structured Streaming是一个可扩展和容错的流处理引擎,并且是构建于sparksql引擎之上。你可以用处理静态数据的方式去处理你的流计算。随着流数据的不断流入,Sparksql引擎会增量的连续不断的处理并且更新结果。可以使用DataSet/DataFrame的API进行 streaming aggregations, event-time windows, stream-to-batch joins等等。计算的执行也是基于优化后的sparksql引擎。通过checkpointing
Spark学习技巧
2018/01/30
2.5K0
Spark源码系列之spark2.2的StructuredStreaming使用及源码介绍
Spark实时(六):Output Sinks案例演示
当我们对流式数据处理完成之后,可以将数据写出到Flie、Kafka、console控制台、memory内存,或者直接使用foreach做个性化处理。关于将数据结果写出到Kafka在StructuredStreaming与Kafka整合部分再详细描述。
Lansonli
2025/05/24
1380
Spark实时(六):Output Sinks案例演示
[LakeHouse] Delta Lake全部开源,聊聊Delta的实现架构
刚刚结束的Data + AI summit上,Databricks宣布将Delta Lake全部开源。
Tim在路上
2022/09/01
1.4K0
[LakeHouse] Delta Lake全部开源,聊聊Delta的实现架构
实战|使用Spark Streaming写入Hudi
传统数仓的组织架构是针对离线数据的OLAP(联机事务分析)需求设计的,常用的导入数据方式为采用sqoop或spark定时作业逐批将业务库数据导入数仓。随着数据分析对实时性要求的不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于spark/flink流处理机制的(准)实时同步系统的开发。
ApacheHudi
2021/04/13
2.4K0
推荐阅读
相关推荐
用 Spark 优化亿级用户画像计算:Delta Lake 增量更新策略详解
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档