前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kafka入门实战教程(7):Kafka Streams

Kafka入门实战教程(7):Kafka Streams

作者头像
Edison Zhou
发布于 2022-09-09 00:14:19
发布于 2022-09-09 00:14:19
4.3K00
代码可运行
举报
文章被收录于专栏:EdisonTalkEdisonTalk
运行总次数:0
代码可运行

1 关于流处理

流处理平台(Streaming Systems)是处理无限数据集(Unbounded Dataset)的数据处理引擎,而流处理是与批处理(Batch Processing)相对应的。所谓的无线数据,指的是数据永远没有尽头。而流处理平台就是专门处理这种数据集的系统或框架。下图生动形象地展示了流处理和批处理的区别:

总体来说,流处理给人的印象是低延时,但是结果可能不太精确。而批处理则相反,它能提供精确的结果,但是往往存在高时延。

一个最简单的Streaming的结构如下图所示:

从一个Topic中读取到数据,经过一些处理操作之后,写入到另一个Topic中,嗯,这就是一个最简单的Streaming流式计算。其中,Source Topic中的数据会源源不断的产生新数据。

那么,我们再在上面的结构之上扩展一下,假设定义了多个Source Topic及Destination Topic,那就构成如下图所示的较为复杂的拓扑结构:

2 关于Kafka Streams

近些年来,开源流处理领域涌现出了很多优秀框架。光是在 Apache 基金会孵化的项目,关于流处理的大数据框架就有十几个之多,比如早期的 Apache Samza、Apache Storm,以及这些年火爆的 Spark 以及 Flink 等。

Kafka Streams的特点

相比于其他流处理平台,Kafka Streams 最大的特色就是它不是一个平台,至少它不是一个具备完整功能(Full-Fledged)的平台,比如其他框架中自带的调度器和资源管理器,就是 Kafka Streams 不提供的。Kafka 官网明确定义 Kafka Streams 是一个客户端库(Client Library)。我们可以使用这个库来构建高伸缩性、高弹性、高容错性的分布式应用以及微服务。使用Kafka Streams API构建的应用程序就是一个普通的应用程序,我们可以选择任何熟悉的技术或框架对其进行编译、打包、部署和上线。很不幸,目前Kafka Streams还没有在除了Java之外的其他主流开发语言的SDK上提供。Kafka Streams最大的特点就是,对于上下游数据源的限定。目前Kafka Streams只支持与Kafka集群进行交互,它并没有提供开箱即用的外部数据源连接器。

Kafka Streams应用执行

Kafka Streams宣称自己实现了精确一次处理语义(Exactly Once Semantics, EOS,以下使用EOS简称),所谓EOS,是指消息或事件对应用状态的影响有且只有一次。其实,对于Kafka Streams而言,它天然支持端到端的EOS,因为它本来就是和Kafka紧密相连的。下图展示了一个典型的Kafka Streams应用的执行逻辑:

通常情况下,一个 Kafka Streams 需要执行 5 个步骤:

  • 读取最新处理的消息位移;
  • 读取消息数据;
  • 执行处理逻辑;
  • 将处理结果写回到 Kafka;
  • 保存位置信息。

这五步的执行必须是原子性的,否则无法实现精确一次处理语义。而在设计上,Kafka Streams在底层大量使用了Kafka事务机制和幂等性Producer来实现多分区的写入,又因为它只能读写Kafka,因此Kafka Streams很easy地就实现了端到端的EOS。

3 Kafka Streams客户端

目前.NET圈主流的Kafka客户端Confluent.Kafka并没有提供Streams的功能,其实,目前Kafka Streams也只在Java客户端提供了Streams功能,其他语言均没有提供。

画外音:毕竟Kafka是JVM系语言写的(Scala+Java),Java就是嫡系,一等公民。

那么,Confluent.Kafka团队有没有计划提供这个功能呢?我在issue列表找到了一些comments,得到的结果是目前没有这个计划,它涉及到太多的工作量,WTF。那么,.NET就真的没有可以用的Kafka Streams客户端了么?实际上,有的,我在Confluent.Kafka的issue内容中找到了下面这个Kafka Streams客户端:Streamiz.Kafka.Net。

