Pekko 异步运行时

早期 Java 的并没有统一的异步运行时, 所以很多都需要去自行实现处理, 而 Pekko 则是自行设计通用的异步运行时.

Pekko 异步运行时是由 Actor 模型 + 调度器 + 流处理 + 未来式(Future) 构成的完整异步运行体系, 包含有以下组件:

  • Scheduler/Dispatcher: 任务定时器/调度器, 底层执行核心, 负责线程分配和任务调度, 隔离不同类型的任务

  • Future/CompletionStage: 异步任务结果包装, 处理异步任务的返回值,避免同步阻塞等待

  • Stream: 支持运行时的数据流处理, 解决批量数据的异步生产 - 消费问题, 内置背压避免流量失控

  • Actor: 最上层的任务载体, 用 “消息队列 + 单线程执行” 的模式将并发问题收敛到"消息顺序"而非"锁"

这些组件都是能够独立使用

在 Java8 之后可以依靠 Runnable/Callable/ThreadPool 实现简单的异步运行时, 但是对比 Pekko 纯正 Actor 设计差距很大:

特性 Pekko 异步运行时 Java 原生异步(Java 8+)
核心模型 Actor + 消息驱动 + 流处理 CompletableFuture + 线程池 + NIO
分布式支持 天然支持(Pekko Cluster) 无原生支持,需手动实现
背压机制 内置(Stream 模块) 无,需手动实现(如 BlockingQueue 阻塞)
线程安全 单 Actor 单线程,无需锁 需手动加锁/使用并发容器
异常隔离 Actor 故障域隔离,单个 Actor 异常不扩散 线程池任务异常可能导致线程终止
编程复杂度 较高(需理解 Actor/流) 较低,但复杂场景(如分布式)需大量封装

手写异步任务对于锁的处理一定要小心, 否则死锁阻塞任务的问题会很频繁

这里需要说明 Pekko 异步运行时相关的函数方法, 这些都是需要被调度器来唤醒执行:

  • AskPattern.ask(): Actor 异步交互的核心方法,替代 同步调用 + 锁, 需指定超时时间避免无限等待

  • thenApply()/thenAccept()/exceptionally(): 异步结果的转换、消费、异常处理,全程非阻塞

  • CompletableFuture.allOf()/anyOf():多 Future 并行组合, 适合运行批量异步任务

需要去参阅官方文档说明:

Patterns 针对经典 Actor 设计, Typed Actor 需用 AskPattern, 这是个很重要的接口, 涵盖以下功能和场景:

方法 核心作用 典型场景
ask 向 Actor 异步发请求并获取结果 查询数据、调用需要返回值的 Actor 方法
pipe 将 Future 结果自动转发给 Actor 异步任务结果投递到其他 Actor 处理
after 延迟执行异步操作 超时兜底、定时触发
retry 自动重试失败的异步操作 不稳定服务的重试(网络/数据库故障)

要驱动 Pekko Actor 去执行任务则是依赖 Scheduler 来进行调度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* Pekko 定时调度 Actor, 最好把任务设计成 Actor 投递, 结合 Actor 做业务调度, 而不要直接调用内部的任何函数和方法
*/
public static class PekkoSchedule extends AbstractActor {

/**
* Actor 定时器
*/
final Scheduler scheduler = getContext().getSystem().getScheduler();

/**
* Actor 调度器
*/
final ExecutionContextExecutor dispatcher = getContext().getDispatcher();

/**
* 延迟 2 秒执行任务
*/
final Cancellable onceTask = scheduler.scheduleOnce(
Duration.ofSeconds(2), // 延迟时间
() -> getSelf().tell("Hello.World", ActorRef.noSender()), // 执行任务
dispatcher // 绑定的调度器
);


/**
* 周期性执行任务
*/
final Cancellable fixedRateTask = scheduler.scheduleAtFixedRate(
Duration.ofSeconds(3), // 初始延迟
Duration.ofSeconds(3), // 执行间隔
() -> getSelf().tell("Hello.EachTask", ActorRef.noSender()), // 执行任务
dispatcher // 绑定的调度器
);


/**
* 退出 Actor 回调
*/
@Override
public void postStop() {
// 取消定时任务
onceTask.cancel();
fixedRateTask.cancel();
}
}

定时调度器一般提供给类似游戏服务端挂载定时调用功能, 比如 角色|怪物每秒移动 / 金币|钻石等资源的同步增长 / 每日跨天更新通知

