Quarkus 集成 Pekko

如果要做比较高并发的游戏服务端, 单独采用以下技术栈会有以下问题:

  • Quarkus: 作为集成框架缺少对应 Actor 底层游戏架构, 所以得自己手动实现
  • Pekko: 作为 Actor 缺少容器托管和快捷数据库 ORM 框架

所以需要将两者结合起来, 就能设计开发出稳定高效的游戏游戏服务端框架, 这里提供简单的 POM 项目依赖文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>io.meteorcat.game</groupId>
<artifactId>pico</artifactId>
<version>1.0-SNAPSHOT</version>

<!-- 全局属性 -->
<properties>
<!-- 编译配置 -->
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<maven.compiler.release>21</maven.compiler.release>
<compiler-plugin.version>3.14.0</compiler-plugin.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<!-- quarkus 依赖版本配置 -->
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.26.1</quarkus.platform.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.5.3</surefire-plugin.version>

<!-- pekko 依赖 -->
<!-- 查看官方最新版本: https://pekko.apache.org/docs/pekko/current/typed/guide/tutorial_1.html -->
<pekko.platform.artifact-id>pekko-bom</pekko.platform.artifact-id>
<pekko.platform.group-id>org.apache.pekko</pekko.platform.group-id>
<pekko.platform.version>1.3.0</pekko.platform.version>
<pekko.platform.scala-version>2.13</pekko.platform.scala-version>

<!-- protobuf 依赖 -->
<protobuf.platform.artifact-id>protobuf-bom</protobuf.platform.artifact-id>
<protobuf.platform.group-id>com.google.protobuf</protobuf.platform.group-id>
<protobuf.platform.version>4.32.0</protobuf.platform.version>

<!-- protobuf 插件 -->
<protobuf.plugin.group-id>org.xolstice.maven.plugins</protobuf.plugin.group-id>
<protobuf.plugin.version>0.6.1</protobuf.plugin.version>

<!-- protoc 编译器 -->
<protobuf.compiler.artifact-id>protoc</protobuf.compiler.artifact-id>
<protobuf.compiler.group-id>com.google.protobuf</protobuf.compiler.group-id>
<protobuf.compiler.version>${protobuf.platform.version}</protobuf.compiler.version>

<!-- protoc 插件判断平台相关插件 -->
<os.plugin.group-id>kr.motd.maven</os.plugin.group-id>
<os.plugin.version>1.6.2</os.plugin.version>

</properties>

<!-- 全局版本管理 -->
<dependencyManagement>
<dependencies>
<!-- pekko 依赖 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-bom_${pekko.platform.scala-version}</artifactId>
<version>${pekko.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- quarkus 依赖 -->
<dependency>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>${quarkus.platform.artifact-id}</artifactId>
<version>${quarkus.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- protobuf 核心依赖 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.platform.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

<!-- 第三方包依赖配置 -->
<dependencies>
<!-- Quarkus 核心依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core</artifactId>
</dependency>

<!-- Quarkus 容器依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>

<!-- Pekko 基础依赖: 推荐采用强类型做消息传递管理 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor-typed_${pekko.platform.scala-version}</artifactId>
</dependency>

<!-- Pekko 切换为 slf4j 日志 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-slf4j_${pekko.platform.scala-version}</artifactId>
</dependency>

<!-- Pekko 集群协议: 这个功能是作为远程数据交换, 备用依赖 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-remote_${pekko.platform.scala-version}</artifactId>
</dependency>


<!-- Protobuf 核心依赖 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>


<!-- Quarkus 测试依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>

<!-- Pekko 测试依赖 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor-testkit-typed_${pekko.platform.scala-version}</artifactId>
<scope>test</scope>
</dependency>

</dependencies>


<!-- 公共编译配置 -->
<build>

<!-- 扩展插件 -->
<extensions>
<!-- 检测提供给 Protobuf 提供系统平台编译插件, 也就是提供 ${os.detected.classifier} 这个平台变量 -->
<extension>
<groupId>${os.plugin.group-id}</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>${os.plugin.version}</version>
</extension>
</extensions>

<plugins>

<!-- Protobuf 打包生成 Java 绑定插件 -->
<!-- https://www.xolstice.org/protobuf-maven-plugin/compile-mojo.html -->
<plugin>
<groupId>${protobuf.plugin.group-id}</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf.plugin.version}</version>
<extensions>true</extensions>
<configuration>
<!-- Protobuf编译器版本 -->
<protocArtifact>
${protobuf.compiler.group-id}:${protobuf.compiler.artifact-id}:${protobuf.compiler.version}:exe:${os.detected.classifier}
</protocArtifact>

<!-- *.proto 源文件路径, 直接默认即可, 系统事件不需要其他共享 -->
<!-- <protoSourceRoot>${project.parent.basedir}/proto</protoSourceRoot>-->

<!-- 生成Java文件, 直接默认就行, IDEA会自动识别到这部分代码 -->
<!--<outputDirectory>${project.basedir}/src/main/java</outputDirectory>-->

<!-- 是否生成之前清空目录, 最好不要随便乱动 -->
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>

<!-- 执行时机 -->
<executions>
<!-- 执行 mvn compile 的时候打包生成 Java 文件 -->
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- quarkus-maven 插件配置 -->
<plugin>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.platform.version}</version>
<extensions>true</extensions>
<executions>
<execution>
<goals>
<goal>build</goal>
<goal>generate-code</goal>
<goal>generate-code-tests</goal>
<goal>native-image-agent</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- maven 编译配置 -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${compiler-plugin.version}</version>
<configuration>
<parameters>true</parameters>
<source>${maven.compiler.release}</source>
<target>${maven.compiler.release}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>