Streamiz.Kafka.Net:https://github.com/LGouellec/kafka-streams-dotnet

目前Streamiz.Kafka.Net这个项目仍然属于一个不断开发完善的阶段,Star数量278个,生产环境估计无法直接使用,但是拿来学习实践还是可以的,目前最新版本:1.3.0。其实,Streamiz.Kafka.Net也是基于Confluent.Kafka开发的,相当于对Confluent.Kafka做了一些DSL扩展。它的接口名字与用法,和Java API几乎一致。

4 第一个Streaming应用

如果你对Streaming的概念还不了解,建议先阅读上一篇文章。

应用程序部分

首先,创建一个.NET Core或.NET 5/6的控制台应用程序。

然后,通过Nuget安装Streamiz.Kafka.Net包:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
PM>Install-Package Streamiz.Kafka.Net

然后,开始编写第一个Streaming应用程序:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;
using Streamiz.Kafka.Net.Table;
using System;
using System.Threading.Tasks;

namespace EDT.Kafka.Streams.Demo
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            // Stream configuration
            var config = new StreamConfig<StringSerDes, StringSerDes>();
            config.ApplicationId = "test-streams-app";
            config.BootstrapServers = "kafka1:9091,kafka2:9092,kafka3:9093";

            StreamBuilder builder = new StreamBuilder();

            // Stream "test-stream-input" topic with filterNot condition and persist in "test-stream-output" topic.
            builder.Stream<string, string>("test-stream-input")
                .FilterNot((k, v) => v.Contains("test"))
                .To("test-stream-output");

            // Create a table with "test-ktable" topic, and materialize this with in memory store named "test-store"
            builder.Table("test-stream-ktable", InMemory<string, string>.As("test-stream-store"));

            // Build topology
            Topology t = builder.Build();

            // Create a stream instance with toology and configuration
            KafkaStream stream = new KafkaStream(t, config);

            // Subscribe CTRL + C to quit stream application
            Console.CancelKeyPress += (o, e) =>
            {
                stream.Dispose();
            };

            // Start stream instance with cancellable token
            await stream.StartAsync();
        }
    }
}

这个示例Streaming应用程序很简单,它实现的就是一个如下图所示的最简单的处理流程:

Source Topic是test-stream-input,Destination Topic是test-stream-output,分别对应输入源 和 输出地。在处理过程中会创建一个Table,名为test-stream-ktable,它会作为输入流和输出流的中间状态。在Kafka Streams中,流在时间维度上聚合成表,而表在时间维度上不断更新成流。换句话说,表会转换成流,流又再转换成表,如此反复,完成所谓的Streaming流式计算。

这个test-stream-ktable会存储在内存中一个名为test-stream-kstore的区域,我们理解到这里就够了。最后,回到最关键的一句代码,如下所示。在对输入源进行处理时,使用了一个DSL进行快速的过滤,即判断输入的消息是否包含test这个字符串,包含就不做过滤处理,不包含则进行处理,即传递给test-stream-output。

最后,回到最关键的一句代码,如下所示。在对输入源进行处理时,使用了一个DSL进行快速的过滤,即判断输入的消息是否包含test这个字符串,包含就不做过滤处理,不包含则进行处理,即传递给test-stream-output。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
builder.Stream<string, string>("test-stream-input")
   .FilterNot((k, v) => v.Contains("test"))
   .To("test-stream-output");

Broker部分

为了完成这个demo,我们提前在Kafka Broker端创建几个如下图红线框中的topic。

为了方便演示验证,我们暂且都给他们设置为单个分区,无额外副本。

测试效果

首先,我们将.NET控制台程序启动起来。

然后,我们在Broker端打开一个Producer命令行,陆续手动输入一些数据源:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# kafka-console-producer.sh --topic=test-stream-input --broker-list kafka1:9091,kafka2:9092,kafka3:9093
>haha
>test112321
>test123214214
>tesst^H^Ht
>test9898
>xifejlrkewl
>xjkfldsjoifdsfjods
>xjoijfosifjlkdsjflkds
>xjofdksjfljdslkfdsj
>xjlfjdslkjdslfjds
>xjlkdjflksjdlfks
>hello
>helloworld

 可以看到,输入的数据源中包含了3个含有test关键词的字符串消息。期望的结果是,在Streams应用程序处理逻辑中,过滤掉这3个,将其余的消息都进行处理传递到output中。

