前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >实习培训考核内容--Akka+Netty编写聊天室系统

实习培训考核内容--Akka+Netty编写聊天室系统

原创
作者头像
Coding-man
修改2024-08-19 18:35:29
1081
修改2024-08-19 18:35:29
举报
文章被收录于专栏:自我记录

前言:akka是一种基于Actor 模型,提供了一个在 JVM 上构建高并发、分布式和高容错应用程序的平台。框架资料较少,主要参考资料:akka官网文档:https://doc.akka.io/docs/akka/current/actors.html netty作为 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序,是目前最流行的 NIO 框架。

1、聊天室整体框架

聊天室demo较为简单,主要作为学习akka框架练手比较合适,可以帮助理清akka框架的逻辑与一些使用规则。本人在实习中主要使用单节点actor与集群actor进行了聊天室demo的编写,单节点较为简单,这里不做展示。同时由于公司主要使用kotlin语言进行开发,所以主要使用kotlin进行编写。

聊天室demo整体框架
聊天室demo整体框架

2、主要内容

2.1客户端与服务端模拟

客户端与服务端都是使用netty框架,客户端模拟用户的登录,服务端作为消息的转发,发送到akka集群中的分片区域的节点。 注意:这里netty没有添加心跳机制,同时注意需要考虑TCP粘包问题,进行tcp消息头与消息体的划分,否则在用户输入发送消息之后会产生粘包。同时在不同节点之间传输需要对传输的数据进行序列化,这里直接使用string编解码器,也可以使用protobuf进行自定义编解码器(推荐)。

Client

代码语言:java
复制
object Client {
    @JvmStatic
    fun main(args: Array<String>) {
        val bootstrap = Bootstrap()
        bootstrap
            .group(NioEventLoopGroup())
            .channel(NioSocketChannel::class.java) //注意客户端与服务端在这里的区别
            .handler(object : ChannelInitializer<SocketChannel>() {
                override fun initChannel(channel: SocketChannel) {
                    channel.pipeline()
                        .addLast(LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4))
                        // 添加LengthFieldPrepender来处理包长度信息
                        .addLast(LengthFieldPrepender(4))
                        .addLast(StringDecoder())
                        .addLast(StringEncoder())
                        .addLast(object : ChannelInboundHandlerAdapter(){
                            override fun channelActive(ctx: ChannelHandlerContext) {
                                println("与服务端链接已建立")
                                println("-------------------------------------------")
                                print("发送聊天消息(如果需要私聊用户则使用@用户名):")

                            }
                            override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
                                val currentTime = LocalDateTime.now()
                                val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
                                val formattedTime = currentTime.format(formatter)
                                print("\r")  // 回车符,回到行首
                                print("                                       \r")  // 清空当前行
                                println("---------------$formattedTime---------------")
                                println((msg as String).trim())
                                print("发送聊天消息(如果需要私聊用户则使用@用户名):")  // 重新显示输入提示
                            }
                        })
                }
            })
            .option(ChannelOption.SO_KEEPALIVE, true) //设置长连接
        val channel = bootstrap.connect("localhost", 8080).sync().channel()
        /**
         * 如果私聊用户则需要在发送消息之前加上:@用户名
         * */
        val messageThread = Thread{
            val scanner = Scanner(System.`in`)  //模拟用户输入数据进行发送
            try {
                while (scanner.hasNextLine()) {
                    print("发送聊天消息(如果需要私聊用户则使用@用户名):")
                    val message = scanner.nextLine()
                    channel.writeAndFlush(message).sync()
                }
            }catch (e:InterruptedException){
                e.printStackTrace()
            }finally {
                scanner.close()
            }
        }
        messageThread.start()
        channel.closeFuture().sync()
    }
}


Server