<!-- 默认日志配置 -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<systemPropertyVariables>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</plugin>


<!-- 单元测试设置 -->
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<systemPropertyVariables>
<native.image.path>${project.build.directory}/${project.build.finalName}-runner
</native.image.path>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</plugin>

</plugins>
</build>

<!-- 公共打包参数 -->
<profiles>

<!-- 生成原生二进制应用 -->
<profile>
<id>native</id>
<activation>
<property>
<name>native</name>
</property>
</activation>
<properties>
<skipITs>false</skipITs>
<quarkus.native.enabled>true</quarkus.native.enabled>
</properties>
</profile>
</profiles>
</project>

这就是最基础的 单机 Actor 服务, 后续涉及到集群转发之类的业务可以等以后接触, 否则牵涉的知识点太过庞大了.

主要流程是让 Pekko 负责动态构建 Actor 节点, Quarkus 负责将 ActorRef 放入全局容器管理, 消息采用 Protobuf 传递.

这里先构建成默认启动入口, 作为当前项目启动的唯一入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package io.meteorcat.game;

import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.annotations.QuarkusMain;

/**
* 项目启动入口
* <p>
* 参考文档: <a href="https://cn.quarkus.io/guides/lifecycle">应用程序的初始化和终止</a>
*/
@QuarkusMain
public class PinoApplication {

/**
* 程序启动入口
*
* @param args 参数列表
*/
public static void main(String[] args) {
Quarkus.run(args);
}
}

之后就是准备其他集成措施, 才能将 pekkoquarkus 结合的更加紧密.

配置加载

pekko 的默认加载配置格式是 HOCO(NHuman-Optimized Config Object Notation) 规范, 具体格式如下:

quarkus 则是采用更加通用的 properties/yaml 处理方式, 两者的配置规范没办法统一:

1
2
3
4
5
6
7
8
9
10
11
# Quarkus 配置
quarkus.http.port = 8080
quarkus.log.level = DEBUG

# Pekko 配置
pekko {
loglevel = "DEBUG"
actor {
provider = "local"
}
}

这里就需要需要将 Pekko 配置转化成支持 properties/yaml 加载的方式,
采用 config-reference Quarkus 官方编写处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package io.meteorcat.game.config;

import com.google.protobuf.MessageLite;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
import org.apache.pekko.Done;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import scala.util.Try;

import java.util.Map;
import java.util.function.Function;

