前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >ScalaPB(1): using protobuf in akka

ScalaPB(1): using protobuf in akka

作者头像
用户1150956
发布于 2018-05-28 08:40:44
发布于 2018-05-28 08:40:44
1.6K00
代码可运行
举报
运行总次数:0
代码可运行

    任何类型的实例作为消息在两端独立系统的机器之间进行传递时必须经过序列化/反序列化serialize/deserialize处理过程。假设以下场景:在一个网络里有两台连接的服务器,它们分别部署了独立的akka系统。如果我们需要在这两台服务器的akka系统之间进行消息交换的话,所有消息都必须经过序列化/反序列化处理。akka系统对于用户自定义消息类型的默认序列化处理是以java-object serialization 方式进行的。我们上次提过:由于java-object-serialization会把一个java-object的类型信息、实例值、它所包含的其它类型描述信息等都写入序列化的结果里,所以会占据较大空间,传输数据的效率相对就低了。protobuf是binary格式的,基本只包括实例值,所以数据传输效率较高。下面我们就介绍如何在akka系统中使用protobuf序列化。在akka中使用自定义序列化方法包括下面的这些步骤:

1、在.proto文件中对消息类型进行IDL定义

2、用ScalaPB编译IDL文件并产生scala源代码。这些源代码中包括了涉及的消息类型及它们的操作方法

3、在akka程序模块中import产生的classes,然后直接调用这些类型和方法

4、按akka要求编写序列化方法

5、在akka的.conf文件里actor.serializers段落中定义akka的默认serializer

下面的build.sbt文件里描述了程序结构:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
lazy val commonSettings = Seq(
  name := "AkkaProtobufDemo",
  version := "1.0",
  scalaVersion := "2.12.6",
)

lazy val local = (project in file("."))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11",
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
    ),
    name := "akka-protobuf-demo"
  )

lazy val remote = (project in file("remote"))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11"
    ),
    name := "remote-system"
  ).dependsOn(local)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

local和remote是两个分开的项目。我们会在这两个项目里分别部署akka系统。注意依赖项中的scalapb.runtime。PB.targets指明了产生源代码的路径。我们还需要在project/scalapb.sbt中指定scalaPB插件: 

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

我们首先在.proto文件里定义消息:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
syntax = "proto3";

// Brought in from scalapb-runtime
import "scalapb/scalapb.proto";
import "google/protobuf/wrappers.proto";

package learn.proto;

