When I create a stream from a Kafka topic and print its contents:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="PythonStreamingKafkaWords")
ssc = StreamingContext(sc, 10)
lines = KafkaUtils.createDirectStream(ssc, ['sample_topic'], {"bootstrap.servers": 'localhost:9092'})
lines.pprint()
ssc.start()
ssc.awaitTermination()

I get an empty result:
-------------------------------------------
Time: 2019-12-07 13:11:50
-------------------------------------------
-------------------------------------------
Time: 2019-12-07 13:12:00
-------------------------------------------
-------------------------------------------
Time: 2019-12-07 13:12:10
-------------------------------------------

Meanwhile, it works in the console:
kafka-console-consumer --topic sample_topic --from-beginning --bootstrap-server localhost:9092

and correctly gives me all the lines of text in my Kafka topic:
ham Ok lor... Sony ericsson salesman... I ask shuhui then she say quite gd 2 use so i considering...
ham Ard 6 like dat lor.
ham Why don't you wait 'til at least wednesday to see if you get your .
ham Huh y lei...
spam REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode
spam This is the 2nd time we have tried 2 contact u. U have won the £750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate.
ham Will ü b going to esplanade fr home?
. . .

What is the correct way to stream data from a Kafka topic into a Spark Streaming application?
Posted on 2019-12-12 13:46:21
The reason you do not see any data in the streaming output is that Spark Streaming starts reading data from latest by default. So if you start the Spark Streaming application first and then write data to Kafka, you will see output in the streaming job. See the documentation here:
By default, it will start consuming from the latest offset of each Kafka partition.
But you can also read data from any specific offset of a topic. Take a look at the createDirectStream method here. It takes a dict parameter fromOffsets in which you can specify the starting offset for each partition.
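Alternatively, per the same documentation, the direct stream also honors the Kafka parameter auto.offset.reset: setting it to smallest starts consumption from the earliest available offset without listing explicit offsets. A minimal sketch against the question's setup (assuming the same ssc and local broker):

lines = KafkaUtils.createDirectStream(
    ssc, ['sample_topic'],
    {"bootstrap.servers": 'localhost:9092',
     "auto.offset.reset": 'smallest'})  # read from the earliest offset instead of the latest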
I have tested the code below with Kafka 2.2.0, Spark 2.4.3, and Python 3.7.3:
Start the pyspark shell with the Kafka dependency:
pyspark --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.0

Run the following code:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)
topicPartition = TopicAndPartition('test', 0)
fromOffset = {topicPartition: 0}  # start partition 0 of topic 'test' at offset 0
lines = KafkaUtils.createDirectStream(ssc, ['test'], {"bootstrap.servers": 'localhost:9092'}, fromOffsets=fromOffset)
lines.pprint()
ssc.start()
ssc.awaitTermination()
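If the topic has more than one partition, fromOffsets needs one TopicAndPartition entry per partition; a minimal sketch, assuming a hypothetical two-partition topic also named 'test':

fromOffset = {
    TopicAndPartition('test', 0): 0,  # partition 0: start at offset 0
    TopicAndPartition('test', 1): 0,  # partition 1: start at offset 0
}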
Also, if you have Kafka broker version 0.10 or higher, you should consider using Structured Streaming instead of Spark Streaming; see the Structured Streaming documentation here and the Structured Streaming + Kafka integration guide here.

Below is sample code to run with Structured Streaming. Use the jar version that matches your Kafka and Spark versions. I am using Spark 2.4.3 with Scala 2.11 and Kafka 0.10, so I use the jar spark-sql-kafka-0-10_2.11:2.4.3.
Start the pyspark shell:
pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .option("startingOffsets", "earliest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("console") \
  .start()
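Note that start() returns a StreamingQuery. The console sink prints each micro-batch while the shell is open, but a standalone script would exit immediately; a minimal sketch of the blocking variant, assuming the same df as above:

query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()
query.awaitTermination()  # keep the driver alive while the streaming query runs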
Posted on 2019-12-28 04:35:36

Based on your code, we cannot print a streaming RDD directly. Instead, the printing should be done through DStream.foreachRDD, an output operator in Spark Streaming. It gives you access to the underlying RDDs of the DStream so you can run operations that actually act on the data.
Note: this can still be achieved with Structured Streaming as well. Reference: Spark Structured Streaming.
Sample working code: this code tries to read messages from a Kafka topic and print them out. You can change this code to match your requirements.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def handler(message):
    # Collect this micro-batch on the driver and print each message value
    records = message.collect()
    for record in records:
        print(record[1])  # each record is a (key, value) tuple

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)  # 10-second batch interval
    kvs = KafkaUtils.createDirectStream(ssc, ['topic_name'], {"metadata.broker.list": 'localhost:9092'})
    kvs.foreachRDD(handler)
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
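To run this script outside the shell, it would be submitted with the matching Kafka package; a sketch reusing the dependency quoted above, with a hypothetical file name kafka_print.py:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.0 kafka_print.py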
https://stackoverflow.com/questions/59236297