/**
* Pekko Actor 系统的配置器接口,用于标准化 Actor 系统的初始化流程(配置加载、日志适配、自定义参数注入)。
* <p>核心作用:
* 1. 封装 Pekko 配置的加载逻辑(支持多类型默认配置切换);
* 2. 自动适配 Quarkus 日志系统(复用 Slf4j 日志配置,避免多日志系统冲突);
* 3. 支持外部自定义配置注入(通过 settings 传递 Pekko 配置项);
* 4. 提供 Actor 系统创建的默认实现和终止回调扩展。
* <p>使用场景:Quarkus 环境下 Pekko Actor 系统的初始化,需配合 SmallRye Config 实现配置注入(如 @WithDefault 注解)。
*/
@SuppressWarnings("unused")
@ConfigMapping(prefix = "actor")
public interface ActorPropsConfigurator {

/**
* Pekko 配置加载类型枚举,对应 {@link ConfigFactory} 提供的四种默认配置加载方式,用于切换不同场景的基础配置。
* <p>枚举值与 ConfigFactory 方法的映射关系:
* - Reference: 加载 Pekko 框架默认参考配置({@link ConfigFactory#defaultReference()});
* - Application: 加载应用自定义配置({@link ConfigFactory#defaultApplication()});
* - Overrides: 加载覆盖配置(优先级最高,用于临时覆盖默认配置,{@link ConfigFactory#defaultOverrides()});
* - ReferenceUnresolved: 加载未解析的默认参考配置(需手动调用 resolve(),{@link ConfigFactory#defaultReferenceUnresolved()})。
*/
enum ActorReferences {
/**
* 对应 {@link ConfigFactory#defaultReference()},加载 Pekko 框架内置的默认参考配置
*/
Reference,
/**
* 对应 {@link ConfigFactory#defaultApplication()},加载应用级自定义配置(如 application.conf)
*/
Application,
/**
* 对应 {@link ConfigFactory#defaultOverrides()},加载覆盖配置(优先级高于 Reference 和 Application)
*/
Overrides,
/**
* 对应 {@link ConfigFactory#defaultReferenceUnresolved()},加载未解析的默认参考配置(需后续手动解析)
*/
ReferenceUnresolved
}

/**
* 获取 Pekko 配置加载类型(通过 SmallRye Config 注入,支持配置文件指定)。
* <p>默认值为 {@link ActorReferences#Reference},即默认加载 Pekko 框架的默认参考配置;
* 可通过配置文件(如 application.properties)指定:{@code io.fortress.actor.configurator.reference=application} 切换为应用配置。
*
* @return 配置加载类型,非 null
*/
@WithDefault("reference")
ActorReferences reference();


/**
* 获取 Actor 系统的名称(用于创建 Actor 系统实例的唯一标识)。
* <p>最终会传递给 {@link ActorSystem<MessageLite>#create(String, Config)} 的第一个参数,作为 Actor 系统的全局名称;
* 建议通过配置文件注入,确保不同环境(开发/测试/生产)可灵活配置。
*
* @return Actor 系统名称,非空字符串
*/
String name();

/**
* 获取 Pekko 自定义配置集合(用于注入额外的 Pekko 配置项,覆盖默认配置)。
* <p>配置项格式要求:
* - Key: 需符合 Pekko 配置路径(如 "pekko.actor.serializers.proto");
* - Value: 配置值(如 "org.apache.pekko.remote.serialization.ProtobufSerializer");
* <p>使用场景:外部通过配置文件或代码注入 Pekko 特定配置(如序列化器、线程池参数、远程通信配置等)。
*
* @return 自定义配置键值对,可为空(无额外配置时返回空 Map)
*/
Map<String, String> settings();


/**
* 默认实现:创建 Pekko Actor 系统实例(核心初始化方法,封装完整配置流程)。
* <p>初始化流程:
* 1. 加载基础配置:通过 {@link #reference()} 指定的类型加载默认配置;
* 2. 适配 Quarkus 日志:自动注入 Slf4j 日志相关配置,复用 Quarkus 日志系统(避免 Pekko 自带日志与 Quarkus 冲突);
* 3. 合并自定义配置:将 {@link #settings()} 中的配置项合并到基础配置,覆盖默认值;
* 4. 创建 Actor 系统:使用合并后的配置和 {@link #name()} 生成的系统名,创建 ActorSystem 实例。
*
* @return 初始化完成的 Pekko Actor 系统实例,非 null
*/
default ActorSystem<MessageLite> createActorSystem() {

// 1. 加载应用默认基础配置(如 application.conf 中的配置)
Config originalConfig = ConfigFactory.load();


// 2. 根据 reference() 选择的类型,合并对应的 Pekko 默认配置(优先级:Overrides > Application > Reference)
originalConfig = switch (reference()) {
case Application -> originalConfig.withFallback(ConfigFactory.defaultApplication());
case Overrides -> originalConfig.withFallback(ConfigFactory.defaultOverrides());
case ReferenceUnresolved -> originalConfig.withFallback(ConfigFactory.defaultReferenceUnresolved());
// 默认使用 Pekko 框架的默认参考配置
default -> originalConfig.withFallback(ConfigFactory.defaultReference());
};

// 3. 获取外部注入的自定义配置(settings()),适配 Quarkus 日志系统
Map<String, String> settings = settings();
// 若未配置 Pekko 日志过滤器,自动注入 Slf4j 日志相关配置(复用 Quarkus 日志,避免多日志实现冲突)
if (!settings.containsKey("pekko.logging-filter")) {
settings.put("pekko.logging-filter", "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"); // 日志过滤器:使用 Slf4j 实现
settings.put("pekko.loggers.0", "org.apache.pekko.event.slf4j.Slf4jLogger"); // 日志实现:Slf4j logger
settings.put("pekko.log-dead-letters", "off"); // 关闭死信日志(避免开发环境日志冗余)
settings.put("pekko.log-dead-letters-during-shutdown", "off"); // 关闭阶段的死信日志
}


// 4. 合并自定义配置到基础配置(自定义配置优先级最高,覆盖默认配置)
Config mergedConfig = ConfigFactory
.parseMap(settings)
.withFallback(originalConfig)
.resolve();

// 5. 检查是否配置正确的 Pekko 系统名称
if (Objects.isNull(name()) || name().isBlank()) {
throw new IllegalArgumentException("Pekko ActorSystem name cannot be empty! Please configure 'pekko.name'");
}


// 6. 注册 JVM 关闭钩子,保证应用退出时 ActorSystem 优雅关闭
ActorSystem<MessageLite> system = ActorSystem.create(Behaviors.empty(), name(), mergedConfig);
Runtime.getRuntime().addShutdownHook(new Thread(system::terminate));
return system;
}


/**
* 扩展实现:创建 Actor 系统并绑定终止回调(支持在 Actor 系统终止时执行自定义逻辑)。
* <p>适用场景:需要监听 Actor 系统生命周期的场景(如资源释放、日志记录、告警通知等);
* 回调函数会接收 Actor 系统终止的结果({@link Try<Done>}),成功终止时 Try 为 Success,异常时为 Failure。
*
* @param closeable Actor 系统终止时的回调函数,入参为终止结果(Try<Done>),返回值无特定含义(可忽略)
* @param <U> 回调函数返回值类型(泛型,无实际约束)
* @return 初始化完成的 Actor 系统实例(已绑定终止回调),非 null
*/
default <U> ActorSystem<MessageLite> createActorSystem(Function<Try<Done>, U> closeable) {
// 1. 复用默认方法创建 Actor 系统
ActorSystem<MessageLite> actorSystem = createActorSystem();
// 2. 绑定终止回调:当 Actor 系统终止时,执行 closeable 函数(使用 Actor 系统的调度器线程池)
actorSystem.whenTerminated().onComplete(closeable::apply, actorSystem.executionContext());
return actorSystem;
}

}

