前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >alpakka-kafka(9)-kafka在分布式运算中的应用

alpakka-kafka(9)-kafka在分布式运算中的应用

作者头像
用户1150956
发布2022-05-10 13:07:53
3280
发布2022-05-10 13:07:53
举报
文章被收录于专栏:函数式编程语言及工具

  kafka具备的分布式、高吞吐、高可用特性,以及所提供的各种消息消费模式可以保证在一个多节点集群环境里消息被消费的安全性:即防止每条消息遗漏处理或重复消费。特别是exactly-once消费策略:可以保证每条消息肯定只被消费一次。换句话说就是在分布式运算环境里kafka的消息消费是能保证唯一性的。

但是,保证了消息读取唯一性,消息的处理过程如果也放到分布式运算环境里仍然会面对数据完整性(data integrity)问题。例如:消息处理过程是更新银行账户中金额、消息内容是更新某个账户的指令,那么,对多条针对同一个银行账户的消息进行并行处理时肯定会引发数据完整性问题。这就是本文重点讨论的问题。

我们来看看下面的代码:

代码语言:javascript
复制
kfkSource
        .async.mapAsync(parallelism=8) { msg => updateAccount(msg.value() }
          .toMat(Sink.fold(0) { (accu, e) => if (e) accu + 1 else accu })(Keep.right) 
          .run()

在上面的例子里,从kafka队列里逐一读取的消息可能有多个被并行处理(最多有8个并行线程parallelism=8), 如果这8条消息里包含相同的账户号码,肯定会产生数据完整性问题。那么如果:

代码语言:javascript
复制
> kfkSource
        .async.mapAsync(parallelism=1) { msg => updateAccount(msg.value() }
          .toMat(Sink.fold(0) { (accu, e) => if (e) accu + 1 else accu })(Keep.right) 
          .run()

用(parallelism=1),这样每条消息用单一线程处理,牺牲一些效率,能解决问题吗?答案是:在这台服务器上貌似可以。但我们的目的是在一个多节点集群环境里进行数据处理。这也应该是我们使用kafka的初衷嘛。在分布式环境里上面的这段代码等于是在多个节点上同时运行,同样会产生像多线程并行运算所产生的问题。 显然:问题的核心是重复的消息内容,在上面的例子里是多条消息里相同的银行账号。如果相同的账号在同一个线程里进行处理就可以避免以上问题了。akka actor信箱里的指令是按序逐个执行的,所以我们如果能保证把相同内容的消息发给同一个actor就可以解决问题了。为了实现有目的的向actor发送消息,可以使用集群分片(cluster-sharding)。在akka-cluster里,每一个分片都就等于一个命名的actor。还有一个问题是如果涉及大量的唯一账号,或者商品号,比如超百万的唯一编号又该怎么办呢?刚才讲过:我们只要保证每一种消息发给同一个分片,多种消息是可以发个同一个分片的。所以,对于大量编号我们可以通过hash算法来简化编号精度,如下:

代码语言:javascript
复制
def hashItemCode(code: String): String = {
     val arrCode = code.toCharArray
     var occur : Array[Int] = Array.fill(8)(0)
     arrCode.foreach {
       case x if (x >= '0' && x <= '2') =>
         occur(0) = occur(0) + 1
       case x if (x >= '3' && x <= '5') =>
         occur(1) = occur(1) + 1
       case x if (x >= '6' && x <= '8') =>
         occur(2) = occur(2) + 1
       case x if (x == '9' || x == '-' || x == '_' || x == ':') =>
         occur(3) = occur(3) + 1
       case x if ((x >= 'a' && x <= 'g') || (x >= 'A' && x <= 'G')) =>
         occur(4) = occur(4) + 1
       case x if ((x >= 'h' && x <= 'n') || (x >= 'H' && x <= 'N')) =>
         occur(5) = occur(5) + 1
       case x if ((x >= 'o' && x <= 't') || (x >= 'O' && x <= 'T')) =>
         occur(6) = occur(6) + 1
       case x if ((x >= 'u' && x <= 'z') || (x >= 'U' && x <= 'Z')) =>
         occur(7) = occur(7) + 1
       case _ =>
         occur(7) = occur(7) + 1
     }
     occur.mkString
  }

这个hashItemCode返回一个字串,代表原编码code中各种字母发生的频率,把这个字串作为sharding的entityId。 那么从kafaka读取一条消息后按hashItemCode结果指定发送给某个分片,下面是一个实际例子:

代码语言:javascript
复制
 def toStockWorker(jsonDoc: String) = {
    val bizDoc = fromJson[BizDoc](jsonDoc)
    val plu = bizDoc.pluCode
    val entityId = DocModels.hashItemCode(plu)
    log.step(s"CurStk-toStockWorker: sending CalcStock to ${entityId} with message: $jsonDoc")
    val entityRef = sharding.entityRefFor(StockCalculator.EntityKey, entityId)
    entityRef ! StockCalculator.CalcStock(jsonDoc)
  }

下面我提供一个exactly-once源代码作为参考; 

代码语言:javascript
复制
 (1 to numReaders).toList.map {_ =>
      RestartSource
        .onFailuresWithBackoff(restartSource) { () => mergedSource }
//        .viaMat(KillSwitches.single)(Keep.right)
        .async.mapAsync(1) { msg =>    //only one message uniq checked
        for {                                      //and flow down stream
          newtxn <- curStk.isUniqStkTxns(msg.value())
          _ <- FastFuture.successful {
            log.step(s"ExactlyOnceReaderGroup-futStkTxnExists is ${!newtxn}: ${msg.value()}")
          }
        } yield (newtxn,msg)
      }
        .async.mapAsyncUnordered(8) { rmsg =>  //passed down msg
          for {       //can be parrallelly processed
            cmt <- if (rmsg._1) stkTxns.stkTxnsWithRetry(rmsg._2.value(), rmsg._2.partition(), rmsg._2.offset()).toFuture().map(_ => "Completed")
            else FastFuture.successful {"stktxn exists!"}
            pmsg <- FastFuture.successful {
              log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")
              rmsg
            }
          } yield pmsg
        }
        .async.mapAsyncUnordered(8) { rmsg =>
        for {
          _ <- if(rmsg._1) FastFuture.successful {curStk.toStockWorker(rmsg._2.value())}
          else FastFuture.successful(false)
          pmsg <- FastFuture.successful {
            log.step(s"ExactlyOnceReaderGroup-updateStk...")
            rmsg
          }
        } yield pmsg
      }
        .async.mapAsyncUnordered(8) { rmsg =>
        for {
          _ <- if (rmsg._1) FastFuture.successful {
            pcmTxns.toPcmAggWorker(rmsg._2.value())
          }
          else FastFuture.successful(false)
          pmsg <- FastFuture.successful {
            log.step(s"ExactlyOnceReaderGroup-AccumulatePcm...")
          }
        } yield "Completed"
      }
         .toMat(Sink.seq)(Keep.left)
        .run()
    }
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-01-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档