首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Kafka流按两个字段聚合

Kafka流按两个字段聚合
EN

Stack Overflow用户
提问于 2020-05-22 02:24:13
回答 1查看 683关注 0票数 0

我使用kafka流为仓库中的项目创建聚合(sum)。

该项目可以添加(例如从供应商购买)或删除(例如,出售的项目)。

在应用程序中,一个仓库可以服务于多个商店,而公司有几个仓库。

在本例中,我需要使用两个字段对事务进行求和和分组: item name和store name。

使用项目名称(一个字段)进行求和是很简单的,但是如何使用额外的分组(例如,每个商店的每个项目的总库存)或(每个仓库的总库存,每个项目名称)?

我的(过于简化的)代码

InventoryKafkaMessage.java

代码语言:javascript
运行
复制
public class InventoryKafkaMessage {

    private String warehouseId;  // warehouse ID
    private String itemName;  // item name
    private long quantity;  // always positive
    private String type;  // ADD or REMOVE
    private String storeLocation; // store ID
    private long transactionTimestamp;
    // ... some others, but not relevant for this question
}

使用项名作为键发送到源主题的消息。

InventoryAggregatorStream.java

溪流是

代码语言:javascript
运行
复制
        var inventorySerde = new JsonSerde<>(InventoryKafkaMessage.class);
        var sourceStream = builder.stream("supplychain-warehouse-inventory", Consumed.with(Serdes.String(), inventorySerde));

        // aggregating by key (item name)
        logisticStream.mapValues((k, v) -> v.getType().equalsIgnoreCase("ADD") ? v.getQuantity() : -1 * v.getQuantity())
                .groupByKey()
                .aggregate(() -> 0l, (aggKey, newValue, aggValue) -> aggValue + newValue,
                        Materialized.with(Serdes.String(), Serdes.Long()))
                .toStream().to("stream-supplychain-wharehouse-inventory-total", Produced.with(Serdes.String(), Serdes.Long()));
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-05-23 22:52:04

要对多个属性进行分组,可以定义包含两个属性的组合类型,并将其设置为键。例如,您可以定义一个类型:

代码语言:javascript
运行
复制
public class GroupingKey {
  private String warehouseId;
  private String itemName;

  public GroupingKey(String warehouseId, String itemName) {
    // set fields
  }
  // etc
}

// usage:

sourceStream = builder.stream("supplychain-warehouse-inventory",
                              Consumed.with(Serdes.String(), inventorySerde));
newKeyStream = sourceStream.selectKey((k, v) -> new GroupingKey(v.warehouseId, v.itemName));

newKeyStream.groupByKey()...
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61946838

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档