官方网站: Pekko
在网络开发中, Actor(参与者模型) 是一种并发编程模型, 核心思想是将程序拆分为一个个独立和自治的 Actor 实体,
每个 Actor 拥有自己的状态和行为, 仅通过异步消息传递进行通信, 互不直接共享内存, 带有以下特性:
独立性: 每个 Actor 都是独立的执行单元且有自己的私有状态, 外部无法直接修改其状态, 只能通过发送消息触发 Actor
内部逻辑来改变状态
异步消息传递: Actor 之间的通信是异步和非阻塞的(
发送方把消息丢给接收方的消息队列后就继续执行,接收方按照消息到达顺序依次处理)
单线程处理: 一个 Actor 在同一时间只会处理一条消息, 天然避免了多线程共享状态的竞态问题, 不需要加锁(如synchronized)
地址与标识: 每个 Actor 有唯一的地址, 发送消息时只需指定地址, 无需关心 Actor 的物理位置(本地或远程),
这为分布式系统提供了天然支持
传统多线程模式需要手动处理锁、线程池、资源竞争, 否则容易出现死锁、数据不一致等问题
Actor 主要解决的问题:
共享状态的并发冲突问题
传统多线程并发的最大痛点是共享内存竞争 , 必须依赖锁机制, 而锁会导致性能损耗、死锁风险.
Actor的私有状态+单线程处理模式, 从根本上消除了共享状态的竞争, 无需加锁.
高并发下的复杂性问题
高并发网络系统需要处理大量的连接、请求和响应, 手动管理线程池、任务队列会让代码变得复杂且难以维护.
Actor将并发逻辑封装到独立的Actor实例中, 代码逻辑更清晰, 易于调试和扩展.
分布式系统的通信与容错问题
分布式网络系统中, 节点间的通信、故障处理是难点.
Actor的地址透明性简化了跨节点通信, 监督机制则提供了优雅的故障恢复方案, 降低了分布式系统的开发门槛.
线程资源受限的问题
线程的创建和切换成本较高, 系统能承载的线程数有限.
轻量级的Actor可以用极少的资源支持海量并发实体, 更适合高吞吐的网络场景.
之前最开始工作接触到的 skynet 网络库其实也是源于 actor, 可以让多线程网络程序和单线程逻辑一样.
而 Java 之中有两个通用的 Actor 方案: Akka(商业付费) / Pekko(开源使用)
很多网络库都会采用 Pekko 作为地层的通用 RPC(remote procedure call, 远程过程调用) 替代.
推荐学习下 RPC 通信协议, 对于大部分高性能网络数据交换本质上都是实现这种协议
这里推荐采用 maven + java17 来应用处理, 构建出最基础的网络层服务.
基础引入
这里的 maven 采用单项目做维护, 只需要单独修改 所属组织 和 包名 即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > io.meteorcat.game</groupId > <artifactId > pico</artifactId > <version > 1.0-SNAPSHOT</version > <properties > <maven.compiler.source > 17</maven.compiler.source > <maven.compiler.target > 17</maven.compiler.target > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > <pekko.platform.artifact-id > pekko-bom</pekko.platform.artifact-id > <pekko.platform.group-id > org.apache.pekko</pekko.platform.group-id > <pekko.platform.version > 1.3.0</pekko.platform.version > <pekko.platform.scala-version > 2.13</pekko.platform.scala-version > </properties > <dependencyManagement > <dependencies > <dependency > <groupId > org.apache.pekko</groupId > <artifactId > pekko-bom_${pekko.platform.scala-version}</artifactId > <version > ${pekko.platform.version}</version > <type > pom</type > <scope > import</scope > </dependency > </dependencies > </dependencyManagement > <dependencies > <dependency > <groupId > org.apache.pekko</groupId > <artifactId > pekko-actor_${pekko.platform.scala-version}</artifactId > </dependency > </dependencies > </project >
对于 Pekko 强弱类型的对比:
对比维度
弱类型版本(经典 Actor)
强类型版本(Typed Actor)
类型校验时机
运行时校验,编译期不检查消息类型
编译期校验,非法消息直接编译失败
消息类型范围
可接收任意 Any/Object 类型的消息
仅接收泛型参数指定的消息类型
API 风格
基于继承 Actor 特质,状态与行为耦合
基于函数式 Behavior,状态与行为解耦
错误排查难度
类型错误在运行时暴露,需手动测试覆盖
类型错误在编译期暴露,提前规避运行时异常
扩展性与维护性
大型项目易出现"消息满天飞",难以追溯
消息类型封闭可控,适合团队协作和大型项目
监督机制
监督逻辑与 Actor 耦合,写法较繁琐
内置更简洁的监督 API,与 Behavior 集成
教程与资料丰富度
资料多,社区案例成熟(继承 Akka 遗产)
资料较少,但官方持续更新,是未来趋势
所属包
org.apache.pekko.actor
org.apache.pekko.actor.typed
这里按照官方文档构建个初级功能类如下, 注意官方是采用强类型来编写处理(如果要实际使用最好也是采用强类型):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 package io.meteorcat.game;import org.apache.pekko.actor.typed.ActorRef;import org.apache.pekko.actor.typed.ActorSystem;import org.apache.pekko.actor.typed.Behavior;import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;import org.apache.pekko.actor.typed.javadsl.ActorContext;import org.apache.pekko.actor.typed.javadsl.Behaviors;import org.apache.pekko.actor.typed.javadsl.Receive;class PrintMyActorRefActor extends AbstractBehavior <String> { static Behavior<String> create () { return Behaviors.setup(PrintMyActorRefActor::new ); } private PrintMyActorRefActor (ActorContext<String> context) { super (context); } @Override public Receive<String> createReceive () { return newReceiveBuilder().onMessageEquals("printit" , this ::printIt).build(); } private Behavior<String> printIt () { ActorRef<String> secondRef = getContext().spawn(Behaviors.empty(), "second-actor" ); System.out.println("Second: " + secondRef); return this ; } } class Main extends AbstractBehavior <String> { static Behavior<String> create () { return Behaviors.setup(Main::new ); } private Main (ActorContext<String> context) { super (context); } @Override public Receive<String> createReceive () { return newReceiveBuilder().onMessageEquals("start" , this ::start).build(); } private Behavior<String> start () { ActorRef<String> firstRef = getContext().spawn(PrintMyActorRefActor.create(), "first-actor" ); System.out.println("First: " + firstRef); firstRef.tell("printit" ); return Behaviors.same(); } } public class ActorHierarchyExperiments { public static void main (String[] args) { ActorRef<String> testSystem = ActorSystem.create(Main.create(), "testSystem" ); testSystem.tell("start" ); } }
需要注意消息请求第一个 Actor 执行运行的方式, 通过使用父级的引用来发送消息:
firstRef.tell("printit", ActorRef.noSender());
当代码执行时, 输出将包含第一个 Actor 及其在 printit 情况中创建的子级的引用, 输出应类似于以下内容:
1 2 First: Actor[pekko://testSystem/user/first-actor#1053618476] Second: Actor[pekko://testSystem/user/first-actor/second-actor#-1544706041]
首先当启动 ActorSystem 的时候其实已经构建 pekko://testSystem 的虚拟网络路径( testSystem 是创建时自定义名称),
而因为传递 Main.create() 导致将 Main 对象挂载于 / 路径, 所以 pekko://testSystem/ -> Main.class(/ 为根路径).
之后就是 Main 内部的 ActorRef<String> firstRef = getContext().spawn(PrintMyActorRefActor.create(), "first-actor"),
getContext().spawn() 创建 Actor 并挂载于当前节点运行, 而内部动态创建节点默认挂载于 pekko://{父节点地址}/user 内部,
所以最终又构建出了 pekko://testSystem/user/first-actor 节点, second-actor 节点也是以此类推:
如果要实现动态挂载 Actor 就必须要自定义去生成根节点的 Actor 处理,
另外如果想实例化处理的时候传递参数可以利用以下方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class PrintParamsActor extends AbstractBehavior <String> { final String message; private PrintParamsActor (ActorContext<String> context, String message) { super (context); this .message = message; } public static Behavior<String> create (String message) { return Behaviors.setup((ctx) -> new PrintParamsActor (ctx, message)); } }
处理动态构建(spawn)之外, 其实还有其他涉及 Actor 声明周期的处理, 比如 Actor 的关闭退出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 class StartStopActor1 extends AbstractBehavior <String> { static Behavior<String> create () { return Behaviors.setup(StartStopActor1::new ); } private StartStopActor1 (ActorContext<String> context) { super (context); System.out.println("first started" ); context.spawn(StartStopActor2.create(), "second" ); } @Override public Receive<String> createReceive () { return newReceiveBuilder() .onMessageEquals("stop" , Behaviors::stopped) .onSignal(PostStop.class, signal -> onPostStop()) .build(); } private Behavior<String> onPostStop () { System.out.println("first stopped" ); return this ; } } class StartStopActor2 extends AbstractBehavior <String> { static Behavior<String> create () { return Behaviors.setup(StartStopActor2::new ); } private StartStopActor2 (ActorContext<String> context) { super (context); System.out.println("second started" ); } @Override public Receive<String> createReceive () { return newReceiveBuilder().onSignal(PostStop.class, signal -> onPostStop()).build(); } private Behavior<String> onPostStop () { System.out.println("second stopped" ); return this ; } }
假如外部初始化 Actor 如下被动态构建:
1 2 3 // 构建 StartStopActor1 创建并附加在 Actor 节点树 ActorRef<String> first = context.spawn(StartStopActor1.create(), "first"); first.tell("stop"); // 发送关闭消息
这里最后调用打印输出的内容如下:
1 2 3 4 first started second started second stopped first stopped
除了 Actor 的生命周期关闭信号之外, 还有涉及 异常重启(Failure Restart) 的功能,
这里主要是让父子级的 Actor 能够相互监听异常抛出从而方便直接重启 Actor 进程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 class SupervisingActor extends AbstractBehavior <String> { static Behavior<String> create () { return Behaviors.setup(SupervisingActor::new ); } private final ActorRef<String> child; private SupervisingActor (ActorContext<String> context) { super (context); child = context.spawn( Behaviors.supervise(SupervisedActor.create()) .onFailure(SupervisorStrategy.restart()) , "supervised-actor" ); } @Override public Receive<String> createReceive () { return newReceiveBuilder().onMessageEquals("failChild" , this ::onFailChild).build(); } private Behavior<String> onFailChild () { child.tell("fail" ); return this ; } } class SupervisedActor extends AbstractBehavior <String> { static Behavior<String> create () { return Behaviors.setup(SupervisedActor::new ); } private SupervisedActor (ActorContext<String> context) { super (context); System.out.println("supervised actor started" ); } @Override public Receive<String> createReceive () { return newReceiveBuilder() .onMessageEquals("fail" , this ::fail) .onSignal(PreRestart.class, signal -> preRestart()) .onSignal(PostStop.class, signal -> postStop()) .build(); } private Behavior<String> fail () { System.out.println("supervised actor fails now" ); throw new RuntimeException ("I failed!" ); } private Behavior<String> preRestart () { System.out.println("supervised will be restarted" ); return this ; } private Behavior<String> postStop () { System.out.println("supervised stopped" ); return this ; } }
如果执行重启的 Actor 的构建方法:
1 2 ActorRef<String> supervisingActor = context.spawn(SupervisingActor.create(), "supervising-actor"); supervisingActor.tell("failChild");
这里最后的输出内容如下, 这就是作为 supervised 的执行生命周期, 可以看到以此被调用的启动路径:
1 2 3 4 5 6 7 8 9 supervised actor started supervised actor fails now supervised actor will be restarted supervised actor started [ERROR] [11/12/2018 12:03:27.171] [ActorHierarchyExperiments-pekko.actor.default-dispatcher-2] [pekko://ActorHierarchyExperiments/user/supervising-actor/supervised-actor] Supervisor pekko.actor.typed.internal.RestartSupervisor@1c452254 saw failure: I failed! java.lang.Exception: I failed! at typed.tutorial_1.SupervisedActor.onMessage(ActorHierarchyExperiments.scala:113) at typed.tutorial_1.SupervisedActor.onMessage(ActorHierarchyExperiments.scala:106) at org.apache.pekko.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:59)
具体例子请参照: tutoria
扩展模块
我们之前展示的是基础的 pekko-actor{-typed} 模块, 实际上还有以下相关扩展模块
模块分类
模块名称
模块包名(有的包是共用不区分强弱类型)
弱类型依赖体系(pekko-actor)
强类型依赖体系(pekko-actor-typed)
模块核心定位
补充说明
核心 Actor
基础 Actor 核心
弱类型:pekko-actor_{scala版本号} 强类型:pekko-actor-typed_{scala版本号}
pekko-actor(核心依赖)
pekko-actor-typed(核心依赖)
提供 Actor 模型基础能力,弱类型基于继承,强类型基于 Behavior
专属模块,互不依赖
分布式通信
Pekko Remote
弱类型:pekko-remote_{scala版本号} 强类型:pekko-remote-typed_{scala版本号}
依赖 pekko-actor
依赖 pekko-actor-typed
实现跨 JVM 远程 Actor 通信,地址透明化
专属模块,强类型是独立封装
Pekko Cluster
弱类型:pekko-cluster_{scala版本号} 强类型:pekko-cluster-typed_{scala版本号}
依赖 pekko-remote + pekko-actor
依赖 pekko-remote-typed + pekko-actor-typed
分布式集群管理,节点发现/故障检测
专属模块,强类型重构了集群 API
Pekko Cluster Sharding
弱类型:pekko-cluster-sharding_{scala版本号} 强类型:pekko-cluster-sharding-typed_{scala版本号}
依赖 pekko-cluster + pekko-actor
依赖 pekko-cluster-typed + pekko-actor-typed
海量 Actor 分片部署与负载均衡
专属模块,强类型优化了分片路由
状态持久化
Pekko Persistence
弱类型:pekko-persistence_{scala版本号} 强类型:pekko-persistence-typed_{scala版本号}
依赖 pekko-actor
依赖 pekko-actor-typed
基于事件溯源的 Actor 状态持久化
专属模块,强类型封装了持久化 Behavior
Pekko Persistence Query
共用:pekko-persistence-query_{scala版本号}
依赖 pekko-persistence + pekko-actor
依赖 pekko-persistence-typed + pekko-actor-typed
持久化事件日志的查询能力
共用模块 ,官方无 -typed 版本,通过类型适配支持两套体系
流式处理
Pekko Stream
弱类型:pekko-stream_{scala版本号} 强类型:pekko-stream-typed_{scala版本号}
依赖 pekko-actor
依赖 pekko-actor-typed
反应式流处理,背压控制,高吞吐数据处理
专属模块,强类型提供类型安全的流 API
网络服务
Pekko HTTP
共用:pekko-http-core_{scala版本号} 共用:pekko-http_{scala版本号}
依赖 pekko-stream + pekko-actor
依赖 pekko-stream-typed + pekko-actor-typed
异步非阻塞 HTTP 服务器/客户端、WebSocket
共用模块 ,无 -typed 版本,与 Stream 强/弱版本搭配使用
集成连接器
Pekko Connectors Kafka
共用:pekko-connectors-kafka_{scala版本号}
依赖 pekko-stream + pekko-actor
依赖 pekko-stream-typed + pekko-actor-typed
与 Kafka 集成的流处理连接器
共用模块 ,基于 Stream 构建,无需区分强弱类型
Pekko Connectors AWS
共用:pekko-connectors-aws-s3_{scala版本号} 共用:pekko-connectors-aws-sqs_{scala版本号}
依赖 pekko-stream + pekko-actor
依赖 pekko-stream-typed + pekko-actor-typed
与 AWS 服务(S3/SQS 等)集成
共用模块 ,按需引入子模块
运维监控
Pekko Management
共用:pekko-management_{scala版本号} 共用:pekko-management-cluster-http_{scala版本号}
依赖 pekko-cluster + pekko-actor
依赖 pekko-cluster-typed + pekko-actor-typed
集群监控 HTTP 接口、健康检查
共用模块 ,监控接口与 Actor 类型无关
测试工具
Pekko TestKit(弱类型)
专属:pekko-testkit_{scala版本号}
依赖 pekko-actor
-
弱类型 Actor 的单元测试工具
专属模块,仅支持弱类型
Pekko TestKit Typed(强类型)
专属:pekko-actor-testkit-typed_{scala版本号}
-
依赖 pekko-actor-typed
强类型 Actor 的单元测试工具
专属模块,仅支持强类型
这些模块按照需要来引入项目当中使用, 比较常用的就是设计网关的时候可能需要用到 pekko-http 和 pekko-stream 等.