Pekko 初始化

官方网站: Pekko

在网络开发中, Actor(参与者模型) 是一种并发编程模型, 核心思想是将程序拆分为一个个独立和自治的 Actor 实体,
每个 Actor 拥有自己的状态和行为, 仅通过异步消息传递进行通信, 互不直接共享内存, 带有以下特性:

  • 独立性: 每个 Actor 都是独立的执行单元且有自己的私有状态, 外部无法直接修改其状态, 只能通过发送消息触发 Actor
    内部逻辑来改变状态

  • 异步消息传递: Actor 之间的通信是异步和非阻塞的(
    发送方把消息丢给接收方的消息队列后就继续执行,接收方按照消息到达顺序依次处理)

  • 单线程处理: 一个 Actor 在同一时间只会处理一条消息, 天然避免了多线程共享状态的竞态问题, 不需要加锁(如synchronized)

  • 地址与标识: 每个 Actor 有唯一的地址, 发送消息时只需指定地址, 无需关心 Actor 的物理位置(本地或远程),
    这为分布式系统提供了天然支持

传统多线程模式需要手动处理锁、线程池、资源竞争, 否则容易出现死锁、数据不一致等问题

Actor 主要解决的问题:

  1. 共享状态的并发冲突问题
    传统多线程并发的最大痛点是共享内存竞争, 必须依赖锁机制, 而锁会导致性能损耗、死锁风险.
    Actor的私有状态+单线程处理模式, 从根本上消除了共享状态的竞争, 无需加锁.

  2. 高并发下的复杂性问题
    高并发网络系统需要处理大量的连接、请求和响应, 手动管理线程池、任务队列会让代码变得复杂且难以维护.
    Actor将并发逻辑封装到独立的Actor实例中, 代码逻辑更清晰, 易于调试和扩展.

  3. 分布式系统的通信与容错问题
    分布式网络系统中, 节点间的通信、故障处理是难点.
    Actor的地址透明性简化了跨节点通信, 监督机制则提供了优雅的故障恢复方案, 降低了分布式系统的开发门槛.

  4. 线程资源受限的问题
    线程的创建和切换成本较高, 系统能承载的线程数有限.
    轻量级的Actor可以用极少的资源支持海量并发实体, 更适合高吞吐的网络场景.

之前最开始工作接触到的 skynet 网络库其实也是源于 actor, 可以让多线程网络程序和单线程逻辑一样.

Java 之中有两个通用的 Actor 方案: Akka(商业付费) / Pekko(开源使用)

很多网络库都会采用 Pekko 作为地层的通用 RPC(remote procedure call, 远程过程调用) 替代.

推荐学习下 RPC 通信协议, 对于大部分高性能网络数据交换本质上都是实现这种协议

这里推荐采用 maven + java17 来应用处理, 构建出最基础的网络层服务.

基础引入

这里的 maven 采用单项目做维护, 只需要单独修改 所属组织包名 即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- 基础信息, 组织和包名这些可以放置在此 -->
<groupId>io.meteorcat.game</groupId>
<artifactId>pico</artifactId>
<version>1.0-SNAPSHOT</version>

<!-- 全局属性 -->
<properties>
<!-- 基础属性 -->
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<!-- pekko 依赖 -->
<!-- 查看官方最新版本: https://pekko.apache.org/docs/pekko/current/typed/guide/tutorial_1.html -->
<pekko.platform.artifact-id>pekko-bom</pekko.platform.artifact-id>
<pekko.platform.group-id>org.apache.pekko</pekko.platform.group-id>
<pekko.platform.version>1.3.0</pekko.platform.version>
<pekko.platform.scala-version>2.13</pekko.platform.scala-version>


</properties>