而 Patterns 不需要直接延迟调度, 只是需要做类似于 请求 - 响应 这种强状态相关的调度, 类似下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* Pekko 请求-响应调度 Actor
*/
public static class PekkoPatterns extends AbstractActor {

/**
* 定义传输数据结构
*
* @param values
*/
record AskSum(int[] values) {

}

/**
* 启动 Actor 回调
*/
@Override
public void preStart() {
// 设计请求: 要求计算 int[] 的合计总额, 5秒获取结果否则超时弹出异常
// Patterns.ask 响应结果只能返回 CompletionStage<Object>, 内部类型需要自行去匹配推断
CompletionStage<Object> future = Patterns.ask(getSelf(), new AskSum(new int[]{10, 20, 30}), Duration.ofSeconds(5));

// 注册异步调用
future.thenApply(res -> {
// 过滤判断结果
if (res instanceof Integer v) {
return v;
} else {
throw new IllegalArgumentException("合计总额参数异常");
}
}).thenAccept(res -> {
System.out.printf("最后得结果: %d%n", res);
}).exceptionally(throwable -> {
// 拦截异步调用过程之中的异常信息
System.err.println(throwable.getMessage());
return null; // 这里返回 null 代表外部无需捕获最后调用值, 指代 Void
});

}

/**
* 消息拦截
*/
@Override
public Receive createReceive() {
return receiveBuilder()
.matchk(AskSum.class, req -> {
final ActorRef sender = getSender(); // 获取请求过来的响应地址

// 将计算结果响应返回过去, 如果没有返回会导致外层异步超时失败
sender.tell(
Arrays.stream(req.values()).sum(),
ActorRef.noSender()
);
}).build();
}
}

这种方式是最常用的, 涉及到强关联的数据就必须要做到 要么直接成功获取结果, 要么直接失败抛出外层异常, 绝对不要有任何中间意外因素.

需要注意: Patterns 会占用 Promise 等资源等待响应, 所以才需要超时处理功能, 应该避免使用 Patterns 防止 Actor 处理缓慢

如果没有数据强制上下文一致性的情况, 将 Patterns 改为自定义回调消息然后拦截更好点, 而内置的 CompletionStage 功能基本如下:

1. 单阶段任务后续操作(触发条件:当前阶段正常完成)

这类方法以 then 为前缀,用于当前异步阶段完成后执行下一个操作,又细分为三类:

方法前缀 核心逻辑 代表方法 作用
thenApply 转换结果(有入参有返回) thenApply/thenApplyAsync 接收当前阶段结果,通过 Function 转换为新结果,返回新的 CompletionStage
thenAccept 消费结果(有入参无返回) thenAccept/thenAcceptAsync 接收当前阶段结果,通过 Consumer 消费(如打印、存储),返回 Void 类型阶段
thenRun 执行动作(无入参无返回) thenRun/thenRunAsync 不接收结果,仅执行 Runnable 动作,返回 Void 类型阶段

异步变体: 每个方法都有 Async 后缀版本(默认线程池)和 Async(Executor) 版本(自定义线程池),用于指定异步执行方式

2. 双阶段任务并联操作(触发条件:两个阶段都完成)

这类方法用于等待当前阶段 + 另一个阶段都正常完成后执行操作:

方法前缀 核心逻辑 代表方法 作用
thenCombine 合并结果(有入参有返回) thenCombine/thenCombineAsync 接收两个阶段的结果,通过 BiFunction 合并为新结果
thenAcceptBoth 消费双结果(有入参无返回) thenAcceptBoth/thenAcceptBothAsync 接收两个阶段的结果,通过 BiConsumer 消费
runAfterBoth 执行动作(无入参无返回) runAfterBoth/runAfterBothAsync 两个阶段都完成后,执行 Runnable 动作

3. 双阶段任务选路操作(触发条件:任一阶段完成)

这类方法用于等待当前阶段 或 另一个阶段任一正常完成后执行操作:

方法前缀 核心逻辑 代表方法 作用
applyToEither 转换结果(有入参有返回) applyToEither/applyToEitherAsync 接收先完成阶段的结果,通过 Function 转换为新结果
acceptEither 消费结果(有入参无返回) acceptEither/acceptEitherAsync 接收先完成阶段的结果,通过 Consumer 消费
runAfterEither 执行动作(无入参无返回) runAfterEither/runAfterEitherAsync 任一阶段完成后,执行 Runnable 动作

4. 阶段嵌套(组合多个异步流程)

方法名 核心逻辑 作用
thenCompose 接收当前阶段结果,返回一个新的 CompletionStage(而非直接返回结果) 解决 thenApply 嵌套 CompletionStage 导致的「嵌套异步」问题,实现异步流程的扁平化串联
thenComposeAsync thenCompose 的异步版本 同上,异步执行 Function(返回 CompletionStage 的函数)