代码语言:javascript
复制
object Server {
    @JvmStatic
    fun main(args: Array<String>) {
        val actorSystem = ActorSystem.create("ChatServerKt")
        val roomActor = actorSystem.actorOf(Props.create(RoomActor::class.java), "RoomActor")
        var UserList:MutableList<Channel> = mutableListOf() //保存创建的用户
        val bossGroup: EventLoopGroup = NioEventLoopGroup()
        val workerGroup: EventLoopGroup = NioEventLoopGroup()
        val bootstrap = ServerBootstrap()
        bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel::class.java)
            .childHandler(object : ChannelInitializer<SocketChannel>() {
                override fun initChannel(channel: SocketChannel) {
       //             channel.pipeline().addLast("IdleStateHandler", IdleStateHandler(0,10,0, TimeUnit.SECONDS));
                    channel.pipeline()
    //                    .addLast(LineBasedFrameDecoder(8192)) // 添加行解码器
                        .addLast(LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4))
                        // 添加LengthFieldPrepender来处理包长度信息
                        .addLast(LengthFieldPrepender(4))
                        .addLast(StringDecoder())
                        .addLast(StringEncoder())
                        .addLast(object : ChannelInboundHandlerAdapter(){
                        override fun channelActive(ctx: ChannelHandlerContext) {
                            UserList.add(ctx.channel())
                            val num:Int = UserList.indexOf(ctx.channel())
                            val user = User("user${UserList.indexOf(ctx.channel())}",ctx.channel())
                            roomActor.tell(user, ActorRef.noSender())  //创建用户
                            println("user${num}连接")
                            println("当前用户数量为:${UserList.size}")
                        }
                        override fun channelInactive(ctx: ChannelHandlerContext) {
                            println("user${UserList.indexOf(ctx.channel())}退出")
                            UserList.remove(ctx.channel())
                        }
                        override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
                            val msgstr = (msg as String).trim()
                            if(msgstr.startsWith("@")){ //@user1你好
                                val tonum = msgstr[5]-'0'
                                val fromnum:Int = UserList.indexOf(ctx.channel())
                                if(tonum==fromnum){
                                    println("不能向自己发送消息!")
                                }else{
                                    if(tonum > UserList.size){
                                        println("目标用户不存在!")
                                    }else{
                                        val tomsg:String = msgstr.substring(6)
                                        val toMsg = ToUserMsg("user$fromnum","user$tonum",tomsg,UserList[tonum])
                                        roomActor.tell(toMsg, ActorRef.noSender())
                                    }
                                }
                            }else{
                                println("服务端收到消息:$msgstr")
                                val num:Int = UserList.indexOf(ctx.channel())
                                val message = MsgEntity("user${num}",msg as String,ctx.channel())
                                roomActor.tell(message, ActorRef.noSender()) //向Router
                            }
                        }
                    })
                }
            })
            .childOption(ChannelOption.SO_KEEPALIVE, true)
        bootstrap.bind(8080)

    }
}

2.2 AKKA编写用户节点以及分区规则 首先编写一个UserActor来接收不同类型的消息,每次有新用户注册登录相当于集群会启动创建一个UserActor

代码语言:javascript
复制
class UserActor : AbstractPersistentActor() {
    private val log: LoggingAdapter = Logging.getLogger(context.system, this)
//    private val channelGroup: ChannelGroup = DefaultChannelGroup(GlobalEventExecutor.INSTANCE)

    private val mediator = DistributedPubSub.get(context.system).mediator()

    override fun persistenceId(): String {
        return "User-" + self.path().name()
    }
    override fun preStart() {
        mediator.tell(DistributedPubSubMediator.Subscribe("PublicMsg", self), self)
        mediator.tell(DistributedPubSubMediator.Subscribe(self.path().name(), self), self)
        //mediator.tell(DistributedPubSubMediator.Put(getSelf()), getSelf());
        println("节点启动")
        println("Actor路径: ${self.path()}")
        println("Actor名称: ${self.path().name()}")
    }
    override fun createReceiveRecover(): Receive {
        return receiveBuilder()
            .match(PublicMsg::class.java){
                println("UserActor重启成功")
            }.build()
    }
    fun getTime():String{
        val currentTime = LocalDateTime.now()
        val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
        val formattedTime = currentTime.format(formatter)
        return formattedTime
    }
    override fun createReceive(): Receive {
        return receiveBuilder()
            .match(PreWarmMessage::class.java){
                _->
                println("已经可以正常发送消息!")
            }
            .match(PublicMsg::class.java) { msg ->
                println("---------------聊天室消息-${getTime()}---------------")
                println("用户 ${msg.userId} : ${msg.msg}")
            }
            .match(PrivateMsg::class.java) { msg ->
                println("---------------私聊消息-${getTime()}---------------")
                println("来自用户 ${msg.fromUser}的私聊消息 : ${msg.msg}")
            }
            .match(DistributedPubSubMediator.SubscribeAck::class.java) {
                _->
                println("收到订阅消息")
            }
            .build()
    }
    companion object {
        fun props(): Props {
            return Props.create(UserActor::class.java)
        }
    }
}

使用AKKA框架的经典集群分片,需要编写一个类来集成ShardRegion类来设定分片规则,一般会把消息实体中封装的用户ID作为分片的实体ID,用户ID进行哈希作为分区ID。(消息实体类需自己定义)