这里就是 properties 的配置, 也就是现在支持以 quarkus 方式来加载配置:

1
2
3
4
5
6
7
8
9
## Quarkus 的系统配置
quarkus.log.level=INFO
## Pekko 的自定义配置
actor.name=PinoActor
actor.reference=application
# Pekko 内部的 settings 会将后面配置传递给 pekko.* 配置
actor.settings.pekko.actor.provider=local
actor.settings.pekko.actor.serializers.proto=org.apache.pekko.remote.serialization.ProtobufSerializer
actor.settings.settings.pekko.actor.serialization-bindings."com.google.protobuf.Message"=proto

目前还没有初始化 ActorSystem, 只是将 Actor 配置类读取加载, 需依赖 Quarkus@ApplicationScoped 创建全局实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package io.meteorcat.game;

import com.google.protobuf.MessageLite;
import io.meteorcat.game.config.ActorPropsConfigurator;
import io.quarkus.arc.DefaultBean;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import org.apache.pekko.Done;
import org.apache.pekko.actor.typed.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 生成全局持久化唯一的 ActorSystem<br>
* '@ApplicationScoped' 会默认将该类实例化并加入容器托管
* 后续通过 '@Inject ActorSystem<MessageLite> system' 就能全局获取到对应的 ActorSystem 句柄
*/
@ApplicationScoped
public class ActorSystemApplication {

/**
* 日志对象
*/
private final Logger logger = LoggerFactory.getLogger(this.getClass());


/**
* 加载 ActorSystem 配置
*/
@Inject
ActorPropsConfigurator configurator;


/**
* 生成全局唯一Actor系统
* DefaultBean 设置为全局默认 ActorSystem 的 Bean
* Startup 代表优先启动加载
* Produces 代表对象生成方法
* ApplicationScoped 代表启动为系统应用级别
*/
@Produces
@Startup
@DefaultBean
@ApplicationScoped
public ActorSystem<MessageLite> actorSystem() {
// 打印配置
logger.info("Creating ActorSystem: {}", configurator.name());
configurator.settings().forEach((key, value) -> logger.info("ActorSystem Setting: {} = {}", key, value));

// 生成 Actor System 并注册系统退出方法, 当 Actor 系统退出的时候要求关闭整个服务
return configurator.createActorSystem((terminatedTry -> {
// 无论成功失败都关闭 Quarkus 系统
logger.info("Close ActorSystem, Status: {}", terminatedTry.isSuccess() ? "Success" : "Failure");
Quarkus.asyncExit();
return Done.getInstance();
}));
}
}

