上一篇我们通过示范案例基本了解了一个独立交易类型的库存管理模块应该是怎样的一块业务。这篇我们讨论一些如何从技术上来实现这样的业务模块。讲确切点应该说如何借助kafka的特性来实现功能开发。
底层方面:多节点服务器集群、kafka分布部署。
对上一层主要关注partition相关的问题:partition的分布与consumer如何对应。根据kafka官方文档:一个topic分出多个partition,一般按照集群节点broker倍数设置。那么一个topic的partition差不多以同等数量分布于每个broker上。或者针对一个topic,每个集群节点上都有多个partition。从consumer配置来讲就是在每个节点上部署同一组(相同consumer-group-id)consumer。所谓consumer就是alpakka-kafka的一个stream。由于最终的完整应用会部署在每一个集群节点,应用中包括了consumer,所以每组consumer已经是分布式的了,不需要分片sharding机制。在每个节点启动应用时就开始运行多个应用里的kafka-conusmer-stream就行了,至于consumer分布式运算是体现在底层kafka的分布式部署上的。
再上一层是库存交易运算层,这部分功能是业务实现核心,包括:库存状态更新、库存流转、交易日志、库存账目等等。我们目前只关心库存状态。但假设这部分完整业务功能是多并发、复杂又消耗资源的,那么应该把它作为分片sharded-entity来设计。这样,这些耗资源的运算可以被分发到各节点上去运算了。还有一个问题需要考虑的:alpakka-kafka提供了一个独特的分片部署策略kafkaSharding,能实现partition与某分片在同一节点对应,这样可以节省消息跨节点传递,把消息读取和业务处理放到同一节点上去完成。不过对我们的案例来说,跨节点消息传递与把庞大的运算均衡的分发到多个节点上去相比较就显得微不足道了。所以,我们否定了使用kafkaSharding的想法。
这个库存管理业务模块应该是独立全封闭的。那么与其它业务模块甚至第三方软件交流就需要按照事先约定的通讯协议进行了,最合适的标准应该是http协议了。在库存管理模块外表构建一层http api,提供与外界的信息互动。这个案例的库存管理会通过api为外界用户提供读、写服务。具体工作场景如下:用户通过任何节点上的http端点用http-request调用api传递指令(读、写库存)-> api把指令写入kafka -> consumer从kafka读出指令传给一个shard-entity -> shard-entity按照指令处理库存数据 -> 通过http-response返回处理结果。
还有一些流程细节需要厘清:业务api的http-request分两大类型:库存查询(读)和库存更新(写)。其中库存更新又分单向和双向(fire-and-forget and request-response)。库存查询不需要kafka,直接发到一个shard-entity上面去查就行了。只有库存处理指令,因为要保证执行顺序,需要先写入kafka,然后consumer按照写入时序读出来交由一个shard-entity去处理。麻烦的是需要返回结果的双向指令,处理完业务后该如何把结果返回正确的http-request,毕竟指令是通过kafka发过去的。如果通过kafka返回结果,前端还需要构建consumer来接收。另一个方案是通过actor方式返回,这需要返回时获取正确的actorRef。这个比较容易实现:建一个管理结果返回请求的actor,把所有未完成请求消息放到一个集合里。请求消息里除提供请求者actorRef之外还必须有个文本类型的messageID,一个代表唯一的字符串。具体流程如下:http接到双向指令后分别构建包含messageID的producerRecord写入kafka、向返回请求管理actor发一条包含replyTo, messageID消息 -> consumer从kafka读取包括业务指令及messageID的消息 -> 把包含messageID的消息传给业务分片shard-entity进行业务处理 -> shard-entity处理业务完毕后向返回请求管理actor发一条包括处理结果及messageID的消息 -> 返回请求管理actor按照messageID从存放请求消息的集合里找到相应的actorRef -> 向actorRef发还结果。整个流程看起来好像又长又复杂,实际用了kafka效率还是很高的。到这已经把全部技术实现各节点都过了一遍,下面我们就可以一块一块分步去实现了。