首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >MLSQL Stack 让流调试更加简单

MLSQL Stack 让流调试更加简单

作者头像
用户2936994
发布于 2019-06-05 08:50:17
发布于 2019-06-05 08:50:17
43600
代码可运行
举报
文章被收录于专栏:祝威廉祝威廉
运行总次数:0
代码可运行

前言

有一位同学正在调研MLSQL Stack对流的支持。然后说了流调试其实挺困难的。经过实践,希望实现如下三点:

  • 能随时查看最新固定条数的Kafka数据
  • 调试结果(sink)能打印在web控制台
  • 流程序能自动推测json schema(现在spark是不行的)

实现这三个点之后,我发现调试确实就变得简单很多了。

流程

首先我新建了一个kaf_write.mlsql,里面方便我往Kafka里写数据:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
set abc='''
{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
''';
load jsonStr.`abc` as table1;

select to_json(struct(*)) as value from table1 as table2;
save append table2 as kafka.`wow` where 
kafka.bootstrap.servers="127.0.0.1:9092";

这样我每次运行,数据就能写入到Kafka.

接着,我写完后,需要看看数据是不是真的都写进去了,写成了什么样子:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;

这句话表示,我要采样Kafka 10条Kafka数据,该Kafka的地址为127.0.0.1:9092,主题为wow.运行结果如下:

image.png

没有什么问题。接着我写一个非常简单的流式程序:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- the stream name, should be uniq.
set streamName="streamExample";

-- use kafkaTool to infer schema from kafka
!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;


load kafka.`wow` options 
kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;


select *  from newkafkatable1
as table21;


-- print in webConsole instead of terminal console.
save append table21  
as webConsole.`` 
options mode="Append"
and duration="15"
and checkpointLocation="/tmp/s-cpl4";

运行结果如下:

image.png

在终端我们也可以看到实时效果了。

补充

当然,MLSQL Stack 还有对流还有两个特别好地方,第一个是你可以对流的事件设置http协议的callback,以及对流的处理结果再使用批SQL进行处理,最后入库。参看如下脚本:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
-- the stream name, should be uniq.
set streamName="streamExample";


-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';

-- load data as table
load jsonStr.`data` as datasource;

-- convert table as stream source
load mockStream.`datasource` options 
stepSizeRange="0-3"
as newkafkatable1;

-- aggregation 
select cast(value as string) as k  from newkafkatable1
as table21;


!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- output the the result to console.