message Added {

    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message Subtracted {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message AddedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

message SubtractedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

现在我们先在remote项目里定义一个Calculator actor:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package akka.protobuf.calculator
import akka.actor._
import com.typesafe.config.ConfigFactory
import learn.proto.messages._

class Calculator extends Actor with ActorLogging {


  override def receive: Receive = {
    case Added(a,b) =>
      log.info("Calculating %d + %d".format(a, b))
      sender() ! AddedResult(a,b,a+b)
    case Subtracted(a,b) =>
      log.info("Calculating %d - %d".format(a, b))
      sender() ! SubtractedResult(a,b,a-b)
  }

}

object Calculator {
  def props = Props(new Calculator)
}

object CalculatorStarter extends App {

  val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
    .withFallback(ConfigFactory.load())

  val calcSystem = ActorSystem("calcSystem",config)

  calcSystem.actorOf(Calculator.props,"calculator")

  println("press any key to end program ...")

  scala.io.StdIn.readLine()

  calcSystem.terminate()

}

运行CalculatorStarter产生一个calculator actor:  akka.tcp://calcSystem@127.0.0.1:2552/user/calculator

下面我们在local项目里从端口2551上部署另一个akka系统,然后调用端口2552上部署akka系统的calculator actor:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package akka.protobuf.calcservice
import akka.actor._
import learn.proto.messages._
import scala.concurrent.duration._

class CalcRunner(path: String) extends Actor with ActorLogging {
  sendIdentifyRequest()

  def sendIdentifyRequest(): Unit = {
    context.actorSelection(path) ! Identify(path)
    import context.dispatcher
    context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
  }

  def receive = identifying

  def identifying : Receive = {
    case ActorIdentity(calcPath,Some(calcRef)) if (path.equals(calcPath)) =>
      log.info("Remote calculator started!")
      context.watch(calcRef)
      context.become(calculating(calcRef))
    case ActorIdentity(_,None) =>
      log.info("Remote calculator not found!")
    case ReceiveTimeout =>
      sendIdentifyRequest()
    case s @ _ =>
      log.info(s"Remote calculator not ready. [$s]")
  }

  def calculating(calculator: ActorRef) : Receive = {
    case (op : Added) => calculator ! op
    case (op : Subtracted) => calculator ! op

    case AddedResult(a,b,r)  =>
      log.info(s"$a + $b = $r")
    case SubtractedResult(a,b,r) =>
      log.info(s"$a - $b = $r")

    case Terminated(calculator) =>
      log.info("Remote calculator terminated, restarting ...")
      sendIdentifyRequest()
      context.become(identifying)

    case ReceiveTimeout => //nothing
  }

}

object CalcRunner {
  def props(path: String) = Props(new CalcRunner(path))
}

这个CalcRunner是一个actor,在程序里首先通过向remote项目中的calculator-actor传送Identify消息以取得具体的ActorRef。然后用这个ActorRef与calculator-actor进行交互。这其中Identify是akka预定消息类型,其它消息都是ScalaPB从.proto文件中产生的。下面是local项目的运算程序:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package akka.protobuf.demo
import akka.actor._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import akka.protobuf.calcservice._

import scala.concurrent.duration._
import scala.util._
import learn.proto.messages._

object Main extends App {

  val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551")
    .withFallback(ConfigFactory.load())

  val calcSystem = ActorSystem("calcSystem",config)

  val calcPath = "akka.tcp://calcSystem@127.0.0.1:2552/user/calculator"

  val calculator = calcSystem.actorOf(CalcRunner.props(calcPath),"calcRunner")

  println("Calculator started ...")

  import calcSystem.dispatcher

  calcSystem.scheduler.schedule(1.second, 1.second) {
    if (Random.nextInt(100) % 2 == 0)
      calculator ! Added(Random.nextInt(100), Random.nextInt(100))
    else
      calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100))
  }


  scala.io.StdIn.readLine()

}

配置文件application.conf:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
akka {

  actor {
    provider = remote
  }

  remote {
    netty.tcp {
      hostname = "127.0.0.1"
    }
  }

}

先运行remote然后local。注意下面出现的提示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[akka.serialization.Serialization(akka://calcSystem)] Using the default Java serializer for class [learn.proto.messages.Added] which is not recommended because of performance implications. Use another serializer 

下面是protobuf类型的序列化方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package akka.protobuf.serializer

import akka.serialization.SerializerWithStringManifest
import learn.proto.messages._


class ProtobufSerializer extends SerializerWithStringManifest{

  def identifier: Int = 101110116

  override def manifest(o: AnyRef): String = o.getClass.getName
  final val AddedManifest = classOf[Added].getName
  final val SubtractedManifest = classOf[Subtracted].getName
  final val AddedResultManifest = classOf[AddedResult].getName
  final val SubtractedResultManifest = classOf[SubtractedResult].getName


  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {

    println("inside fromBinary"+manifest)

    manifest match {
      case AddedManifest => Added.parseFrom(bytes)
      case SubtractedManifest => Subtracted.parseFrom(bytes)
      case AddedResultManifest => AddedResult.parseFrom(bytes)
      case SubtractedResultManifest => SubtractedResult.parseFrom(bytes)
    }
  }

  override def toBinary(o: AnyRef): Array[Byte] = {

    println("inside toBinary ")
    o match {
      case a: Added => a.toByteArray
      case s :Subtracted => s.toByteArray
      case aR: AddedResult => aR.toByteArray
      case sR: SubtractedResult => sR.toByteArray
    }
  }
}

然后我们需要在application.conf中告诉akka系统使用这些方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  actor {

    serializers {

      proto = "akka.protobuf.serializer.ProtobufSerializer"
    }

    serialization-bindings {

      "java.io.Serializable" = none
      "com.google.protobuf.Message" = proto
      "learn.proto.messages.Added" = proto
      "learn.proto.messages.AddedResult" = proto
      "learn.proto.messages.Subtracted" = proto
      "learn.proto.messages.SubtractedResult" = proto

    }
  }

现在再重新运行:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[INFO] [04/30/2018 18:41:02.348] [calcSystem-akka.actor.default-dispatcher-2] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] Remote calculator started!
inside toBinary 
inside fromBinarylearn.proto.messages.AddedResult
[INFO] [04/30/2018 18:41:03.234] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] 18 + 38 = 56
inside toBinary 
inside fromBinarylearn.proto.messages.AddedResult
[INFO] [04/30/2018 18:41:04.197] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] 22 + 74 = 96