<!-- 全局版本管理 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-bom_${pekko.platform.scala-version}</artifactId>
<version>${pekko.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<!-- 第三方包管理 -->
<dependencies>
<!-- 因为有全局版本管理, 所以涉及 pekko 不需要编写版本信息 -->

<!-- 这里引入最基础的 actor -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor_${pekko.platform.scala-version}</artifactId>
</dependency>

<!-- 但是后续 pekko 有区分弱类型和强类型版本, 官方更加推荐采用强类型版本, 但是弱类型版本教程更多 -->
<!-- 默认弱类型版本采用底层 Scala 的 Any( 相当于 Java 的 Object) 包装 -->
<!-- 对于弱类型来说, 消息类型的校验是运行时的, 编译期不会检查发送的消息是否被 Actor 支持 -->
<!-- 所以编译的时候根本无法检查出对应传递的消息是否被支持, 只能运行测试之后才能确定 -->
<!-- 弱类型包放置于 org.apache.pekko.actor 内部-->

<!-- 而强类型则是要求在定义 Actor 时就明确能接收的消息类型, 编译器会强制校验发送的消息是否符合类型要求, 不允许发送非声明的消息 -->
<!-- 强类型包放置于 org.apache.pekko.actor.typed 内部 -->


<!-- 以下就是强类型版本引入 -->
<!-- <dependency> -->
<!-- <groupId>org.apache.pekko</groupId> -->
<!-- <artifactId>pekko-actor-typed_${pekko.platform.scala-version}</artifactId> -->
<!-- </dependency> -->

</dependencies>
</project>

对于 Pekko 强弱类型的对比:

对比维度 弱类型版本(经典 Actor) 强类型版本(Typed Actor)
类型校验时机 运行时校验,编译期不检查消息类型 编译期校验,非法消息直接编译失败
消息类型范围 可接收任意 Any/Object 类型的消息 仅接收泛型参数指定的消息类型
API 风格 基于继承 Actor 特质,状态与行为耦合 基于函数式 Behavior,状态与行为解耦
错误排查难度 类型错误在运行时暴露,需手动测试覆盖 类型错误在编译期暴露,提前规避运行时异常
扩展性与维护性 大型项目易出现"消息满天飞",难以追溯 消息类型封闭可控,适合团队协作和大型项目
监督机制 监督逻辑与 Actor 耦合,写法较繁琐 内置更简洁的监督 API,与 Behavior 集成
教程与资料丰富度 资料多,社区案例成熟(继承 Akka 遗产) 资料较少,但官方持续更新,是未来趋势
所属包 org.apache.pekko.actor org.apache.pekko.actor.typed

这里按照官方文档构建个初级功能类如下, 注意官方是采用强类型来编写处理(如果要实际使用最好也是采用强类型):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package io.meteorcat.game;

import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;

/**
* 实现数据打印功能的 Actor
* AbstractBehavior: 就是声明该类是作为 Actor 继承
* String: 则是代表内部做消息传递是采用 String(后续可以自定义结构体来交换数据)
*/
class PrintMyActorRefActor extends AbstractBehavior<String> {

/**
* 静态创建 Actor 并且利用 Behaviors.setup 初始化在 Actor 系统之中等待挂载
* 注意: 调用该静态生成的实例化对象是 '等待挂载中' 的状态
*/
static Behavior<String> create() {
return Behaviors.setup(PrintMyActorRefActor::new);
}

/**
* 如果是声明为 Actor 类, 最好不允许外部做实例化, 而是要通过 Behaviors.setup 生成实例
*/
private PrintMyActorRefActor(ActorContext<String> context) {
super(context);
}

/**
* Actor 必须实现的消息拦截方法
* 拦截的消息就是 AbstractBehavior<T> 之中的 T 类型, 对于强类型来说
*/
@Override
public Receive<String> createReceive() {
return newReceiveBuilder().onMessageEquals("printit", this::printIt).build();
}

/**
* 自定义内部 Actor 功能
* 注意调用之后都需要返回 Behavior<T> 来让 Actor 保持当前 Actor 状态
*/
private Behavior<String> printIt() {
// getContext() 是 Actor 内置方法, 用于获取 Actor 上下文
// getContext().spawn() 调用 Actor 上下文动态在当前 Actor 节点额外创建 Actor 并且自动挂载
// 其中 Behaviors.empty() 代表生成默认的为空(没有自定义) 的 Actor, 如果想指定自定义则是 Behaviors.setup(自定义类实例)
// "second-actor" 则是指定挂载于当前 Actor 节点下的名称
ActorRef<String> secondRef = getContext().spawn(Behaviors.empty(), "second-actor");
System.out.println("Second: " + secondRef);
return this;
}
}

/**
* 根节点下的第一节点 Actor
*/
class Main extends AbstractBehavior<String> {

/**
* 静态创建 Actor 并且利用 Behaviors.setup 初始化在 Actor 系统之中等待挂载
* 注意: 调用该静态生成的实例化对象是 '等待挂载中' 的状态
*/
static Behavior<String> create() {
return Behaviors.setup(Main::new);
}

private Main(ActorContext<String> context) {
super(context);
}

@Override
public Receive<String> createReceive() {
return newReceiveBuilder().onMessageEquals("start", this::start).build();
}

/**
* 自定义拦截方法
* 这里则是动态创建自定义 Actor并且转发消息
* Behaviors.same() 代表保持消息, 其实就是和返回 this 一致
*/
private Behavior<String> start() {
ActorRef<String> firstRef = getContext().spawn(PrintMyActorRefActor.create(), "first-actor");

System.out.println("First: " + firstRef);
firstRef.tell("printit");
return Behaviors.same();
}
}


/**
* 全局启动入口类
*/
public class ActorHierarchyExperiments {
/**
* 全局启动入口方法
*/
public static void main(String[] args) {
// ActorSystem.create 代表创建 Actor 系统并且声明 Main.create() 构建的实例
// ActorRef 就是 ActorSystem 创建 Actor 之后的引用地址, Actor 的消息都是引用地址做消息传递
// 其中 testSystem 就是 Root Actor 的之下名称
ActorRef<String> testSystem = ActorSystem.create(Main.create(), "testSystem");
testSystem.tell("start");// 推送指定 String 类型消息给 actor 系统
}
}

需要注意消息请求第一个 Actor 执行运行的方式, 通过使用父级的引用来发送消息:
firstRef.tell("printit", ActorRef.noSender());
当代码执行时, 输出将包含第一个 Actor 及其在 printit 情况中创建的子级的引用, 输出应类似于以下内容:

1
2
First: Actor[pekko://testSystem/user/first-actor#1053618476]
Second: Actor[pekko://testSystem/user/first-actor/second-actor#-1544706041]

首先当启动 ActorSystem 的时候其实已经构建 pekko://testSystem 的虚拟网络路径( testSystem 是创建时自定义名称),
而因为传递 Main.create() 导致将 Main 对象挂载于 / 路径, 所以 pekko://testSystem/ -> Main.class(/ 为根路径).

之后就是 Main 内部的 ActorRef<String> firstRef = getContext().spawn(PrintMyActorRefActor.create(), "first-actor"),
getContext().spawn() 创建 Actor 并挂载于当前节点运行, 而内部动态创建节点默认挂载于 pekko://{父节点地址}/user 内部,
所以最终又构建出了 pekko://testSystem/user/first-actor 节点, second-actor 节点也是以此类推:

actor tree

如果要实现动态挂载 Actor 就必须要自定义去生成根节点的 Actor 处理,
另外如果想实例化处理的时候传递参数可以利用以下方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 带参数的实例化构建 Actor
*/
class PrintParamsActor extends AbstractBehavior<String> {

final String message;


private PrintParamsActor(ActorContext<String> context, String message) {
super(context);// 传递给上层的上下文
this.message = message;
}

/**
* 附带参数一并实例化构建成 Actor
*/
public static Behavior<String> create(String message) {
// 内部其实是 (ActorContext)->AbstractBehavior<T> 的回调方法
// 只需要按照回调方式传入即可, 其实就是动态实例化对象返回
return Behaviors.setup((ctx) -> new PrintParamsActor(ctx, message));
}

// 其他略
}

处理动态构建(spawn)之外, 其实还有其他涉及 Actor 声明周期的处理, 比如 Actor 的关闭退出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class StartStopActor1 extends AbstractBehavior<String> {

static Behavior<String> create() {
return Behaviors.setup(StartStopActor1::new);
}

private StartStopActor1(ActorContext<String> context) {
super(context);
System.out.println("first started");

// 初始化构建的同时动态构建其他 Actor 并挂载于 pekko://{父节点}/second
context.spawn(StartStopActor2.create(), "second");
}

@Override
public Receive<String> createReceive() {
return newReceiveBuilder()
// 监听外部的消息是否有传递 'stop' 字符, 有的话触发 Behaviors::stopped 状态变动
// Behaviors::stopped() 是系统定义的停止并退出当前 Actor 特殊状态变更消息
.onMessageEquals("stop", Behaviors::stopped)
// onSignal 是系统特殊信号拦截
// PostStop.class 是系统内置的退出消息
.onSignal(PostStop.class, signal -> onPostStop())
.build();
}

private Behavior<String> onPostStop() {
System.out.println("first stopped");
return this;
}
}

class StartStopActor2 extends AbstractBehavior<String> {

static Behavior<String> create() {
return Behaviors.setup(StartStopActor2::new);
}

private StartStopActor2(ActorContext<String> context) {
super(context);
System.out.println("second started");
}

@Override
public Receive<String> createReceive() {
// 监听推出的系统信号
// 加入父级被关闭的时候, 会优先关闭子节点的 Actor, 最后关闭掉父 Actor 回调方法
return newReceiveBuilder().onSignal(PostStop.class, signal -> onPostStop()).build();
}

private Behavior<String> onPostStop() {
System.out.println("second stopped");
return this;
}
}

假如外部初始化 Actor 如下被动态构建:

1
2
3
// 构建 StartStopActor1 创建并附加在 Actor 节点树
ActorRef<String> first = context.spawn(StartStopActor1.create(), "first");
first.tell("stop"); // 发送关闭消息

这里最后调用打印输出的内容如下:

1
2
3
4
first started
second started
second stopped
first stopped

除了 Actor 的生命周期关闭信号之外, 还有涉及 异常重启(Failure Restart) 的功能,
这里主要是让父子级的 Actor 能够相互监听异常抛出从而方便直接重启 Actor 进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class SupervisingActor extends AbstractBehavior<String> {

static Behavior<String> create() {
return Behaviors.setup(SupervisingActor::new);
}

private final ActorRef<String> child;

private SupervisingActor(ActorContext<String> context) {
super(context);
// 动态初始化子节点
child = context.spawn(
// Behaviors.supervise 设定高级配置的 Actor 实例化
Behaviors.supervise(SupervisedActor.create())
// 注意这里追加 Actor 异常回调处理, 假如内部 Actor 抛出运行时相关异常( RuntimeException)
// 回调内部的 SupervisorStrategy.restart() 就是内置的重启方法, 会对子 Actor 立即执行服务重启
.onFailure(SupervisorStrategy.restart())
, "supervised-actor");
}

@Override
public Receive<String> createReceive() {
return newReceiveBuilder().onMessageEquals("failChild", this::onFailChild).build();
}

private Behavior<String> onFailChild() {
child.tell("fail");
return this;
}
}

/**
* 被父Actor管理的子 Actor
*/
class SupervisedActor extends AbstractBehavior<String> {

static Behavior<String> create() {
return Behaviors.setup(SupervisedActor::new);
}

private SupervisedActor(ActorContext<String> context) {
super(context);
System.out.println("supervised actor started");
}

@Override
public Receive<String> createReceive() {
return newReceiveBuilder()
// 接受到主动抛出异常的消息
.onMessageEquals("fail", this::fail)
// 系统内部预定义的信号:
// - 重启(PreRestart)
// - 关闭(PostStop)
.onSignal(PreRestart.class, signal -> preRestart())
// 注意: 重启的时候会默认执行关闭之后调用 PreRestart 再初始化
.onSignal(PostStop.class, signal -> postStop())
.build();
}

private Behavior<String> fail() {
// !!! 注意: 这里代表子节点内部业务异常而抛出异常信号
System.out.println("supervised actor fails now");
throw new RuntimeException("I failed!");
}

private Behavior<String> preRestart() {
System.out.println("supervised will be restarted");
return this;
}

private Behavior<String> postStop() {
System.out.println("supervised stopped");
return this;
}
}

如果执行重启的 Actor 的构建方法:

1
2
ActorRef<String> supervisingActor = context.spawn(SupervisingActor.create(), "supervising-actor");
supervisingActor.tell("failChild");

这里最后的输出内容如下, 这就是作为 supervised 的执行生命周期, 可以看到以此被调用的启动路径:

1
2
3
4
5
6
7
8
9
supervised actor started
supervised actor fails now
supervised actor will be restarted
supervised actor started
[ERROR] [11/12/2018 12:03:27.171] [ActorHierarchyExperiments-pekko.actor.default-dispatcher-2] [pekko://ActorHierarchyExperiments/user/supervising-actor/supervised-actor] Supervisor pekko.actor.typed.internal.RestartSupervisor@1c452254 saw failure: I failed!
java.lang.Exception: I failed!
at typed.tutorial_1.SupervisedActor.onMessage(ActorHierarchyExperiments.scala:113)
at typed.tutorial_1.SupervisedActor.onMessage(ActorHierarchyExperiments.scala:106)
at org.apache.pekko.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:59)

具体例子请参照: tutoria

扩展模块

我们之前展示的是基础的 pekko-actor{-typed} 模块, 实际上还有以下相关扩展模块

模块分类 模块名称 模块包名(有的包是共用不区分强弱类型) 弱类型依赖体系(pekko-actor) 强类型依赖体系(pekko-actor-typed) 模块核心定位 补充说明
核心 Actor 基础 Actor 核心 弱类型:pekko-actor_{scala版本号}
强类型:pekko-actor-typed_{scala版本号}
pekko-actor(核心依赖) pekko-actor-typed(核心依赖) 提供 Actor 模型基础能力,弱类型基于继承,强类型基于 Behavior 专属模块,互不依赖
分布式通信 Pekko Remote 弱类型:pekko-remote_{scala版本号}
强类型:pekko-remote-typed_{scala版本号}
依赖 pekko-actor 依赖 pekko-actor-typed 实现跨 JVM 远程 Actor 通信,地址透明化 专属模块,强类型是独立封装
Pekko Cluster 弱类型:pekko-cluster_{scala版本号}
强类型:pekko-cluster-typed_{scala版本号}
依赖 pekko-remote + pekko-actor 依赖 pekko-remote-typed + pekko-actor-typed 分布式集群管理,节点发现/故障检测 专属模块,强类型重构了集群 API
Pekko Cluster Sharding 弱类型:pekko-cluster-sharding_{scala版本号}
强类型:pekko-cluster-sharding-typed_{scala版本号}
依赖 pekko-cluster + pekko-actor 依赖 pekko-cluster-typed + pekko-actor-typed 海量 Actor 分片部署与负载均衡 专属模块,强类型优化了分片路由
状态持久化 Pekko Persistence 弱类型:pekko-persistence_{scala版本号}
强类型:pekko-persistence-typed_{scala版本号}
依赖 pekko-actor 依赖 pekko-actor-typed 基于事件溯源的 Actor 状态持久化 专属模块,强类型封装了持久化 Behavior
Pekko Persistence Query 共用:pekko-persistence-query_{scala版本号} 依赖 pekko-persistence + pekko-actor 依赖 pekko-persistence-typed + pekko-actor-typed 持久化事件日志的查询能力 共用模块,官方无 -typed 版本,通过类型适配支持两套体系
流式处理 Pekko Stream 弱类型:pekko-stream_{scala版本号}
强类型:pekko-stream-typed_{scala版本号}
依赖 pekko-actor 依赖 pekko-actor-typed 反应式流处理,背压控制,高吞吐数据处理 专属模块,强类型提供类型安全的流 API
网络服务 Pekko HTTP 共用:pekko-http-core_{scala版本号}
共用:pekko-http_{scala版本号}
依赖 pekko-stream + pekko-actor 依赖 pekko-stream-typed + pekko-actor-typed 异步非阻塞 HTTP 服务器/客户端、WebSocket 共用模块,无 -typed 版本,与 Stream 强/弱版本搭配使用
集成连接器 Pekko Connectors Kafka 共用:pekko-connectors-kafka_{scala版本号} 依赖 pekko-stream + pekko-actor 依赖 pekko-stream-typed + pekko-actor-typed 与 Kafka 集成的流处理连接器 共用模块,基于 Stream 构建,无需区分强弱类型
Pekko Connectors AWS 共用:pekko-connectors-aws-s3_{scala版本号}
共用:pekko-connectors-aws-sqs_{scala版本号}
依赖 pekko-stream + pekko-actor 依赖 pekko-stream-typed + pekko-actor-typed 与 AWS 服务(S3/SQS 等)集成 共用模块,按需引入子模块
运维监控 Pekko Management 共用:pekko-management_{scala版本号}
共用:pekko-management-cluster-http_{scala版本号}
依赖 pekko-cluster + pekko-actor 依赖 pekko-cluster-typed + pekko-actor-typed 集群监控 HTTP 接口、健康检查 共用模块,监控接口与 Actor 类型无关
测试工具 Pekko TestKit(弱类型) 专属:pekko-testkit_{scala版本号} 依赖 pekko-actor - 弱类型 Actor 的单元测试工具 专属模块,仅支持弱类型
Pekko TestKit Typed(强类型) 专属:pekko-actor-testkit-typed_{scala版本号} - 依赖 pekko-actor-typed 强类型 Actor 的单元测试工具 专属模块,仅支持强类型

这些模块按照需要来引入项目当中使用, 比较常用的就是设计网关的时候可能需要用到 pekko-httppekko-stream 等.