这样启动之后 Quarkus 就会帮助构建全局唯一的 ActorSystem, 后续就是加入 Actor 节点并且交换数据.

启动服务

如果对 Actor 开发没什么具体概念, 可以先学习下 skynet 及其文档, 方便知道后续怎么继续下去

skynet 开发过程, 默认每个客户端连接到服务器都会在服务端被动态生成 agent 做服务端代理请求.

在设计 agent 之前还需要处理让 Quarkus 全局依赖注入到 pekko 内部之中, 这里将其设计成全局的接口类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package io.meteorcat.game.utils;


import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.BeanManager;
import jakarta.inject.Inject;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.util.Optional;

/**
* Bean容器注入
* 对 Actor 实例化的句柄注入 Quarkus 的全局实例化对象
* 需要的该功能的 Actor 只需要组合继承即可
*/
public interface ActorBeanContainer {

/**
* 日志对象
*/
Logger LOG = LoggerFactory.getLogger(ActorBeanContainer.class);

/**
* 通过反射为 Actor 注入 @Inject 注解的字段(兼容 CDI 上下文 + 异常安全)
*
* @param actor Pekko Actor 实例
* @param <T> Actor 消息类型
* @throws IllegalAccessException 反射访问字段失败时抛出
*/
default <T> void injectFields(AbstractBehavior<T> actor) throws IllegalAccessException {
final ArcContainer arcContainer = Arc.container();
final BeanManager beanManager = arcContainer.beanManager();
final Instance<Object> cdiInstance = beanManager.createInstance();

// 递归扫描 Actor 类继承链的所有字段
Class<?> currentClass = actor.getClass();
while (currentClass != Object.class && currentClass != AbstractBehavior.class) {
Field[] fields = currentClass.getDeclaredFields();
for (Field field : fields) {
if (field.isAnnotationPresent(Inject.class)) {
injectSingleField(actor, field, cdiInstance);
}
}
currentClass = currentClass.getSuperclass();
}
}

/**
* 注入单个 @Inject 字段(异常安全 + 类型匹配)
*
* @param actor Actor 实例
* @param field 待注入字段
* @param cdiInstance CDI Instance 实例
* @throws IllegalAccessException 反射赋值失败
*/
private <T> void injectSingleField(AbstractBehavior<T> actor, Field field, Instance<Object> cdiInstance) throws IllegalAccessException {
Class<?> fieldType = field.getType();
field.setAccessible(true);

// 安全获取 Bean:避免找不到 Bean 导致崩溃
Optional<Object> dependency = Optional.ofNullable(
cdiInstance.select(fieldType).get()
);

if (dependency.isPresent()) {
field.set(actor, dependency.get());
LOG.debug("Successfully injected field[{}] of type[{}] into Actor[{}]", field.getName(), fieldType.getName(), actor.getClass().getName());
} else {
LOG.warn("CDI Bean for field[{}] (type: {}) not found, skipping injection", field.getName(), fieldType.getName());
}
}
}

之后就是准备涉及具体的 AgentActor 类, 先编写一些简单的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package io.meteorcat.game.actor;

import com.google.protobuf.MessageLite;
import io.meteorcat.game.config.ActorPropsConfigurator;
import io.meteorcat.game.utils.ActorBeanContainer;
import jakarta.inject.Inject;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.PostStop;
import org.apache.pekko.actor.typed.Terminated;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.slf4j.Logger;

import java.util.Objects;