系统使用了自定义的ProtobufferSerializer。

下面是本次示范的完整源代码:

project/scalapb.sbt

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

build.sbt

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
lazy val commonSettings = Seq(
  name := "AkkaProtobufDemo",
  version := "1.0",
  scalaVersion := "2.12.6",
)

lazy val local = (project in file("."))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11",
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
    ),
    name := "akka-protobuf-demo"
  )

lazy val remote = (project in file("remote"))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11"
    ),
    name := "remote-system"
  ).dependsOn(local)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

resources/application.conf

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
akka {
  actor {
    provider = remote
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1"
    }
  }
  actor {
    serializers {
      proto = "akka.protobuf.serializer.ProtobufSerializer"
    }
    serialization-bindings {
      "java.io.Serializable" = none
      "com.google.protobuf.Message" = proto
      "learn.proto.messages.Added" = proto
      "learn.proto.messages.AddedResult" = proto
      "learn.proto.messages.Subtracted" = proto
      "learn.proto.messages.SubtractedResult" = proto

    }
  }
}

main/protobuf/messages.proto

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
syntax = "proto3";

// Brought in from scalapb-runtime
import "scalapb/scalapb.proto";
import "google/protobuf/wrappers.proto";

package learn.proto;

message Added {

    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message Subtracted {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message AddedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

message SubtractedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

remote/Calculator.scala

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package akka.protobuf.calculator
import akka.actor._
import com.typesafe.config.ConfigFactory
import learn.proto.messages._

class Calculator extends Actor with ActorLogging {


  override def receive: Receive = {
    case Added(a,b) =>
      log.info("Calculating %d + %d".format(a, b))
      sender() ! AddedResult(a,b,a+b)
    case Subtracted(a,b) =>
      log.info("Calculating %d - %d".format(a, b))
      sender() ! SubtractedResult(a,b,a-b)
  }

}

object Calculator {
  def props = Props(new Calculator)
}

object CalculatorStarter extends App {

  val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
    .withFallback(ConfigFactory.load())

  val calcSystem = ActorSystem("calcSystem",config)

  calcSystem.actorOf(Calculator.props,"calculator")

  println("press any key to end program ...")

  scala.io.StdIn.readLine()

  calcSystem.terminate()

}

CalcService.scala

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package akka.protobuf.calcservice
import akka.actor._
import learn.proto.messages._
import scala.concurrent.duration._



class CalcRunner(path: String) extends Actor with ActorLogging {
  sendIdentifyRequest()

  def sendIdentifyRequest(): Unit = {
    context.actorSelection(path) ! Identify(path)
    import context.dispatcher
    context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
  }

  def receive = identifying

  def identifying : Receive = {
    case ActorIdentity(calcPath,Some(calcRef)) if (path.equals(calcPath)) =>
      log.info("Remote calculator started!")
      context.watch(calcRef)
      context.become(calculating(calcRef))
    case ActorIdentity(_,None) =>
      log.info("Remote calculator not found!")
    case ReceiveTimeout =>
      sendIdentifyRequest()
    case s @ _ =>
      log.info(s"Remote calculator not ready. [$s]")
  }

  def calculating(calculator: ActorRef) : Receive = {
    case (op : Added) => calculator ! op
    case (op : Subtracted) => calculator ! op

    case AddedResult(a,b,r)  =>
      log.info(s"$a + $b = $r")
    case SubtractedResult(a,b,r) =>
      log.info(s"$a - $b = $r")

    case Terminated(calculator) =>
      log.info("Remote calculator terminated, restarting ...")
      sendIdentifyRequest()
      context.become(identifying)

    case ReceiveTimeout => //nothing
  }

}

object CalcRunner {
  def props(path: String) = Props(new CalcRunner(path))
}

Main.scala

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package akka.protobuf.demo
import akka.actor._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import akka.protobuf.calcservice._

import scala.concurrent.duration._
import scala.util._
import learn.proto.messages._

object Main extends App {

  val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551")
    .withFallback(ConfigFactory.load())

  val calcSystem = ActorSystem("calcSystem",config)

  val calcPath = "akka.tcp://calcSystem@127.0.0.1:2552/user/calculator"

  val calculator = calcSystem.actorOf(CalcRunner.props(calcPath),"calcRunner")


  println("Calculator started ...")

  import calcSystem.dispatcher

  calcSystem.scheduler.schedule(1.second, 1.second) {
    if (Random.nextInt(100) % 2 == 0)
      calculator ! Added(Random.nextInt(100), Random.nextInt(100))
    else
      calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100))
  }


  scala.io.StdIn.readLine()

}