代码语言:javascript
复制
class ShardExtractor:ShardRegion.MessageExtractor {
    override fun entityId(message: Any?): String {
        return when (message) {
            is PreWarmMessage -> message.shardId+""
            is PrivateMsg -> message.toUser!!+""
       //     is PublicMsg -> message.userId.toString() + ""
            is ShardRegion.StartEntity -> message.entityId()
            is DistributedPubSubMediator.SubscribeAck -> "subscribe-ack-entity" // 添加处理 SubscribeAck 的逻辑
            else -> throw RuntimeException("无法识别消息类型 $message")
        }
    }
    override fun shardId(message: Any?): String {
        return when (message) {
            is PreWarmMessage -> (message.shardId.toString().hashCode() % 10).toString() + ""
            is PrivateMsg -> (message.toUser!!.toString().hashCode() % 10).toString() + ""
    //        is PublicMsg -> (message.userId.toString().hashCode() % 10).toString() + ""
            is ShardRegion.StartEntity -> (message.entityId().hashCode() % 10).toString()
            is DistributedPubSubMediator.SubscribeAck -> "0"
            else -> throw RuntimeException("无法识别消息类型 $message")
        }
    }
    override fun entityMessage(message: Any?): Any {
        return message!!
    }
}

2.3节点的配置 这边我图方便就起了两个节点模拟两个用户的登录

代码语言:javascript
复制
akka {
  actor {
    provider = "cluster"
    allow-java-serialization = on
  }
  remote {
    artery {
      enabled = on
      transport = tcp
      canonical.hostname = "127.0.0.1"
      canonical.port = 2551
    }
  }
  cluster {
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:2551"
      "akka://ClusterSystem@127.0.0.1:2552"
    ]
     downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
  cluster.sharding {
    remember-entities = on
    state-store-mode = persistence
  }
    persistence {
      journal.plugin = "akka.persistence.journal.inmem"  # 使用内存中的持久化插件,只适用于测试
      snapshot-store.plugin = "akka.persistence.snapshot-store.local"  # 使用本地文件系统快照存储
      snapshot-store.local.dir = "target/snapshots/node1"  # 快照存储路径
    }
}

3、补充

netty在进行消息传输时,服务端收到消息才会创建useractor节点,所以在两个用户消息发送之前,需要在用户登陆成功之后自动向Server发送一个预热消息进行节点的启动与创建

代码语言:javascript
复制
data class PreWarmMessage(val shardId: String) : Serializable

其余细节性的内容没有过多展示,新手代码编写可能较为冗余,文章只是作为自己学习的记录,可能没有太大的参考意义,所以希望大佬们嘴下留情

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、聊天室整体框架
    • 聊天室demo较为简单,主要作为学习akka框架练手比较合适,可以帮助理清akka框架的逻辑与一些使用规则。本人在实习中主要使用单节点actor与集群actor进行了聊天室demo的编写,单节点较为简单,这里不做展示。同时由于公司主要使用kotlin语言进行开发,所以主要使用kotlin进行编写。
    • 2、主要内容
      • 2.1客户端与服务端模拟
        • 客户端与服务端都是使用netty框架,客户端模拟用户的登录,服务端作为消息的转发,发送到akka集群中的分片区域的节点。 注意:这里netty没有添加心跳机制,同时注意需要考虑TCP粘包问题,进行tcp消息头与消息体的划分,否则在用户输入发送消息之后会产生粘包。同时在不同节点之间传输需要对传输的数据进行序列化,这里直接使用string编解码器,也可以使用protobuf进行自定义编解码器(推荐)。
          • Client
            • Server
              • 2.2 AKKA编写用户节点以及分区规则 首先编写一个UserActor来接收不同类型的消息,每次有新用户注册登录相当于集群会启动创建一个UserActor
                • 使用AKKA框架的经典集群分片,需要编写一个类来集成ShardRegion类来设定分片规则,一般会把消息实体中封装的用户ID作为分片的实体ID,用户ID进行哈希作为分区ID。(消息实体类需自己定义)
                  • 2.3节点的配置 这边我图方便就起了两个节点模拟两个用户的登录
                  • 3、补充
                    • netty在进行消息传输时,服务端收到消息才会创建useractor节点,所以在两个用户消息发送之前,需要在用户登陆成功之后自动向Server发送一个预热消息进行节点的启动与创建
                    • 其余细节性的内容没有过多展示,新手代码编写可能较为冗余,文章只是作为自己学习的记录,可能没有太大的参考意义,所以希望大佬们嘴下留情
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档