当前位置: 首页 > news >正文

关于akka官方quickstart示例程序(scala)的记录

参考资料

  • https://doc.akka.io/libraries/akka-core/current/typed/actors.html#first-example

关于scala语法的注意事项

  • extends App是个语法糖,等同于直接在伴生对象中编写main 方法
  • 对象是通过apply方法创建的,也可以通过对象的名称单独创建(此时实际上会调用apply方法)
  • case class 样例类用于定义不可变类,可以用于模式匹配
  • trait类似接口但是可以包括抽象方法,具体方法,子类。带有sealed表示只能在定义它的同一个文件中被继承,常用于更加安全的模式匹配,例如如果消息类型为sealed trait,则actor可以安全接受多种消息

helloworld示例

代码的整体示意图如下

hello-world2.png

  1. HelloWorldMain创建ActorSystem,作为一个actorref指向HelloWorldMain actor。使用此引用向HelloWorldMain actor发送SayHello消息
  2. HelloWorldMain actor初始化Helloworld actor和HelloWorldBot,以及在收到SayHello消息后向HelloWorld actor发送Greet消息(其中带有HelloWorldBot的actorref)
  3. HelloWorld actor收到消息后向HelloWorldBot发送Greeted消息
  4. HelloWorldBot actor收到消息后greetingCounter计数增加,并向HelloWorld actor返回Greet消息。当greetingCounter超过max时暂停行为。

代码示例

//#imports
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors//#hello-world-actor
object HelloWorld {// 使用样例类定义消息类型final case class Greet(whom: String, replyTo: ActorRef[Greeted])final case class Greeted(whom: String, from: ActorRef[Greet])// Behaviors.receive函数接收一个函数作为参数,{}是为了容纳多行lambda表达式def apply(): Behavior[Greet] = Behaviors.receive { (context, message) =>context.log.info("Hello {}!", message.whom)message.replyTo ! Greeted(message.whom, context.self) // 向message.replyTo发送消息Greeted,其中context.self是自身的actorrefBehaviors.same // 设置后续的消息处理逻辑不变}
}//#hello-world-bot
object HelloWorldBot {def apply(max: Int): Behavior[HelloWorld.Greeted] = {bot(0, max)}private def bot(greetingCounter: Int, max: Int): Behavior[HelloWorld.Greeted] =Behaviors.receive { (context, message) =>val n = greetingCounter + 1context.log.info("Greeting {} for {}", n, message.whom)if (n == max) {Behaviors.stopped // 到达max次数后停止行为,避免无限循环} else {message.from ! HelloWorld.Greet(message.whom, context.self)bot(n, max)}}
}//#hello-world-main
object HelloWorldMain {final case class SayHello(name: String)def apply(): Behavior[SayHello] =Behaviors.setup { context =>val greeter = context.spawn(HelloWorld(), "greeter") // 初始化HelloWorldBehaviors.receiveMessage { message => // 收到消息后创建HelloWorldBotval replyTo = context.spawn(HelloWorldBot(max = 3), message.name)greeter ! HelloWorld.Greet(message.name, replyTo)Behaviors.same}}def main(args: Array[String]): Unit = {val system: ActorSystem[HelloWorldMain.SayHello] =ActorSystem(HelloWorldMain(), "hello")system ! HelloWorldMain.SayHello("World")Thread.sleep(3000)system.terminate()}
}

chatroom示例

代码的整体示意图如下

chat-room.png

  1. Main启动后,初始化chatRoom和Gabbler客户端。向ChatRoom发送GetSession消息(带有client actorref)
  2. chatRoom创建session actor,用来隔离会话
  3. chatRoom向client发送SessionGranted消息(带有session actorref)
  4. client(Gabbler)收到SessionGranted后向session actor发送PostMessage消息
  5. session 收到SessionGranted后向room发送PublishSessionMessage
  6. room返回NotifyClient给session
  7. 然后按照NotifyClient(带有MessagePosted)中的client actorref将MessagePosted转发给特定的client
  8. client收到MessagePosted之后完成并推出

整体的思路

  1. ChatRoom Actor:作为中央枢纽,负责管理所有的会话(Sessions)。每个连接到聊天室的客户端都会通过 GetSession 消息与 ChatRoom 交互,并获得一个专属的会话 Actor。
  2. Session Actor:每个客户端都有一个对应的 Session Actor,用于处理该客户端的消息收发、保持客户端状态等。
  3. client(Gabbler)Actor:模拟客户端行为,可以发送消息给 ChatRoom 或者其他客户端。
  4. 客户端间通信:通过 ChatRoom 转发消息来实现客户端间的通信。当一个客户端发送消息时,它实际上是将消息发送给了 ChatRoom,然后由 ChatRoom 将消息广播给所有其他在线的客户端。

代码示例

定义消息,这里实际上等同于定义actor之间的通信协议

