首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Spark dataframe访问Kafka源后失去流媒体能力

Spark dataframe访问Kafka源后失去流媒体能力
EN

Stack Overflow用户
提问于 2019-09-10 15:37:44
回答 1查看 187关注 0票数 0

我使用Spark 2.4.3和Kafka 2.3.0。我想用从Kafka到Spark的数据做Spark结构化流媒体。一般来说,它可以在测试模式下工作,但由于我必须对数据进行一些处理(并且不知道另一种方法),Spark数据帧不再具有流式传输功能。

代码语言:javascript
运行
复制
#!/usr/bin/env python3

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructField, StructType, StringType, DoubleType

# create schema for data
schema = StructType([StructField("Signal", StringType()),StructField("Value", DoubleType())])

# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()

# create DataFrame representing the stream
dsraw = spark.readStream \
  .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test")
print("dsraw.isStreaming: ", dsraw.isStreaming)

# Convert Kafka stream to something readable
ds = dsraw.selectExpr("CAST(value AS STRING)")
print("ds.isStreaming: ", ds.isStreaming)

# Do query on the converted data
dsQuery = ds.writeStream.queryName("ds_query").format("memory").start()
df1 = spark.sql("select * from ds_query")
print("df1.isStreaming: ", df1.isStreaming)

# convert json into spark dataframe cols
df2 = df1.withColumn("value", from_json("value", schema))
print("df2.isStreaming: ", df2.isStreaming)

输出为:

代码语言:javascript
运行
复制
dsraw.isStreaming:  True
ds.isStreaming:  True
df1.isStreaming:  False
df2.isStreaming:  False

因此,当我创建第一个数据帧时,我失去了流功能。我怎么才能避免呢?如何从流中获取流Spark数据帧?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-09-13 07:58:39

不建议将内存接收器用于生产应用程序,因为所有数据都将存储在驱动程序中。

除了调试目的之外,也没有理由这样做,因为您可以像处理“普通”数据帧一样处理您的流式数据帧。例如:

代码语言:javascript
运行
复制
import pyspark.sql.functions as F

lines = spark.readStream.format("socket").option("host", "XXX.XXX.XXX.XXX").option("port", XXXXX).load()

words = lines.select(lines.value)

words = words.filter(words.value.startswith('h'))

wordCounts = words.groupBy("value").count()

wordCounts = wordCounts.withColumn('count', F.col('count') + 2)

query = wordCounts.writeStream.queryName("test").outputMode("complete").format("memory").start()

如果您仍然想继续使用您的方法:即使df.isStreaming告诉您它不是流数据帧(这是正确的),底层数据源是流,因此数据帧将随着每个处理的批处理而增长。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57866114

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档