然后,我们就可以通过Kafka Tool去看看input和output这两个topic的数据验证一下了:

(1)test-stream-input

(2)test-stream-output

可以看到,test-stream-output中未包含含有test关键词的消息,第一个Streaming应用程序运行成功。

5 经典WordCount应用

所谓wordcount就是一个经典的单词计数的应用程序,它可以统计在指定数据源中每个单词出现的次数。在Streaming流式计算和MapReduce分布式计算中,它经常出现在示例代码中。

应用程序部分

改写一下上面的demo实例代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "test-wordcount-app";
config.BootstrapServers = "kafka1:9091,kafka2:9092,kafka3:9093";

StreamBuilder builder = new StreamBuilder();

builder.Stream<string, string>("test-word-in")
    .FlatMapValues(value => value.Split(" ", StringSplitOptions.RemoveEmptyEntries).ToList()) // 根据空格分隔多个单词
    .Map((key, value) => KeyValuePair.Create(value, "1")) // 转换为(单词, 1)的键值对形式
    .GroupByKey() // 根据单词分组
    .Count() // 计算各个分组value的数量
    .ToStream()
    .Map((key, value) => KeyValuePair.Create(key, $"{key} : {value.ToString()}"))
    .To("test-word-out");

// Create a table with "test-ktable" topic, and materialize this with in memory store named "test-store"
builder.Table("test-word-ktable", InMemory<string, string>.As("test-word-store"));

// Build topology
Topology t = builder.Build();

// Create a stream instance with toology and configuration
KafkaStream stream = new KafkaStream(t, config);

// Subscribe CTRL + C to quit stream application
Console.CancelKeyPress += (o, e) =>
{
    stream.Dispose();
};

// Start stream instance with cancellable token
await stream.StartAsync();

Broker端部分

新增几个示例代码需要用到的topic:test-word-in, test-word-out 以及 test-word-ktable。

测试效果

首先,我们将.NET控制台程序启动起来。

然后,我们在Broker端打开一个Producer命令行,陆续手动输入一些数据源:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# kafka-console-producer.sh --topic=test-word-in --broker-list kafka1:9091,kafka2:9092,kafka3:9093
>hello world
>hello jav^H
>hello csharp
>hello golang

可以看到,这里我们的hello出现了4次,其他单词均只出现了1次。

那么,我们可以直接去test-word-out这个topic中验证一下:

6 总结

 本文总结了Kafka Streams的基本概念与执行流程,并结合.NET客户端给出了一个Kafka Streams应用程序的示例。

参考资料

kafka-streams-dotnet:https://lgouellec.github.io/kafka-streams-dotnet

极客时间,胡夕《Kafka核心技术与实战》

B站,尚硅谷《Kafka 3.x入门到精通教程》

作者:周旭龙