/**
* 客户端在服务端之中的 '操作代理'
* 客户端通过 TCP/UDP/WebSocket 等连接上来的会被 ActorSystem 动态构建单独 Actor
* 所以消息转发过滤都是经由此 Actor 检测过滤最终确认是转发还是执行本地逻辑
*/
public class AgentActor extends AbstractBehavior<MessageLite> implements ActorBeanContainer {

/**
* 日志对象
*/
final Logger logger = getContext().getLog();

/**
* 确认是否能够接收到被注入的 Actor 全局配置
* 如果成功代表 Quarkus 已经被顺利托管进来
*/
@Inject
ActorPropsConfigurator configurator;

/**
* 默认构造方法
*/
private AgentActor(ActorContext<MessageLite> context) {
super(context);
}

/**
* 动态构建
*/
public static Behavior<MessageLite> create() {
return Behaviors.setup((ctx) -> {
AgentActor agent = new AgentActor(ctx);
agent.injectFields(agent); // 自己做依赖注册
return agent;
});
}

/**
* 消息拦截
*/
@Override
public Receive<MessageLite> createReceive() {
// 构建信号拦截
return newReceiveBuilder()
.onSignal(PostStop.class, (ignore) -> {
logger.info("Actor Stop!");

// 退出的时候打印注入配置
if (Objects.isNull(configurator)) {
logger.warn("Actor configurator is null!");
} else {
logger.warn("Actor configurator has been initialized!");
}

return Behaviors.same();
})
.onSignal(Terminated.class, (ignore) -> {
logger.info("Actor Terminated!");
return Behaviors.same();
})
.build();
}
}

之后就是准备测试运行下功能, 这里注意在创建以下测试单元文件:

  • src/test/resources/application.properties: 测试单元的配置文件, 这个文件内容和原来 application.properties 保持一致即可

  • src/test/java/io/meteorcat/game/actor/AgentActorTest.java: 具体测试的功能类, 这里就是编写具体的测试单元的文件

测试单元如下, 关于 TestKit 开发测试包可以网上查询下文档, 这里仅仅做防止退出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package io.meteorcat.game.actor;

import com.google.protobuf.MessageLite;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.PostStop;
import org.apache.pekko.actor.typed.Props;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.*;

/**
* AgentActor 测试单元入口
*/
@QuarkusTest
class AgentActorTest {


/**
* 获取全局挂载的 ActorSystem
*/
@Inject
ActorSystem<MessageLite> system;


/**
* 确认 Actor 名称
*/
@Test
void testSystemInfo() {
assertNotNull(system);
final Logger logger = system.log();
logger.info("Actor System Name: {}", system.name());
}


/**
* 测试启动 AgentActor
*/
@Test
void testStartAgentActor() {
assertNotNull(system);
TestKit testKit = new TestKit(system.classicSystem());

// 创建 Actor
String name = "agent-%s".formatted(UUID.randomUUID().toString());
ActorRef<MessageLite> actorRef = system.systemActorOf(AgentActor.create(), name, Props.empty());

// 延迟任务, 让 actor 退出
system.scheduler().scheduleOnce(
Duration.ofSeconds(2),
() -> actorRef.unsafeUpcast().tell(PostStop.instance()),
system.executionContext()
);

// 等待3s 让 Actor 反映之后退出
testKit.expectNoMessage(Duration.ofSeconds(3));
}

}

最后就会输出单元测试运行之后的内容, 主要关心 Actor configurator has been initialized! 是否初始化成功:

1
2
3
4
5
6
2025-12-09 09:02:48,584 INFO  [org.apa.pek.act.typ.ActorSystem] (main) Actor System Name: PinoTestActor
2025-12-09 09:02:48,584 FINE [org.jun.jup.eng.exe.InvocationInterceptorChain$ValidatingInvocation] (main) The invocation is skipped
2025-12-09 09:02:48,594 DEBUG [io.met.gam.uti.ActorBeanContainer] (PinoTestActor-pekko.actor.default-dispatcher-4) Successfully injected field[configurator] of type[io.meteorcat.game.config.ActorPropsConfigurator] into Actor[io.meteorcat.game.actor.AgentActor]
2025-12-09 09:02:50,613 INFO [io.met.gam.act.AgentActor] (PinoTestActor-pekko.actor.default-dispatcher-4) Actor Stop!
2025-12-09 09:02:50,613 WARN [io.met.gam.act.AgentActor] (PinoTestActor-pekko.actor.default-dispatcher-4) Actor configurator has been initialized!
2025-12-09 09:02:51,593 FINE [org.jun.jup.eng.exe.InvocationInterceptorChain$ValidatingInvocation] (main) The invocation is skipped

可以看到已经初始化成功了, 也就是代表容器依赖也被注入到内部, 至此已经完成 pekko + quarkus 的集成, 后续就是具体的业务编写服务.