这种开放性和灵活性的方法使数据存储和使用方式发生了转变。如今,客户可以选择在云对象存储(如 Amazon S3、Microsoft Azure Blob Storage或 Google Cloud Storage)中以开放表格式存储数据。数据由数据所有者全资拥有和管理,并保存在其安全的 Virtual Private Cloud (VPC) 帐户中。用户可以为其工作负载提供正确类型的查询引擎,而无需复制数据。这创建了一个面向未来的架构,可以在需要时将新工具添加到技术栈中。
尽管有这些优点,但仍存在一个障碍:需要选择单一表格格式,这带来了重大挑战,因为每种格式都具有独特的功能和集成优势。此外对于较新的工作负载,组织要求格式完全可互操作,因此数据是普遍可查询的。如果没有互操作性,组织就会被绑定到单一格式,迫使他们处理一次性迁移策略或制作完整的数据副本(通常经常)以使用其他格式。
Apache XTable(孵化)项目[1]是去年启动的一项开源计划,通过关注这些不同湖仓一体表格式之间的互操作性来应对这一挑战。XTable 充当轻量级转换层,允许在源表和目标表格式之间无缝转换元数据,而无需重写或复制实际数据文件。因此无论写入数据的初始表格式选择如何,都可以使用选择的首选格式和计算引擎来读取数据。
在这篇博客中,我们将介绍一个假设但实际的场景,该场景在当今组织内的分析工作负载中变得越来越频繁。
此方案从两个分析团队开始,该团队是组织中市场分析组的一部分。这些团队负责分析各种超市产品的市场趋势和消费者偏好。他们的大部分数据都位于 S3 数据湖中。对于这个特定的练习,我们使用了来自 Kaggle[2] 的公开数据。
团队 A 使用 Apache Hudi 来管理一些最关键的低延迟数据管道。Hudi 的优势在于它能够支持增量数据处理,在数据湖中提供更快的更新和删除。此外,Hudi 中强大的索引[3]和自动表管理功能[4]使团队 A 能够在其数据摄取过程中保持高水平的效率和性能,主要通过 Apache Spark 执行。此 Hudi 表包含特定时期内在“Tesco”中发生的销售数据。
另一方面,Team B 专注于临时分析、BI 和报告,利用 Dremio 强大的计算引擎和 Apache Iceberg 表的可靠性。Iceberg 的功能(如隐藏分区[5]和数据版本控制)与 Dremio 的分析工作负载查询加速功能无缝配对。这种组合使团队 B 能够执行复杂的分析,并轻松高效地生成 BI 报告。B组将超市“Aldi”的销售数据存储为Iceberg表。
为了对组织中的特殊营销活动进行详细的比较分析,B 团队希望了解“Tesco”和“Aldi”超市的品类产品销售情况。为此团队 B 希望使用团队 A 生成的数据集(存储为 Hudi 表)并将其与他们的数据集(Iceberg 表)相结合。鉴于他们使用 Dremio 作为分析和报告的计算引擎,这在传统上会构成重大障碍,因为 Dremio 本身不支持 Hudi 表。
在诸如此类的场景中,Apache XTable 提供了一个简单的解决方案,使团队 B 能够处理这个问题。使用 XTable,团队 B 将源 Hudi 表(“Tesco”数据)公开为 Iceberg 表。这是通过将元数据从 Hudi 转换为 Iceberg 来实现的,而无需重写或复制实际数据。此转换过程非常高效,并利用相同的 S3 存储桶来存储目标表的已翻译元数据。
一旦团队 A 数据 (Hudi) 呈现为 Iceberg 表,团队 B 就可以处理数据,就像它最初是以 Iceberg 格式编写的一样。他们可以利用 Dremio 计算的联接和联合等操作,使用来自两个团队的数据创建一个新数据集。通过 XTable,无需进行成本高昂的数据重写或繁琐的迁移工作,从而可以进行快速分析。借助 XTable,数据更加普遍可用,使组织能够无缝地使用多种表格格式。现在我们已经对 Apache XTable 提供的问题陈述和解决方案有了深入的了解,现在让我们深入了解实际方面,看看互操作性在上述场景中是如何工作的。
团队 A 使用 Apache Spark 将“Tesco”超市的销售数据摄取到存储在 S3 数据湖中的 Hudi 表中。让我们从创建 Hudi 表开始。以下是将 PySpark 与 Apache Hudi 一起使用所需的所有配置。
from typing import *
from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize Spark session
spark = SparkSession.builder \
.appName("Hudi Table") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.jars.packages", "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.2.4,com.amazonaws:aws-java-sdk:1.12.262") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.getOrCreate()
print("Spark Running")
s3_path = "s3a://diplakehouse/hudi_tables/"
# Access SparkContext
sc = spark.sparkContext
spark.sql(
"""CREATE TABLE retail_data
(supermarket STRING, prices STRING, prices_unit STRING, unit STRING, names STRING, date STRING,
category STRING, own_brand STRING)
USING HUDI
LOCATION 's3a://diplakehouse/hudi_tables/'"""
);
现在我们将销售记录引入 Hudi 表。
spark.sql(
"""CREATE OR REPLACE TEMPORARY VIEW retail_temp USING csv
OPTIONS (path "Dataset/All_Data_Tesco.csv", header true)"""
)
spark.sql("INSERT INTO retail_data SELECT * FROM retail_temp")
让我们快速检查一下 S3 文件系统中的 Hudi 表文件。
下面是数据(使用 Spark SQL 查询)。
接下来,使用 Spark 执行“Aldi”超市的摄取,数据集作为 Iceberg 表 (retail_ice) 存储在 S3 数据湖中。此步骤模拟数据工程团队负责数据准备和引入的典型工作流。
如果要使用本地 Spark 和 Dremio 环境来试用此用例,请按照此存储库中的说明创建本地湖仓一体环境。
我们首先使用 PySpark 和 Hadoop 目录配置 Apache Iceberg,并创建 Iceberg 表。
import pyspark
from pyspark.sql import SparkSession
import os
conf = (
pyspark.SparkConf()
.setAppName('app_name')
.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.3,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
.set('spark.sql.catalog.hdfs_catalog', 'org.apache.iceberg.spark.SparkCatalog')
.set('spark.sql.catalog.hdfs_catalog.type', 'hadoop')
.set('spark.sql.catalog.hdfs_catalog.warehouse', 's3a://diplakehouse/iceberg_new/')
.set('spark.sql.catalog.hdfs_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)
## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")
spark.sql(
"""CREATE TABLE IF NOT EXISTS hdfs_catalog.retail_ice
(supermarket STRING, prices STRING, prices_unit STRING, unit STRING, names STRING, date STRING, category STRING, own_brand STRING) USING iceberg"""
)
然后摄取“Aldi”的销售数据。
spark.sql(
"""CREATE OR REPLACE TEMPORARY VIEW salesview USING csv
OPTIONS (path "file:/home/docker/All_Data_Aldi.csv", header true)"""
)
spark.sql("INSERT INTO hdfs_catalog.retail_ice SELECT * FROM salesview")
在S3数据湖中将数据写入Iceberg表后,数据分析师可以使用Dremio的湖仓一体平台连接到湖并开始查询数据。
下面是一个简单的查询
因此,由于两个团队的数据都以两种不同的表格式存储,我们现在引入 Apache XTable 来解决互操作性挑战。
XTable 将用于将元数据从 Hudi 表(“Tesco”)转换为 Iceberg 格式,从而使数据能够使用 B 团队端的 Dremio 以 Iceberg 格式访问和查询。这不会修改或复制原始数据集的 Parquet 基础文件。
从 Apache XTable 开始,我们将首先将 GitHub[6] 存储库克隆到本地环境,并使用 Maven 编译必要的 jar。以下命令启动生成:
mvn clean package
有关安装的更多详细信息,请遵循官方文档[7]。
构建完成成功后,我们将使用utilities-0.1.0-SNAPSHOT-bundled.jar启动元数据转换过程。
下一步是在我们克隆的 XTable 目录中设置一个配置文件 my_config.yaml,以定义翻译详细信息。配置应如下所示:
sourceFormat: HUDI
targetFormats:
- ICEBERG
datasets:
- tableBasePath: s3://diplakehouse/hudi_tables/
tableName: retail_data
该配置概述了源格式 (Hudi)、目标格式 (Iceberg) 和表特定的详细信息:S3 中的基本路径和表名称。为了启动翻译过程,我们将执行以下命令。
java -jar utilities/target/utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml
同步过程成功完成后,我们将看到输出,如下面的代码片段所示。
如果我们现在检查 S3 位置路径,我们将看到 Iceberg 元数据文件,其中包括架构定义、提交历史记录、分区信息和列统计信息等详细信息。这是 S3 中的元数据文件夹。正如我们所看到的,Iceberg 元数据是同一个 /hudi_tables 目录的一部分。
现在原始的 Hudi 表(“Tesco”数据集)已转换为 S3 数据湖中的 Iceberg 表,我们可以无缝地使用 Dremio 的计算引擎来查询数据并执行进一步的操作。
如果没有像 Apache XTable 这样的轻量级翻译层,从 Dremio 访问 Hudi 表将不简单。替代方案将涉及繁琐的迁移过程、昂贵的数据重写以及历史数据版本的潜在丢失。
让我们继续从 Dremio 查询这个新数据集。
现在在下一部分中,团队 B 希望将两个数据集(“Tesco”和“Aldi”)组合到一个视图中,并使用这些数据构建 BI 报告。我们将在这两个表上使用一个简单的 UNION,如下所示,以实现此目的。
Dremio 还允许将其保存为环境中特定空间(图层)中的视图,以便特定团队可以使用。我们会将合并后的数据集另存为 Universal_dataset_superstore。
因此这个组合数据集(Hudi翻译和Iceberg原生表)现在将由B团队用于对“Tesco”和“Aldi”超市进行类别产品销售分析。为此分析师可以使用 Dremio 中的“分析方式”按钮,使用这个新的组合数据集在 Tableau 中构建 BI 报表。
下面是 Tableau 中的最终报告,它集成了来自两种不同表格格式的数据集,以执行按类别的产品销售分析。“Tesco”数据(绿色)最初以 Hudi 格式存储,现在使用 XTable 转换为 Iceberg。“Aldi”数据(黄色)原生存储为 Iceberg 表。
这个用例强调了 XTable 的转换功能带来的好处。B 团队的分析师能够像处理Iceberg表一样处理 Tesco 的数据,而无需在分析过程中进行任何更改。XTable 提供的灵活性使 Dremio 能够读取和执行 Tesco 数据集的分析,而与原生 Iceberg 格式没有任何区别。开放格式之间的互操作能力可以节省资金,提高性能,简化分析工作流程,并确保数据普遍可访问。
引用链接
[1]
Apache XTable(孵化)项目: [https://github.com/apache/incubator-xtable](https://github.com/apache/incubator-xtable)
[2]
Kaggle: [https://www.kaggle.com/datasets/willianoliveiragibin/retail-analytics-trends/data](https://www.kaggle.com/datasets/willianoliveiragibin/retail-analytics-trends/data)
[3]
索引: [https://hudi.apache.org/docs/next/indexing](https://hudi.apache.org/docs/next/indexing)
[4]
自动表管理功能: [https://hudi.apache.org/docs/next/clustering](https://hudi.apache.org/docs/next/clustering)
[5]
隐藏分区: [https://www.dremio.com/blog/fewer-accidental-full-table-scans-brought-to-you-by-apache-icebergs-hidden-partitioning/](https://www.dremio.com/blog/fewer-accidental-full-table-scans-brought-to-you-by-apache-icebergs-hidden-partitioning/)
[6]
GitHub: [https://github.com/apache/incubator-xtable](https://github.com/apache/incubator-xtable)
[7]
官方文档: [https://xtable.apache.org/docs/setup/](https://xtable.apache.org/docs/setup/)