出处:https://edisonchou.cnblogs.com

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-07-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
浅谈ICP备案、网站备案、域名备案、ICP经营许可证~
如果你在中国境内搭建过网站,一定经历过网站备案,那么为什么要进行网站备案,它是必须的吗?ICP备案、网站备案、域名备案之间又有什么关系呢?ICP经营许可证又是个什么东西?下面为你逐个解答~
小晟
2023/04/11
37.5K1
ICP备案与ICP许可证有什么区别?怎么办理ICP许可证?
现在网站已成为各个行业公司对外形象展示、信息发布、业务拓展、客户服务、内部沟通和产品售卖的重要桥梁,想要开设网站进行商品的售卖就需要进行ICP许可证的办理,下面就和小编一起来看看该业务办理的相关内容。
腾讯云DNSPod团队
2022/04/02
6.4K0
ICP备案与ICP许可证有什么区别?怎么办理ICP许可证?
腾讯云服务器备案全流程 40天备案的血与泪
本文原创首发CSDN,链接 https://blog.csdn.net/qq_41464123/article/details/105837646 ,作者博客https://blog.csdn.net/qq_41464123 ,转载请带上本段内容,尤其是脚本之家、码神岛等平台,谢谢配合。----
Designer 小郑
2023/08/01
2.8K0
腾讯云服务器备案全流程 40天备案的血与泪
网站公安备案流程及详细操作说明最新版
根据《中华人民共和国计算机信息系统安全保护条例》以及《计算机信息网络国际联网安全保护管理办法》的相关规定,开办网站必须到当地公安机关网安部门办理备案手续(网上办理),并且在自网络正式联通后的三十日内办理,如果不履行,公安机关会给予警告或者停机整顿不超过六个月的处罚。
众森企服
2024/04/10
7.1K0
网站公安备案流程及详细操作说明最新版
ICP经营许可证怎么办理?
​这是一篇最全面最权威的关于ICP经营许可证怎么办理的文章,基本上汇聚了知乎ICP经营许可证的办理答案的所有精华!先收藏起来,记得点个赞哦~
众森企服
2024/06/14
7740
ICP经营许可证怎么办理?
全网最全面最详细关于ICP许可证办理指南
在这个网络的时代,几乎每个企业都有自己的门户网站。上线一个网站很容易,开发好上线就可以,但并不是这样就完了。后面还有许多事情要做,比如需要办理各类许可资质。
众森企服
2022/04/03
11.8K0
全网最全面最详细关于ICP许可证办理指南
icp备案和域名备案 转
域名备案是指强大的Z-F为了监管互联网站点创新发明出的一个做法。简单地说就是把网站站长的个人或单位的真实资料,包括身份证复印件、填写网站登记表等等资料提交给你的主机服务商,主机服务商再提交给工信部审核登记备案。备案以后会给你发一串类似于“京ICP证030173号 ”的东东,这个串必须挂在你的网站页面里面(很多网站你都能看到类似这样的串)。只有通过备案后的网站才允许开放访问。
wuweixiang
2018/08/14
41.2K0
icp备案和域名备案
                                                                            转
什么是ICP、ICP证、ICP备案 又有什么关系?
  CP是什么意思呢?相信大多站长并不清楚ICP是什么意思,就算是网站已经备案的也不了解ICP到底为何物。大家往往搞不清楚ICP备案和ICP证之间的区别。今天小编为大家讲解一下ICP是什么意思,ICP
苏子
2019/02/27
25.8K2
什么是ICP、ICP证、ICP备案 又有什么关系?
使用腾讯云建站 CloudPages 快速构建您的企业网站
写这篇文章之前本来想写个几分钟快速搭建网站了,但思来想去那根本不切实际,怎么也要一两个小时才能做一个完整网站。其实严格来讲,一个网站上线需要半个月左右,其中 1 天是需要你自己操作的(完成网站的搭建、优化、提交备案申请),剩下的两周都是再等待备案审核的。
Im小泽
2023/02/28
4.1K0
说说网站域名备案这个事儿
在站长圈子里说起“备案”这个事儿几乎是哀鸿一片,甚至都可以用“怨声载道”来形容了。备案真的有那么可怕吗?最近,明月给几个站长做服务器代运维的时候发现不少新手站长对“备案”的认知都是道听途说的,并且还是以讹传讹,几乎已经是“备案猛如虎”的感觉了都!明月感觉很有必要专门给大家普及和讲解一下网站域名备案这个事儿。
明月登楼的博客
2019/08/18
14.7K6
详细!完成备案需要多久时间?国内网站备案流程与步骤
  上一篇文章主要讲解了WordPress主题的设置,距离完成一个完整的个人网站搭建还差最后一步。