  • RoomCommand,用来获取session
  • SessionEvent,用来管理session和发送message
  • SessionCommand,用来发送message和通知client
object ChatRoom {sealed trait RoomCommandfinal case class GetSession(screenName: String, replyTo: ActorRef[SessionEvent]) extends RoomCommandsealed trait SessionEventfinal case class SessionGranted(handle: ActorRef[PostMessage]) extends SessionEventfinal case class SessionDenied(reason: String) extends SessionEventfinal case class MessagePosted(screenName: String, message: String) extends SessionEventsealed trait SessionCommandfinal case class PostMessage(message: String) extends SessionCommandprivate final case class NotifyClient(message: MessagePosted) extends SessionCommand
}

ChatRoom actor部分

object ChatRoom {// PublishSessionMessage消息将包含的ChatRoom消息传播到所有连接的客户端private final case class PublishSessionMessage(screenName: String, message: String) extends RoomCommanddef apply(): Behavior[RoomCommand] =chatRoom(List.empty)private def chatRoom(sessions: List[ActorRef[SessionCommand]]): Behavior[RoomCommand] =Behaviors.receive { (context, message) =>message match {// 如果收到GetSession,create a child actor for further interaction with the clientcase GetSession(screenName, client) =>val ses = context.spawn(session(context.self, screenName, client),name = URLEncoder.encode(screenName, StandardCharsets.UTF_8.name))client ! SessionGranted(ses)chatRoom(ses :: sessions) // ::用于将ses添加到sessions头。由于Akka 的行为是不可变的(每次更改状态都必须返回一个新的 behavior),所以通常通过 递归函数 + 参数携带状态 的方式来模拟“状态变化”// 如果接收到 PublishSessionMessage 就向所有的session发送notification,每个session都带有client内容。等于是申请chatroom允许发送case PublishSessionMessage(screenName, message) =>val notification = NotifyClient(MessagePosted(screenName, message))sessions.foreach(_ ! notification) // 将消息转发给session中的所有clientBehaviors.same}}// 用于创建session actor,接受SessionCommand消息private def session(room: ActorRef[PublishSessionMessage],screenName: String,client: ActorRef[SessionEvent]): Behavior[SessionCommand] =Behaviors.receiveMessage {// 向room中的所有其他用户发送消息case PostMessage(message) =>room ! PublishSessionMessage(screenName, message)Behaviors.same// room发布消息通知clientcase NotifyClient(message) =>client ! messageBehaviors.same}
}

客户端部分

object Gabbler {import ChatRoom._def apply(): Behavior[SessionEvent] =Behaviors.setup { context =>Behaviors.receiveMessage {case SessionDenied(reason) =>context.log.info("cannot start chat room session: {}", reason)Behaviors.stoppedcase SessionGranted(handle) =>handle ! PostMessage("Hello World!")Behaviors.samecase MessagePosted(screenName, message) =>context.log.info("message has been posted by '{}': {}", screenName, message)Behaviors.stopped}}

actorsystem入口

  • 这里使用了Behaviors.setup。Behaviors.setupBehaviors.receiveMessage 都是用于定义 Actor 行为的工厂方法,区别是Behaviors.setup 允许你在初始化阶段访问 ActorContext,而 Behaviors.receiveMessage 不直接提供对上下文的访问,专注于消息处理逻辑
  • Main Actor,对应于传统 Java 应用程序中的 main 方法
object Main {def apply(): Behavior[NotUsed] =Behaviors.setup { context =>val chatRoom = context.spawn(ChatRoom(), "chatroom")val gabblerRef = context.spawn(Gabbler(), "gabbler")context.watch(gabblerRef) //监控gabbler actor,如果gabbler 终止了,当前 Actor 将收到一个 Terminated(gabblerRef) 信号chatRoom ! ChatRoom.GetSession("ol’ Gabbler", gabblerRef)// 处理 Terminated 信号Behaviors.receiveSignal {case (_, Terminated(_)) =>Behaviors.stopped}}def main(args: Array[String]): Unit = {ActorSystem(Main(), "ChatRoomDemo")}
}

运行结果如下,按照预期逻辑,目前只有一个client,并且发送消息收到响应后推出

在这里插入图片描述

http://www.xdnf.cn/news/882325.html

相关文章:

  • 【C++项目】负载均衡在线OJ系统-2
  • 解构与重构:PLM 系统如何从管理工具进化为创新操作系统?
  • 通过Chain Prompts方式将LLM的能力引入测试平台:正交实验测试用例生成
  • 多模态大语言模型arxiv论文略读(109)
  • 计算机基础知识(第四篇)
  • Apache Doris + MCP:Agent 时代的实时数据分析底座
  • Ntfs!ReadIndexBuffer函数分析之nt!CcGetVirtualAddress函数之nt!CcGetVacbMiss
  • 如何在电脑上轻松访问 iPhone 文件
  • 斐波那契数列------矩阵幂法
  • 【Python3教程】Python3基础篇之错误和异常
  • Python语法进阶篇 --- 封装、继承、多态、静态方法、类方法
  • 嵌入式学习Day33
  • 如何更快的提升项目的开发进度
  • 从 ClickHouse、Druid、Kylin 到 Doris:网易云音乐 PB 级实时分析平台降本增效
  • 【SSM】SpringBoot笔记2:整合Junit、MyBatis
  • XHR / Fetch / Axios 请求的取消请求与请求重试
  • JVM——如何打造一个类加载器?
  • NLP驱动网页数据分类与抽取实战
  • 「深度拆解」Spring Boot如何用DeepSeek重构MCP通信层?从线程模型到分布式推理的架构进化
  • 自动驾驶+人形机器人?亚马逊即将测试人形机器人送货
  • 元素 “cas:serviceResponse“ 的前缀 “cas“ 未绑定
  • 使用ReactNative加载Svga动画支持三端【Android/IOS/Harmony】
  • StarRocks
  • Spring Boot + OpenAI 构建基于RAG的智能问答系统
  • Java 抗量子算法:构建后量子时代的安全基石
  • 系统掌握PyTorch:图解张量、Autograd、DataLoader、nn.Module与实战模型
  • 接IT方案编写(PPT/WORD)、业务架构设计、投标任务
  • 为什么要选择VR看房?VR看房有什么优点?
  • 大陆4D毫米波雷达ARS548调试
  • [蓝桥杯]后缀表达式