ProtobufferSerializer.scala

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package akka.protobuf.serializer

import akka.serialization.SerializerWithStringManifest
import learn.proto.messages._


class ProtobufSerializer extends SerializerWithStringManifest{

  def identifier: Int = 101110116

  override def manifest(o: AnyRef): String = o.getClass.getName
  final val AddedManifest = classOf[Added].getName
  final val SubtractedManifest = classOf[Subtracted].getName
  final val AddedResultManifest = classOf[AddedResult].getName
  final val SubtractedResultManifest = classOf[SubtractedResult].getName


  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {

    println("inside fromBinary"+manifest)

    manifest match {
      case AddedManifest => Added.parseFrom(bytes)
      case SubtractedManifest => Subtracted.parseFrom(bytes)
      case AddedResultManifest => AddedResult.parseFrom(bytes)
      case SubtractedResultManifest => SubtractedResult.parseFrom(bytes)
    }
  }

  override def toBinary(o: AnyRef): Array[Byte] = {

    println("inside toBinary ")
    o match {
      case a: Added => a.toByteArray
      case s :Subtracted => s.toByteArray
      case aR: AddedResult => aR.toByteArray
      case sR: SubtractedResult => sR.toByteArray
    }
  }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-04-30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Akka-CQRS(9)- gRPC,实现前端设备与平台系统的高效集成
前面我们完成了一个CQRS模式的数据采集(录入)平台。可以预见:数据的产生是在线下各式各样的终端系统中,包括web、桌面、移动终端。那么,为了实现一个完整的系统,必须把前端设备通过某种网络连接形式与数据采集平台集成为一体。有两种方式可以实现需要的网络连接:Restful-api, gRPC。由于gRPC支持http/2通讯协议,支持持久连接方式及双向数据流。所以对于POS设备这样的前端选择gRPC作为网络连接方式来实现实时的操作控制应该是正确的选择,毕竟采用恒久连接和双向数据流效率会高很多。gRPC是google公司的标准,基于protobuffer消息:一种二进制序列化数据交换机制。gRPC的优势在这里就不再细说,读者可以参考前面有关gRPC的讨论博文。
用户1150956
2019/06/24
1.2K0
ScalaPB(4): 通用跨系统protobuf数据,sbt设置
  我们知道,在集群环境节点之间进行交换的数据必须经过序列化/反序列化处理过程,而在这方面protobuf是一个比较高效、易用的模式。用户首先在.proto文件中用IDL来定义系统中各种需要进行交换的数据类型。然后用protoc编译器自动产生相关的源代码,里面包括了完整的序列化处理函数。在一个集成的系统环境内,protobuf数据必须保持与所有系统的松散耦合,不能对这些用户系统有任何依赖。这样把protobuf数据类型和相关的序列化/反序列化函数打成一个独立的包,由用户系统各自引用就是一种最佳解决方案了。
用户1150956
2018/05/28
1.2K0
Akka-Cluster(3)- ClusterClient, 集群客户端
  上篇我们介绍了distributed pub/sub消息传递机制。这是在同一个集群内的消息共享机制:发布者(publisher)和订阅者(subscriber)都在同一个集群的节点上,所有节点上的DistributedPubSubMediator通过集群内部的沟通机制在底层构建了消息流通渠道。在actor pub/sub层面可以实现对象位置透明化。在现实里很多前端都会作为某个集群的客户端但又与集群分离,又或者两个独立的集群之间可能会发生交互关系,这是也会出现客户端与服务端不在同一集群内的情况,ClusterClient就是为集群外部actor与集群内部actor进行沟通的解决方案。
用户1150956
2018/12/25
1.9K0
Akka(11): 分布式运算:集群-均衡负载
在上篇讨论里我们主要介绍了Akka-Cluster的基本原理。同时我们也确认了几个使用Akka-Cluster的重点:首先,Akka-Cluster集群构建与Actor编程没有直接的关联。集群构建是
用户1150956
2018/01/05
2.3K0
akka-grpc - 基于akka-http和akka-streams的scala gRPC开发工具
关于grpc,在前面的scalaPB讨论里已经做了详细的介绍:google gRPC是一种全新的RPC框架,在开源前一直是google内部使用的集成工具。gRPC支持通过http/2实现protobuf格式数据交换。protobuf即protocol buffer,是google发明的一套全新的序列化传输协议serialization-protocol,是二进制编码binary-encoded的,相对java-object,XML,Json等在空间上占有优势,所以数据传输效率更高。由于gRPC支持http/2协议,可以实现双向通讯duplex-communication,解决了独立request/response交互模式在软件编程中的诸多局限。这是在系统集成编程方面相对akka-http占优的一个亮点。protobuf格式数据可以很方便的转换成 json格式数据,支持对外部系统的的开放协议数据交换。这也是一些人决定选择gRPC作为大型系统微服务集成开发工具的主要原因。更重要的是:用protobuf和gRPC进行client/server交互不涉及任何http对象包括httprequest,httpresponse,很容易上手使用,而且又有在google等大公司内部的成功使用经验,用起来会更加放心。
用户1150956
2020/08/25
2K0
Akka(9): 分布式运算:Remoting-远程构建式
   上篇我们讨论了Akka-Remoting。我们说Akka-Remoting是一种点对点的通讯方式,能使两个不同JVM上Akka-ActorSystem上的两个Actor之间可以相互沟通。Akka
用户1150956
2018/01/05
7790
Akka(13): 分布式运算:Cluster-Sharding-运算的集群分片
    通过上篇关于Cluster-Singleton的介绍,我们了解了Akka为分布式程序提供的编程支持:基于消息驱动的运算模式特别适合分布式程序编程,我们不需要特别的努力,只需要按照普通的Acto
用户1150956
2018/01/05
1.5K0
Akka-Cluster(2)- distributed pub/sub mechanism 分布式发布/订阅机制
   上期我们介绍了cluster singleton,它的作用是保证在一个集群环境里永远会有唯一一个singleton实例存在。具体使用方式是在集群所有节点部署ClusterSingletonManager,由集群中的leader节点选定其中一个节点并指示上面的ClusterSingletonManager运行一个cluster singleton实例。与singleton实例交互则通过即时构建ClusterSingletonProxy实例当作沟通目标。从应用场景来说cluster singleton应该是某种pull模式的应用:我们把singleton当作中央操作协调,比如说管理一个任务清单,多个ClusterSingletonProxy从任务清单中获取(pull)自己应该执行的任务。如果需要实现push模式的任务派送:即由singleton主动通知集群里某种类型的actor执行任务,那么通过ClusterSingletonProxy沟通就不适用了,使用pub/sub方式是一个可行的解决方案。
用户1150956
2018/12/10
1.3K0
Akka-CQRS(16)- gRPC用JWT进行权限管理
前面谈过gRPC的SSL/TLS安全机制,发现设置过程比较复杂:比如证书签名:需要服务端、客户端两头都设置等。想想实际上用JWT会更加便捷,而且更安全和功能强大,因为除JWT的加密签名之外还可以把私密的用户信息放在JWT里加密后在服务端和客户端之间传递。当然,最基本的是通过对JWT的验证机制可以控制客户端对某些功能的使用权限。
用户1150956
2019/07/19
1.7K0
Akka-CQRS(8)- CQRS Reader Actor 应用实例
前面我们已经讨论了CQRS-Reader-Actor的基本工作原理,现在是时候在之前那个POS例子里进行实际的应用示范了。
用户1150956
2019/06/24
1.6K0
Spark集群 + Akka + Kafka + Scala 开发(3) : 开发一个Akka + Spark的应用
前言 在Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境。 在Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用中,我们已经写好了一个Spark的应用。 本文的目标是写一个基于akka的scala工程,在一个spark standalone的集群环境中运行。 akka是什么? akka的作用 akka的名字是action kernel的回文。根据官方定义:akka用于r
绿巨人
2018/05/18
1.2K0
Akka(14): 持久化模式:PersistentActor
    Akka程序的特点之一就是高弹性或者强韧性(resilient)的,因为Actor具有自我修复的能力。当Actor模式的程序出现中断情况如:系统崩溃、人为终结等,系统在重启后有关Actor可以
用户1150956
2018/01/05
1.6K0
akka-grpc - 应用案例
上期说道:http/2还属于一种不算普及的技术协议,可能目前只适合用于内部系统集成,现在开始大面积介入可能为时尚早。不过有些项目需求不等人,需要使用这项技术,所以研究了一下akka-grpc,写了一篇介绍。本想到此为止,继续其它项目。想想这样做法有点不负责任,像是草草收场。毕竟用akka-grpc做了些事情,想想还是再写这篇跟大家分享使用kka-grpc的过程。
用户1150956
2020/09/01
9030
Akka(15): 持久化模式:AtLeastOnceDelivery-消息保证送达模式
  消息保证送达是指消息发送方保证在任何情况下都会至少一次确定的消息送达。AtleastOnceDelivery是一个独立的trait,主要作用是对不确定已送达的消息进行补发,这是一种自动的操作,无需
用户1150956
2018/01/05
1.5K0
Akka(12): 分布式运算:Cluster-Singleton-让运算在集群节点中自动转移
  在很多应用场景中都会出现在系统中需要某类Actor的唯一实例(only instance)。这个实例在集群环境中可能在任何一个节点上,但保证它是唯一的。Akka的Cluster-Singleton
用户1150956
2018/01/05
1.4K0
Akka(8): 分布式运算:Remoting-远程查找式
该文摘要总结:本文介绍了如何利用Akka和Scala实现一个分布式计算系统,用于执行并行计算任务。具体来说,文章介绍了如何利用Akka的Actor模型和Scala的并发编程库来实现一个分布式计算系统,该系统能够执行多个计算任务,并将结果返回给调用者。同时,文章还探讨了如何利用Akka的Identify消息处理Actor的死亡,从而避免Actor的丢失,并提高系统的可用性。
用户1150956
2018/01/05
1.9K0
ScalaPB(5):用akka-stream实现reactive-gRPC
 在前面几篇讨论里我们介绍了scala-gRPC的基本功能和使用方法,我们基本确定了选择gRPC作为一种有效的内部系统集成工具,主要因为下面gRPC支持的几种服务模式: 1、Unary-Call:独立的一对client-request/server-response,是我们常用的http交互模式 2、Server-Streaming:client发出一个request后从server端接收一串多个response 3、Client-Streaming:client向server发送一串多个reques
用户1150956
2018/05/28
1.3K0
Akka-CQRS(5)- CQRS Writer Actor 部署和测试
上篇我们做了一个WriterActor的例子,主要目的是示范WriterActor如何作为集群分片用persistentActor特性及event-sourcing模式实现CQRS的写功能。既然是集群分片,那么我们就在这篇讲讲WriterActor的部署和测试,因为这个里面还是有些值得注意的地方。下面是一段WriteActor,即集群分片(cluster-sharding)的部署代码:
用户1150956
2019/05/25
9990
Akka-Cluster(6)- Cluster-Sharding:集群分片,分布式交互程序核心方式
在前面几篇讨论里我们介绍了在集群环境里的一些编程模式、分布式数据结构及具体实现方式。到目前为止,我们已经实现了把程序任务分配给处于很多服务器上的actor,能够最大程度的利用整体系统的硬件资源。这是因为通过akka-cluster能够把很多服务器组合成一个虚拟的整体系统,编程人员不需要知道负责运算的actor具体在那台服务器上运行。当然,我所指的整体系统是一种分布式的系统,实质底层还是各集群节点作为完整个体独立运行的,所以核心理念还是需要将程序分割成能独立运算的任务,然后分派给可能分布在很多服务器上的actor去运算。在上一篇的cluster-load-balance里我们采用了一种fire-and-forget模式把多项独立任务分配给集群节点上的actor,然后任由它们各自完成运算,中途不做任何交互、控制。这也是一种典型的无内部状态的运算模式。对外界来讲就是开始、完成,中间没有关于运算进展或当前状态的交流需要。但在现实里,很多任务是无法完全进行独立细分的,或者再细分会影响系统效率。比如网上购物网站每个客户的购物车:它记录了客户在网上的所有商品拣选过程,每一个拣选动作都代表更新的购物车状态,直到完成结算。那么在一个可能有几十万用户同时在线购物的网站,保留在内存的购物车状态应该是任何机器都无法容纳的,只有回到传统的数据库模式了,还是要面对无法解决的多并发系统效率问题。这么分析,集群分片技术可能是最好的解决方法了。
用户1150956
2019/05/25
1.5K0
Akka-Cluster(4)- DistributedData, 分布式数据类型
在实际应用中,集群环境里共用一些数据是不可避免的。我的意思是有些数据可以在任何节点进行共享同步读写,困难的是如何解决更改冲突问题。本来可以通过分布式数据库来实现这样的功能,但使用和维护成本又过高,不值得。分布式数据类型distributed-data (ddata)正是为解决这样的困局而设计的。akka提供了一组CRDT(ConflictFreeReplicatedDataType 免冲突可复制数据类型)和一套管理方法来实现分布式数据在集群中的免冲突共享共用。
用户1150956
2018/12/28
7210
推荐阅读
相关推荐
Akka-CQRS(9)- gRPC,实现前端设备与平台系统的高效集成
更多 >
领券
社区富文本编辑器全新改版!诚邀体验~
全新交互,全新视觉,新增快捷键、悬浮工具栏、高亮块等功能并同时优化现有功能,全面提升创作效率和体验
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验