IT学习日记
2022/09/13
16.9K1
详细!完成备案需要多久时间?国内网站备案流程与步骤
腾讯云域名备案流程及方式
个人性质备案域名注册者应为本人,单位性质备案域名注册者应为单位(含公司股东)、单位主要负责人或高级管理人员。
梦屿
2023/03/31
25.6K0
官方推荐 | 《2分钟带你认识腾讯云网站备案产品》
关注腾讯云大学,了解最新行业技术动态  戳【阅读原文】查看55个腾讯云产品全集 一、课程概述 根据《互联网信息服务管理办法》的规定,使用中国大陆境内服务器开办网站,必须完成ICP备案,并取得工信部备案号。通过腾讯云备案系统及备案小程序,您可以快速完成备案信息填写及材料上传,并完成电子化核验。这个2分钟的小视频将带你快速了解网站备案的流程,以及腾讯云网站备案产品。 【课程目标】 了解网站备案的流程 了解腾讯云网站备案的产品。 【适用对象】 商务、学生、开发、个人开发者、运维、测试 【课程大纲】 知识模块 简
腾讯产业互联网学堂1
2023/05/29
9030
官方推荐 | 《2分钟带你认识腾讯云网站备案产品》
网站备案说明[通俗易懂]
按照工信部《非经营性互联网信息服务备案管理办法》的规定,企业或个人在中国国内开设网站的话,需要进行网站备案。很多个人在开设网站的
全栈程序员站长
2022/11/16
20K0
建立个网站的流程是什么?
作为一个老博客站长,今天给新手们讲讲如何建立一个自己的博客网站以及基本的流程,这些对于中小型企业网站的建立也是同样可以参考的。今天不讲思路,不讲观点,仅仅是结合当前国内法律法规给大家分享一下网站建立的基本步骤和流程,给新手们一个参考!
爱游博客
2019/08/06
9.9K0
【腾讯云】关于开展移动互联网应用程序备案工作的通知
为贯彻落实《中华人民共和国反电信网络诈骗法》、《互联网信息服务管理办法》、《非经营性互联网信息服务备案管理办法》等法律法规要求,将配合相关部门做好移动互联网信息服务管理工作,根据工业和信息化部2023年8月4日正式颁布的《工业和信息化部关于开展移动互联网应用程序备案工作的通知》,腾讯云预计将于2023年9月1日起为APP主办者提供代备案服务。
腾讯云计算产品团队
2023/08/25
4160
【腾讯云】关于开展移动互联网应用程序备案工作的通知
腾讯云备案域名要求
腾讯云网站备案域名要求都是按照管局的规则来的。大家在腾讯云备案域名时要先看清楚了下列规则,然后在框架内部去曹组,这样才能顺利通过审核,拿到备案号。
魏艾斯博客www.vpsss.net
2019/11/04
37.4K0
腾讯云备案域名要求
企业用户使用备案资源包进行网站备案ICP并开通微信H5支付(附API V3版本支付nodejs代码)
注意:若是进行交易平台或游戏等其他经营项目,还会需要其他资质文件,请查阅相关法律法规,在这里不进行赘述。
爱去西
2023/04/06
5.3K1
企业用户使用备案资源包进行网站备案ICP并开通微信H5支付(附API V3版本支付nodejs代码)
腾讯云网站备案咨询:备案驳回问题汇总解答
在网站备案期间难免会遇到备案不符合要求,被驳回的情况。老魏在本文中总结了备案驳回的一些情况和解决办法,汇总在下面,供大家取用。
魏艾斯博客www.vpsss.net
2019/10/28
15.8K0
腾讯云网站备案咨询:备案驳回问题汇总解答
[重要新闻]小程序要开始施行备案,提前来看如何操作指引
为落实《中华人民共和国反电信网络诈骗法》《互联网信息服务管理办法》(国务院令第292号)等法律法规要求,促进互联网行业规范健康发展,进一步做好移动互联网信息服务管理,现组织开展移动互联网应用程序(含小程序、快应用等分发以下简称APP)备案工作
公众号iOS逆向
2023/08/18
2.3K0
[重要新闻]小程序要开始施行备案,提前来看如何操作指引
推荐阅读
相关推荐
浅谈ICP备案、网站备案、域名备案、ICP经营许可证~
更多 >
LV.2
杭州众森企服科技有限公司总经理
目录
  • 1 关于流处理
  • 2 关于Kafka Streams
    • Kafka Streams的特点
    • Kafka Streams应用执行
  • 3 Kafka Streams客户端
  • 4 第一个Streaming应用
    • 应用程序部分
    • Broker部分
    • 测试效果
  • 5 经典WordCount应用
    • 应用程序部分
    • Broker端部分
    • 测试效果
  • 6 总结
  • 参考资料
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档