save append table21  
as custom.`` 
options mode="append"
and duration="15"
and sourceTable="jack"
and code='''
select count(*) as c from jack as newjack;
save append newjack as parquet.`/tmp/jack`; 
'''
and checkpointLocation="/tmp/cpl15";
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019.06.03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
如何用MLSQL快速调试Structured Streaming程序
早上对Structured Streaming 的window函数, Output Mode 以及Watermark有些疑惑的地方。Structured Streaming 的文档偏少,而且网上的文章同质化太严重,基础的不能再基础了,但是我也不想再开个测试的工程项目,所以直接就给予MLSQL来调试。
用户2936994
2018/09/29
4440
如何用MLSQL快速调试Structured Streaming程序
MLSQL 内置Delta数据湖以及Compaction功能介绍
之前写过一篇文章 Delta的真正用处和价值,你可知道,该项目开源的那天我就集到MLSQL了。不过当时只是尝鲜性质,主要原因是因为我一直觉得delta缺了Compaction功能。很多公司其实都有小文件的困扰,而Delta这个问题会更严重。不过近期Delta团队应该就会发布新版本了,届时有可能相关的功能都会补上。不过MLSQL现在也自己实现了一个Compaction的功能,并且对delta做了一定的集成和增强。
用户2936994
2019/06/13
7670
为 Delta 新增 Upsert(Merge)功能
今天花了一早上以及午休时间,终于把delta的Upsert功能做完了。加上上周周四做的Delta Compaction支持,我想要的功能基本就都有了。
用户2936994
2019/06/11
1K0
MLSQL数据源开发指南
那么我们如何实现自己的数据源呢?下面我们会分两部分,第一部分是已经有第三方实现了的标准Spark数据源的集成,第二个是你自己创造的新的数据源。
用户2936994
2019/03/22
4970
Kafka集群安装
①.kafka需要依赖zk管理,在搭建kafka集群之前需要先搭建zk集群: https://my.oschina.net/u/2486137/blog/1537389 ②.从apache kafka官网下载kafka( 二进制版本)        注意下载的版本否则会在启动时报错:找不到主类Kafka.kafka. 我这里使用的是2.10版本. ③.配置config/server.properties文件: # Licensed to the Apache Software Foundation (ASF
用户1215919
2018/02/27
1.2K0
Kafka学习笔记之Kakfa异常分析-Magic v0 does not support record headers
最近测试跟我说,某个应用消费不到交易的消息。登录到Kafka Broker看下了下日志,发现一直在报错:
Jetpropelledsnake21
2021/01/14
1.5K0
kafka中partition和消费者对应关系
1个partition只能被同组的一个consumer消费,同组的consumer则起到均衡效果
小勇DW3
2019/09/02
2.9K0
kafka中partition和消费者对应关系
基于SparkSQL实现的一套即席查询服务
支持的数据源:hdfs、hive、hbase、kafka、mysql、es、mongo
UFO
2018/12/28
2.1K0
Your Guide to Python with MLSQL Stack (二)
In the previous post Your Guide to NLP with MLSQL Stack (一), we already have known how to build a RandomForest model to classify text content. The TF/IDF, RandomForest are all built-in algorithms and implemented by Java. In this post, we will show you how to use Python to do the same job.
用户2936994
2019/05/14
5840
Your Guide to Python with MLSQL Stack (二)
消息队列中间件(三)Kafka 入门指南
Kafka的前身是由LinkedIn开源的一款产品,2011年初开始开源,加入了 Apache 基金会,2012年从 Apache Incubator 毕业变成了 Apache 顶级开源项目。同时LinkedIn还有许多著名的开源产品。如:
未读代码
2019/11/04
6440
消息队列中间件(三)Kafka 入门指南
06 . ELK Stack + kafka集群
https://www.cnblogs.com/you-men/p/13361910.html
iginkgo18
2020/09/27
4260
震惊!StructuredStreaming整合Kafka和MySQL原来这么简单?
上一篇博客博主已经为大家从发展史到基本实战为大家详细介绍了StructedStreaming(具体请见:《看了这篇博客,你还敢说不会Structured Streaming?》)。本篇博客,博主将紧随前沿,为大家带来关于StructuredStreaming整合Kafka和MySQL的教程。
大数据梦想家
2021/01/27
8940
震惊!StructuredStreaming整合Kafka和MySQL原来这么简单?
Kafka - 3.x Kafka命令行操作
这些参数是用于操作和管理Apache Kafka主题的命令行工具参数,通常用于kafka-topics.sh工具。以下是每个参数的描述:
小小工匠
2023/10/27
9160
Kafka - 3.x Kafka命令行操作
alpakka-kafka(2)-consumer
alpakka-kafka-consumer的功能描述很简单:向kafka订阅某些topic然后把读到的消息传给akka-streams做业务处理。在kafka-consumer的实现细节上,为了达到高可用、高吞吐的目的,topic又可用划分出多个分区partition。分区是分布在kafka集群节点broker上的。由于一个topic可能有多个partition,对应topic就会有多个consumer,形成一个consumer组,共用统一的groupid。一个partition只能对应一个consumer、而一个consumer负责从多个partition甚至多个topic读取消息。kafka会根据实际情况将某个partition分配给某个consumer,即partition-assignment。所以一般来说我们会把topic订阅与consumer-group挂钩。这个可以在典型的ConsumerSettings证实:
用户1150956
2021/03/02
7080
Flink SQL Kafka Connector
Kafka Connector 提供了从 Kafka topic 中消费和写入数据的能力。
smartsi
2022/02/22
6K0
Flink SQL Kafka Connector
Python 使用python-kafka类库开发kafka生产者&消费者&客户端
http://zookeeper.apache.org/releases.html#download
授客
2019/09/10
4.6K0
【Kafka】核心API
虚拟化软件推荐 VM https://www.cnblogs.com/PrayzzZ/p/11330937.html VirtualBOX
瑞新
2020/12/07
1.3K0
【Kafka】核心API
基于 HBase & Phoenix 构建实时数仓(5)—— 用 Kafka Connect 做实时数据同步
本篇演示安装配置 Kafka connect 插件实现 MySQL 到 Hbase 的实时数据同步。依赖环境见本专栏前面文章。相关软件版本如下:
用户1148526
2024/03/21
6960
基于 HBase & Phoenix 构建实时数仓(5)—— 用 Kafka Connect 做实时数据同步
Kafka 开发实战
其中KafkaProducer是⽤于发送消息的类,ProducerRecord类⽤于封装 Kafka 的消息。
用户7353950
2022/06/23
5340
Kafka 开发实战
ksqlDB基本使用
ksqlDB是事件流数据库,是一种特殊的数据库,基于Kafka的实时数据流处理引擎,提供了强大且易用的SQL交互方式来对Kafka数据流进行处理,而无需编写代码。KSQL具备高扩展、高弹性、容错式等优良特性,并且它提供了大范围的流式处理操作,比如数据过滤、转化、聚合、连接join、窗口化和 Sessionization (即捕获单一会话期间的所有的流事件)等。
Se7en258
2021/05/18
3.7K0
ksqlDB基本使用
相关推荐
如何用MLSQL快速调试Structured Streaming程序
更多 >
领券
一站式MCP教程库,解锁AI应用新玩法
涵盖代码开发、场景应用、自动测试全流程,助你从零构建专属AI助手
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验