5. 异常处理与全量结果处理

这类方法不区分阶段是否正常完成, 专门处理异常或全量结果:

方法名 核心逻辑 作用
handle 接收「结果 + 异常」(任一为 null),通过 BiFunction 处理并返回新结果 统一处理正常/异常场景,可转换异常为正常结果
whenComplete 接收「结果 + 异常」,通过 BiConsumer 消费(不改变结果) 仅监听完成状态(正常/异常),不修改原结果,适合日志、监控等场景
exceptionally 仅当阶段异常完成时执行,通过 Function 将异常转换为正常结果 类似 try-catch 中的 catch 块,专门处理异常
exceptionallyCompose 异常时返回新的 CompletionStage(异步版异常处理) Java 12 新增,支持异常场景下的异步流程编排

6. 工具方法

方法名 核心逻辑 作用
toCompletableFuture 转换为 CompletableFuture 实现不同 CompletionStage 实现类的互通,对接 CompletableFuture 的扩展功能

日常只需要按照数据处理流程来选择不同的异步任务处理, 而常规的函数方法任务也可以直接包装成异步任务让渡给系统调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Pekko 自定义调度 CompletableFuture
*/
public static class PekkoCompletableFuture extends AbstractActor {
/**
* Actor 调度器
*/
final ExecutionContextExecutor dispatcher = getContext().getDispatcher();

/**
* 启动 Actor 回调
*/
@Override
public void preStart() {
// 注意: 这里仅仅是参考, Actor 内部任务调度一定要让内部自行调度, 而不要手动采用 dispatcher 调度
// runAsync 第二个参数支持 Java 内部 Executor 线程执行器
CompletableFuture.runAsync(() -> {
System.out.println("Hello.World");
}, dispatcher)
.orTimeout(3, TimeUnit.SECONDS) // 设置异步超时
.thenAccept(ignore -> {
System.out.println("执行完成");
}).exceptionally(throwable -> {
System.err.printf("执行异常:%s%n", throwable.toString());
return null;
});
}
}

自行采用 ExecutionContextExecutor 来创建调度任务会打乱 Pekko 内部的线程调度引发线程安全的问题, 所以不推荐采用这样直接调度处理.

如果需采取这种任务调度方法, 需要单独创建线程池加上消息管理传递, 而非使用采用 Actor 任务线程池, 具体使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* Pekko CompletableFuture 投递 Actor
*/
public static class PekkoCompletableFuture extends AbstractActor {
/**
* Actor 调度器
*/
final ExecutionContextExecutor dispatcher = getContext().getDispatcher();

/**
* 构建独立的线程池而非 Actor 的 dispatcher 来执行异步任务, 避免挤占消息处理资源
*/
final ExecutorService executor = Executors.newFixedThreadPool(3);


/**
* 启动 Actor 回调
*/
@Override
public void preStart() {
// 创建自定义异步任务, 将调度器设置为自定义线程池上
CompletableFuture<Long> task = CompletableFuture.supplyAsync(() -> {
System.out.println("执行完成");
return System.currentTimeMillis();
}, executor)
.orTimeout(3, TimeUnit.SECONDS) // 设置异步超时
.exceptionally(throwable -> {
System.err.printf("执行异常:%s%n", throwable.toString());

// 这里也可以实现任务充实机制
// 首次任务失败可以再次运行一次
return CompletableFuture.supplyAsync(() -> {
System.out.println("任务重试");
return System.currentTimeMillis();
}, executor)
.orTimeout(3, TimeUnit.SECONDS)
.exceptionally(throwable1 -> {
// 重试任务还是失败, 直接可以设置异常消息通知
System.err.printf("重试异常:%s%n", throwable1.toString());
return null;
})
.join();
});

// 将异步结果 pipe 到 Actor 自身的消息队列, 也就是发送到 Match 消息拦截之中
// 这种就是通过自定义线程池来调度异步任务, 然后获取结果之后 actorRef.tell(msg) 将结果投递到 Actor
Patterns.pipe(task, dispatcher).to(getSelf());
}
}

默认 Actor 是基于单线程来运行逻辑, 通过 Executors + Patterns.pipe 可以让其拥有更强的线程任务调度调度能力.

这种就是异步流程就是官方推荐的 “结果管道化(CompletableFuture Pipe)”

比如计算比较复杂且耗时的任务可以利用 allOf/anyOf 批量提交给指定线程池运行, 之后通过 Patterns.pipe 投递消息结果即可.