因为我发现Azure Service Bus没有spark结构化流源,所以在这种情况下,我可以使用提供的Python客户端读取Azure Service Bus消息,然后从Python客户端读取每个消息并将其写入Kafka主题,在这个Kafka主题上,我将应用spark结构化流编程。我的用例是使用Azure Service Bus流消息,并通过将其转换为时间流数据库InfluxDb或Pramethoues来编写每条消
我正在接收Kafka消息上的文件路径。我需要将这个文件加载到spark RDD中,对其进行处理,然后将其转储到HDFS。我不能在Kafka消息数据集上运行map函数。由于sparkContext在worker上不可用,因此出现NPE错误。它会出错,并显示以下消息:
Queries with streami
下面是试图读取流时所收到的错误。读取流时,Databricks无法找到keystore文件。df = spark.readStream \ .option("kafka.bootstrap.servers","kafka server with portdbfs中,并且还能够读取该文件。我们还在数据库
在spark批处理作业中,我通常会将JSON数据源写入到一个文件中,并且可以使用DataFrame阅读器的损坏列功能将损坏的数据写出到不同的位置,并使用另一个阅读器从同一作业中写入有效数据。(数据写为拼接) 但在Spark Structred Streaming中,我首先通过kafka将流作为字符串读取,然后使用from_json获取我的DataFrame。/spark-sql-Expression-JsonToStructs.html va