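Kafka is not, at heart, a message queue; it is essentially a log storage system, and stream processing is a feature it has been promoting heavily of late. This post walks through a simple word count example.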
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.2.1</version>
</dependency>
sh kafka-topics.sh --create --topic wc-input --replication-factor 1 --partitions 1 --zookeeper localhost:2181
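The input topic is wc-input, created by the command above. To keep things simple, the output here goes to the console, though it could just as well be written to another topic.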
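import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;

public class WordCountDemo {
    public static void main(String[] args) {
        // Configuration
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        // Note: To re-run the demo, you need to use the offset reset tool:
        // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Topology: split each line into lower-cased words, re-key by word, count per word
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("wc-input");
        KTable<String, Long> counts = source
                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
                    }
                }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
                    @Override
                    public KeyValue<String, String> apply(String key, String value) {
                        return new KeyValue<>(value, value);
                    }
                })
                .groupByKey()
                .count("Counts");
        // print the running counts to the console
        counts.print();

        // Start the application and block until it is shut down
        final KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch latch = new CountDownLatch(1);
        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}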
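As a rough illustration of what the topology computes, here is a plain-Java sketch with no Kafka dependency. It applies the same three steps (lower-case, split on a single space, count per word) to one in-memory string. The class name `WordCountSketch`, the method `countWords`, and the sample sentence are made up for this illustration, and unlike the streaming job it produces a single final tally rather than a continuously updated table.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Kafka Streams API.
class WordCountSketch {

    // Lower-cases the line, splits it on a single space, and keeps one
    // running total per word. Locale.ROOT is used here (an assumption of
    // this sketch) so the result does not depend on the machine's locale.
    static Map<String, Long> countWords(String line) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String word : line.toLowerCase(Locale.ROOT).split(" ")) {
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords("the quick fox and the lazy dog");
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Because the split is on a bare space, punctuation stays attached to the word, so a token such as `microservices,` is counted separately from `microservices`.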
sh kafka-console-producer.sh --broker-list localhost:9092 --topic wc-input
Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka’s server-side cluster technology.
[KSTREAM-AGGREGATE-0000000003]: streams , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: is , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: a , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: library , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: for , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: building , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: microservices, , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: where , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: input , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: output , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: data , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: are , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: stored , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: in , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: kafka , (2<-null)
[KSTREAM-AGGREGATE-0000000003]: clusters. , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: it , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: combines , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: simplicity , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: writing , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: deploying , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: standard , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: java , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: and , (5<-null)
[KSTREAM-AGGREGATE-0000000003]: scala , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: applications , (2<-null)
[KSTREAM-AGGREGATE-0000000003]: on , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: client , (2<-null)
[KSTREAM-AGGREGATE-0000000003]: side , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: with , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: the , (4<-null)
[KSTREAM-AGGREGATE-0000000003]: benefits , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: of , (2<-null)
[KSTREAM-AGGREGATE-0000000003]: kafka's , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: server-side , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: cluster , (1<-null)
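Each printed line above appears to follow the layout `[processor-name]: key , (newValue<-oldValue)`, where the key is the word and the first number in parentheses is its current count; the second field is printed as `null` throughout this run. That reading is inferred from the log lines themselves. Under that assumption, a small parser (the class `PrintedCountParser` and its `parse` method are hypothetical names for this sketch) could pull the word and count back out:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Kafka Streams API.
class PrintedCountParser {

    // Assumed layout, inferred from the console output:
    // [processor-name]: key , (newValue<-oldValue)
    private static final Pattern LINE =
            Pattern.compile("^\\[([^\\]]+)\\]: (.*) , \\((.*)<-(.*)\\)$");

    // Returns {processorName, key, newValue, oldValue},
    // or null if the line does not match the assumed layout.
    static String[] parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return null;
        }
        return new String[] {m.group(1), m.group(2), m.group(3), m.group(4)};
    }

    public static void main(String[] args) {
        String[] parts = parse("[KSTREAM-AGGREGATE-0000000003]: and , (5<-null)");
        // prints "word=and count=5"
        System.out.println("word=" + parts[1] + " count=" + Long.parseLong(parts[2]));
    }
}
```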