首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在星火流应用程序中使用Kafka主题?

如何在星火流应用程序中使用Kafka主题?
EN

Stack Overflow用户
提问于 2019-12-08 14:18:28
回答 3查看 2K关注 0票数 1

当我从Kafka主题创建一个流并打印它的内容时

代码语言:javascript
复制
    import os
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingKafkaWords")
    ssc = StreamingContext(sc, 10)

    lines = KafkaUtils.createDirectStream(ssc, ['sample_topic'], {"bootstrap.servers": 'localhost:9092'})

    lines.pprint()

    ssc.start()
    ssc.awaitTermination()

我得到了一个空的结果

代码语言:javascript
复制
    -------------------------------------------
    Time: 2019-12-07 13:11:50
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:00
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:10
    -------------------------------------------

同时,它在控制台中工作:

代码语言:javascript
复制
    kafka-console-consumer --topic sample_topic --from-beginning --bootstrap-server localhost:9092

正确地给我提供了卡夫卡主题的所有文字行:

代码语言:javascript
复制
    ham Ok lor... Sony ericsson salesman... I ask shuhui then she say quite gd 2 use so i considering...
    ham Ard 6 like dat lor.
    ham Why don't you wait 'til at least wednesday to see if you get your .
    ham Huh y lei...
    spam    REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode
    spam    This is the 2nd time we have tried 2 contact u. U have won the £750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate.
    ham Will ü b going to esplanade fr home?
    . . . 

从Kafka主题到Spark流应用程序的数据流的正确方式是什么?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2019-12-12 13:46:21

在流输出中看不到任何数据的原因是,默认情况下,火花流会从latest中开始读取数据。因此,如果您首先启动星火流应用程序,然后将数据写入Kafka,您将看到流作业中的输出。请参阅文档这里

默认情况下,它将开始使用每个Kafka分区的最新偏移量。

但是,您也可以从主题的任何特定偏移量读取数据。看看createDirectStream方法这里。它采用一个dict参数fromOffsets,您可以在字典中指定每个分区的偏移量。

我已经用kafka 2.2.0和spark 2.4.3和Python 3.7.3测试了下面的代码:

使用kafka依赖项启动pyspark shell:

代码语言:javascript
复制
pyspark --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.0

运行以下代码:

代码语言:javascript
复制
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)
topicPartion = TopicAndPartition('test',0)
fromOffset = {topicPartion: 0}

lines = KafkaUtils.createDirectStream(ssc, ['test'],{"bootstrap.servers": 'localhost:9092'}, fromOffsets=fromOffset)

lines.pprint()

ssc.start()
ssc.awaitTermination()

此外,如果您有kafka broker版本10或更高版本,您应该考虑使用结构化流而不是Spark流。参考结构化流文档这里和结构化流与卡夫卡集成这里

下面是在结构化流中运行的示例代码。请根据您的卡夫卡版本和火花版本使用jar版本。我将spark 2.4.3Scala 11kafka 0.10结合使用,所以使用jar spark-sql-kafka-0-10_2.11:2.4.3

启动pyspark外壳:

代码语言:javascript
复制
pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3
代码语言:javascript
复制
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .option("startingOffsets", "earliest") \
  .load()


df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("console") \
  .start()
票数 0
EN

Stack Overflow用户

发布于 2019-12-28 04:35:36

基于您的代码,我们不能直接打印流RDD,而应该是基于foreachRDD .DStream.foreachRDD的输出操作符在星火流中的打印。它允许您访问DStream的底层RDDs,以执行对数据进行实际操作的操作。

DStream.foreachRDD函数的含义是什么?

注::仍然可以通过结构化流来实现。参考文献:电火花结构化流处理

示例工作代码:这段代码试图读取卡夫卡主题的消息并打印出来。您可以根据您的需求更改此代码。

代码语言:javascript
复制
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

def handler(message):
    records = message.collect()
    for record in records:
        print(record[1])

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    kvs = KafkaUtils.createDirectStream(ssc, ['topic_name'], {"metadata.broker.list": 'localhost:9192'},valueDecoder=serializer.decode_message)
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()
票数 1
EN

Stack Overflow用户

发布于 2019-12-10 14:28:10

我建议使用Spark结构化流。这是新一代的流媒体引擎,它发布了Spark2,您可以在这个链接中查看它。

对于Kafka集成,您可以查看这个链接上的文档。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59236297

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档