在大数据时代中我们迫切需要实时应用解决源源不断涌入的数据,然而建立这么一个应用需要解决多个问题:
Structured Streaming以Spark SQL 为基础, 建立在上述基础之上,借用其强力API提供无缝的查询接口,同时最优化的执行低延迟持续的更新结果。
ETL: Extract, Transform, and Load
ETL操作可将非结构化数据转化为可以高效查询的Table。具体而言需要可以执行以下操作:
传统上,ETL定期执行批处理任务。例如实时转储原始数据,然后每隔几小时将其转换为结构化表格,以实现高效查询,但高延迟非常高。在许多情况下这种延迟是不可接受的。
幸运的是,Structured Streaming 可轻松将这些定期批处理任务转换为实时数据。此外,该引擎提供保证与定期批处理作业相同的容错和数据一致性,同时提供更低的端到端延迟。
val cloudTrailSchema = new StructType()
.add("Records", ArrayType(new StructType()
.add("additionalEventData", StringType)
.add("apiVersion", StringType)
.add("awsRegion", StringType)
// ...
val rawRecords = spark.readStream
.schema(cloudTrailSchema)
.json("s3n://mybucket/AWSLogs/*/CloudTrail/*/2017/*/*")
这里的rawRecords
为Dataframe,可理解为无限表格
转化为Dataframe我们可以很方便地使用Spark SQL查询一些复杂的结构
val cloudtrailEvents = rawRecords
.select(explode($"records") as 'record)
.select(
unix_timestamp(
$"record.eventTime",
"yyyy-MM-dd'T'hh:mm:ss").cast("timestamp") as 'timestamp, $"record.*")
val streamingETLQuery = cloudtrailEvents
.withColumn("date", $"timestamp".cast("date") // derive the date
.writeStream
.trigger(ProcessingTime("10 seconds")) // check for files every 10s
.format("parquet") // write as Parquet partitioned by date
.partitionBy("date")
.option("path", "/cloudtrail")
.option("checkpointLocation", "/cloudtrail.checkpoint/")
.start()
StreamingQuery将会连续运行,当新数据到达时并会对其进行转换
这里我们为StreamingQuery指定以下配置:
option(“checkpointLocation”,“/ cloudtrail.checkpoint /”)
当查询处于活动状态时,Spark会不断将已处理数据的元数据写入检查点目录。即使整个群集出现故障,也可以使用相同的检查点目录在新群集上重新启动查询,并进行恢复。更具体地说,在新集群上,Spark使用元数据来启动新查询,从而确保端到端一次性和数据一致性。
此部分具体将讨论以下内容:
结构化数据
结构化数据源可提供有效的存储和性能。例如,Parquet和ORC等柱状格式使从列的子集中提取值变得更加容易。基于行的存储格式(如Avro)可有效地序列化和存储提供存储优势的数据。然而,这些优点通常以灵活性为代价。如因结构的固定性,格式转变可能相对困难。
非结构化数据
相比之下,非结构化数据源通常是自由格式文本或二进制对象,其不包含标记或元数据以定义数据的结构。报纸文章,医疗记录,图像,应用程序日志通常被视为非结构化数据。这些类型的源通常要求数据周围的上下文是可解析的。
半结构化数据
半结构化数据源是按记录构建的,但不一定具有跨越所有记录的明确定义的全局模式。每个数据记录都使用其结构信息进行扩充。 半结构化数据格式的好处是,它们在表达数据时提供了最大的灵活性,因为每条记录都是自我描述的。但这些格式的主要缺点是它们会产生额外的解析开销,并且不是特别为ad-hoc(特定)查询而构建的。
Spark SQL支持以Parquet,ORC,JSON,CSV和文本格式读取和写入数据,并且Spark包中还存在大量其他连接器,还可以使用JDBC DataSource连接到SQL数据库。
转数据格式如下所示:
events = spark.readStream \
.format("json") \ # or parquet, kafka, orc...
.option() \ # format specific options
.schema(my_schema) \ # required
.load("path/to/data")
output = … # perform your transformations
output.writeStream \ # write out your data
.format("parquet") \
.start("path/to/write")
例如:
嵌套所有列: 星号(*)可用于包含嵌套结构中的所有列。
// input
{
"a": 1,
"b": 2
}
Python: events.select(struct("*").alias("x"))
Scala: events.select(struct("*") as 'x)
SQL: select struct(*) as x from events
// output
{
"x": {
"a": 1,
"b": 2
}
}
Spark SQL提供from_json()
及to_json()
函数
// input
{
"a": "{\"b\":1}"
}
Python:
schema = StructType().add("b", IntegerType())
events.select(from_json("a", schema).alias("c"))
Scala:
val schema = new StructType().add("b", IntegerType)
events.select(from_json('a, schema) as 'c)
// output
{
"c": {
"b": 1
}
}
regexp_extract()
解析正则表达式
// input
[{ "a": "x: 1" }, { "a": "y: 2" }]
Python: events.select(regexp_extract("a", "([a-z]):", 1).alias("c"))
Scala: events.select(regexp_extract('a, "([a-z]):", 1) as 'c)
SQL: select regexp_extract(a, "([a-z]):", 1) as c from events
// output
[{ "c": "x" }, { "c": "y" }]
此部分将讨论使用Spark SQL API处理转换来自Kafka的复杂数据流,并存储到HDFS MySQL等系统中。
Kafka是一种分布式pub-sub消息传递系统,广泛用于摄取实时数据流,并以并行和容错的方式向下游消费者提供。这使得Kafka适合构建可在异构处理系统之间可靠地移动数据的实时流数据流水线。
Kafka中的数据被分为并行分区的主题。每个分区都是有序且不可变的记录序列。Producer将记录附加到这些序列的尾部,Consumer按照自己需要阅读序列。多个消费者可以订阅主题并在数据到达时接收数据。当新数据到达Kafka主题中的分区时,会为它们分配一个称为偏移的顺序ID号。 Kafka群集保留所有已发布的数据无论它们是否已被消耗。在可配置的保留期内,之后它们被标记为删除。
我们有三种不同startingOffsets选项读取数据:
从Kafka中读取数据,并将二进制流数据转为字符串:
# Construct a streaming DataFrame that reads from topic1
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.option("startingOffsets", "earliest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
使用Spark作为Producer发送Kafka数据:
# Write key-value data from a DataFrame to a Kafka topic specified in an option
query = df \
.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.option("checkpointLocation", "/path/to/HDFS/dir") \
.start()
此例子使用一个Nest摄像头,收集的数据通过Kafka发送至Spark做相应计算,下面是Nest发送的JSON数据格式:
"devices": {
"cameras": {
"device_id": "awJo6rH...",
"last_event": {
"has_sound": true,
"has_motion": true,
"has_person": true,
"start_time": "2016-12-29T00:00:00.000Z",
"end_time": "2016-12-29T18:42:00.000Z"
}
}
}
我们的目标:
我们使用from_json
函数读取并解析从Nest摄像头发来的数据
schema = StructType() \
.add("metadata", StructType() \
.add("access_token", StringType()) \
.add("client_version", IntegerType())) \
.add("devices", StructType() \
.add("thermostats", MapType(StringType(), StructType().add(...))) \
.add("smoke_co_alarms", MapType(StringType(), StructType().add(...))) \
.add("cameras", MapType(StringType(), StructType().add(...))) \
.add("companyName", StructType().add(...))) \
.add("structures", MapType(StringType(), StructType().add(...)))
nestTimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.sss'Z'"
jsonOptions = { "timestampFormat": nestTimestampFormat }
parsed = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "nest-logs") \
.load() \
.select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"))
我们使用explode()
函数为每个键值对创建一个新行,展平数据
camera = parsed \
.select(explode("parsed_value.devices.cameras")) \
.select("value.*")
sightings = camera \
.select("device_id", "last_event.has_person", "last_event.start_time") \
.where(col("has_person") == True)
可以使用printSchema()
方法查看camera DataSet的结构:
camera.printSchema()
root
|-- device_id: string (nullable = true)
|-- software_version: string (nullable = true)
|-- structure_id: string (nullable = true)
|-- where_id: string (nullable = true)
|-- where_name: string (nullable = true)
|-- name: string (nullable = true)
|-- name_long: string (nullable = true)
|-- is_online: boolean (nullable = true)
|-- is_streaming: boolean (nullable = true)
|-- is_audio_input_enable: boolean (nullable = true)
|-- last_is_online_change: timestamp (nullable = true)
|-- is_video_history_enabled: boolean (nullable = true)
|-- web_url: string (nullable = true)
|-- app_url: string (nullable = true)
|-- is_public_share_enabled: boolean (nullable = true)
|-- activity_zones: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- id: string (nullable = true)
|-- public_share_url: string (nullable = true)
|-- snapshot_url: string (nullable = true)
|-- last_event: struct (nullable = true)
| |-- has_sound: boolean (nullable = true)
| |-- has_motion: boolean (nullable = true)
| |-- has_person: boolean (nullable = true)
| |-- start_time: timestamp (nullable = true)
| |-- end_time: timestamp (nullable = true)
| |-- urls_expire_time: timestamp (nullable = true)
| |-- web_url: string (nullable = true)
| |-- app_url: string (nullable = true)
| |-- image_url: string (nullable = true)
| |-- animated_image_url: string (nullable = true)
| |-- activity_zone_ids: array (nullable = true)
| | |-- element: string (containsNull = true)
我们首先创建一个表示此位置数据的DataFrame,然后将其与目标DataFrame连接,并在设备ID上进行匹配。我们在这里做的是将流式DataFrame目标加入静态DataFrame位置:
locationDF = spark.table("device_locations").select("device_id", "zip_code")
sightingLoc = sightings.join(locationDF, "device_id")
生成一个流式聚合,计算每小时每个邮政编码中的摄像头人数,然后将其写入Kafka topic1,称为“nest-camera-stats”
sightingLoc \
.groupBy("zip_code", window("start_time", "1 hour")) \
.count() \
.select( \
to_json(struct("zip_code", "window")).alias("key"),
col("count").cast("string").alias("value")) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "nest-camera-stats") \
.option("checkpointLocation", "/path/to/HDFS/dir") \
.outputMode("complete") \
.start()
聚合统计数据并写入Kafka:
sightingLoc \
.groupBy("zip_code", window("start_time", "1 hour")) \
.count() \
.select( \
to_json(struct("zip_code", "window")).alias("key"),
col("count").cast("string").alias("value")) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "nest-camera-stats") \
.option("checkpointLocation", "/path/to/HDFS/dir") \
.outputMode("complete") \
.start()
camera.writeStream \
.format("parquet") \
.option("startingOffsets", "earliest") \
.option("path", "s3://nest-logs") \
.option("checkpointLocation", "/path/to/HDFS/dir") \
.start()
PS:我们可以使用相同的Dataframe做多个流查询(streaming queries)
这里直接使用read
方法去做批量查询,用法与readStream
类似
report = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "nest-camera-stats") \
.load() \
.select( \
json_tuple(col("key").cast("string"), "zip_code", "window").alias("zip_code", "window"),
col("value").cast("string").cast("integer").alias("count")) \
.where("count > 1000") \
.select("zip_code", "window") \
.distinct()
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。