Pekko Stream 使用

Pekko Stream 是一套异步、非阻塞、支持背压(Backpressure)的数据流处理框架,
主要用来解决大数据/高并发场景下的资源管控问题, 避免数据加载时候的 OOM 问题.

pekko 需要依赖以下的组件(采用 Maven 来包管理):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<!-- 所需依赖 -->
<dependencies>
<!-- Pekko Stream 核心依赖 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>

<!-- Stream 测试依赖(单元测试用) -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream-testkit_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
<scope>test</scope>
</dependency>

<!-- 标准的 Actor 依赖 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor-typed_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>

<!-- SLF4J 桥接依赖, 必须添加 -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-slf4j_${scala.binary.version}</artifactId>
<version>${pekko.version}</version>
</dependency>

<!-- slf4j 日志API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>

<!-- SLF4J 简单实现 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.9</version>
<scope>runtime</scope>
</dependency>
</dependencies>

Stream 把所有数据处理抽象为以下基础构件:

构件 底层职责
Source 生产数据(从文件、网络、Actor、队列等读取),管理数据生成的异步线程/资源
Sink 消费数据(写入文件、发送网络请求、投递到 Actor、打印日志),处理消费后的资源释放
Flow 中转处理数据(过滤、转换、分帧、加解密),串联 Source 和 Sink 形成完整管道

比较典型的 PekkoStream 应用场景如下:

  • WebSocket 长连接:客户端和服务端的双向数据流动(Source=服务端推消息,Sink=接收客户端消息)

  • HTTP 流式响应:比如下载大文件(Source 读取文件块,流式返回给客户端,避免一次性加载到内存)

  • TCP/UDP 服务:Pekko TCP 底层基于 Stream 实现,处理高并发的 TCP 连接(比如游戏服务端的长连接)

  • 文件处理:Pekko 支持集成内部文件处理包装成异步, 可以作为日志分析、数据清洗、格式转换, 处理 GB 级文件时避免 OOM

应用场景 核心优势 核心API/工具
WebSocket 长连接 双向背压、连接隔离、Actor整合 Flow.fromSinkAndSource
HTTP 流式响应 内存可控、异步IO、断点续传 FileIO.fromPathHttpEntity
TCP 服务 高并发、灵活解包、连接管控 Tcp.bindFraming
文件处理 异步读写、流式转换、结果聚合 FileIOSource.fromInputStreamFraming.delimiter

在大数据的情境下, 采用 Stream 处理效率极其优异, 能够做到资源占用和运行效率的平衡.

注意: Pekko Stream 和 Actor 是 "互补关系", Stream 负责批量/流式数据处理, Actor 负责状态管理/业务逻辑

这部分需要比较强的设计功底, 需要了解底层的处理方式才能结合起来使用, 所以必须深入理解 Pekko 的 Stream 概念.

简易流处理

这里模拟将 System.out/System.in 包装成 Source(System.out)/Sink(System.in) 来加深理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import org.apache.pekko.Done;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.stream.*;
import org.apache.pekko.stream.javadsl.*;
import org.apache.pekko.util.ByteString;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CompletionStage;

/**
* Pekko Stream 建议交互
*/
public class PekkoStreamExample {

/**
* 启动入口
*
* @param args 参数列表
*/
public static void main(String[] args) {

// 创建 ActorSystem(Pekko Stream 的运行时依赖)
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "SystemIO");

// 获取 Stream 执行器 Materializer
// Materializer 可以复用, 如果是频繁需要异步调用最好实例化句柄持有
Materializer materializer = Materializer.createMaterializer(system);

// 定义 Source: 包装 System.in 为数据生产者,按行读取
InputStream inputStream = System.in;

// 只要支持 Java 的 InputStream 的对象都可自己包装成 Source
// 用 StreamConverters 包装 System.in(InputStream)
// 建议观察下 StreamConverters 内部, 其中内部实现大量流转化工具
Source<ByteString, ?> inputSource = StreamConverters.fromInputStream(
() -> inputStream, // 延迟提供 InputStream 实例
8192 // 读取缓冲区大小,按需调整(平衡性能和内存占用)
)
// 行分割 + 背压控制: 限制每行最大 1024 字节
.via(Framing.delimiter(
ByteString.fromString("\n"),
1024,
FramingTruncation.ALLOW
))
// 配置输入缓冲区, 强化背压(缓冲区满则暂停读取)
.withAttributes(Attributes.inputBuffer(1, 1));


// 中间处理流:过滤空行
Flow<String, String, ?> processFlow = Flow.of(String.class)
.filter(line -> !line.trim().isEmpty())
.map((line) -> {
// 这里可以中间处理下, 比如在输出的时候追加特殊符号
return "Robot: %s".formatted(line);
});


// 创建共享的退出指令, 用于手动优雅退出数据流
SharedKillSwitch killSwitch = KillSwitches.shared("quit-signal");


// 输出 Sink: 打印到控制台
// 定义输入处理进程流的处理
Sink<String, CompletionStage<Done>> outputSink = Sink.foreach(
line -> {
// 检测是否匹配 >>> quit 指令, 如果检测到就执行退出逻辑
if ("Robot: quit".equalsIgnoreCase(line)) {
killSwitch.shutdown(); // 主动关闭流
return;
}
// 照常输出
System.out.println(line);
}
);


System.out.println("数据回显程序已启动");
System.out.println("输入 quit 即可退出程序\n");


// 构建数据流管道并启动
CompletionStage<Done> done = inputSource
// 字节流转字符串
.map(byteString -> byteString.decodeString(StandardCharsets.UTF_8))
.via(killSwitch.flow()) // 将 KillSwitch 接入流监听
.via(processFlow)
// Keep.right() 保留右侧 Sink 的 materialized 值
.toMat(outputSink, Keep.right())
.run(materializer);

// 监听流结束/异常,释放资源
done.whenComplete((result, exception) -> {
if (Objects.nonNull(exception)) {
System.err.println("数据流异常终止: " + exception.getMessage());
} else {
System.out.println("数据流正常结束");
}
system.terminate();
});
}
}

这里的 map 是 pekko 自己实现类似 Java 系统库的 Stream 处理, 需要认识以下关键功能类:

  • Materializer: 流的抽象运行时, 必须要获取到该运行时才能将流处理任务推送到 ActorSystem 执行

  • CompletionStage: 异步任务结果的封装, 用于监听流执行最后的结果响应, 常用 Done(正常完成)/Exception(异常)获取自定义类型

  • SharedKillSwitch: 流的主动终止信号, 用来主动处理流生命周期, 但是流处理中断操作必须谨慎

物化值(Materialized Value) 是流处理的关键, 必须了解才能引出流交换过程暴露的资源/控制句柄, 也就是拦截获取流任务处理任务

比较通俗可以比喻 Pekko Stream 的拓扑 Source(输出) → Flow(管道) → Sink(输入) 当作现实的流水生产线:

  • 产出物品: 生产线上被加工的原材料(比如字符串、二进制数据等)

  • 物化值: 启动生产线后, 拿到的控制工具/资源句柄(比如电源开关、原材料仓库钥匙、生产进度看板)

  • 在这个过程之中, 就可以针对物化值自主获取到需要的 Source(输出材料)/Sink(输入材料)/NotUsed(都不需要)/Both(全都需要)

然后就是关于 Keep.none/both/right/left, 这就是最后调控流执行的时候获取相关物料:

常量 作用 适用场景 调用后返回类型
Keep.none() 不保留任何物化值 不需要监听流状态、不需要控制流 NotUsed
Keep.left() 保留左侧构件的物化值 需获取 Source 的资源句柄(如 TCP 端口) Source 的物化值类型
Keep.right() 保留右侧构件的物化值 需获取 Sink 的结果(如 CompletionStage<Done> Sink 的物化值类型
Keep.both() 保留左右两侧的物化值 需同时控制 Source 和 Sink(如监听结果+持有连接) Pair<左物化值, 右物化值>

Keep.* 相关值需要配合 XXX.toMat(xxx,{物料}) 来调用, 而 XXX.to(xxx) 等价于 XXX.toMat(xxx, Keep.none()),返回 NotUsed

一般来说运行 Flow 流过程之中需要指定 Materializer 来运转, 这需要 Keep.* 来提取并且指定 Sink/Source 运行时来执行流

构建流都需要传入 *.run(Materializer) 来做运行时执行处理, 测试例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
// 1. 创建唯一的运行时 Materializer
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "demo");
Materializer materializer = Materializer.createMaterializer(system);

// 2. 定义固定拓扑
Source<String, ?> source = Source.single("hello pekko stream");
Sink<String, CompletionStage<Done>> sink = Sink.foreach(System.out::println);

// 3. 同一运行时,不同 Keep 选型(只改变物化值,不改变运行时)
// 选型1:Keep.right() → 拿 Sink 的物化值(监听结果)
CompletionStage<Done> sinkMat = source.toMat(sink, Keep.right()).run(materializer);

// 选型2:Keep.left() → 拿 Source 的物化值(这里 Source 物化值是 NotUsed, 其实也是类似的 Void)
Object sourceMat = source.toMat(sink, Keep.left()).run(materializer);

// 选型3:Keep.both() → 同时拿两个物化值
var bothMat = source.toMat(sink, Keep.both()).run(materializer);

// 4. 监听结果(验证运行时生效)
sinkMat.whenComplete((done, ex) -> {
System.out.println("流执行完成");
system.terminate();
});
}

另外 Flow 就是关键的运行核心, 这里标识下这个类型的参数签名 Flow<输入类型:In, 输出类型:Out, 物料:Mat>

之后就是引入这个极具设计感的 图(Graph) 核心设计

Graph 设计

Graph 是数学当中的模型, Pekko 的流处理借用这种概念, 涉及到数据结构和算法相关, 如果只做开发应用可以略过这章节

Graph 是 Pekko 核心设计(借用数学分支), 所有的 Source、Flow、Sink, 以及它们组合成的复杂数据流拓扑, 本质上都是 Graph 具体实现.

首先查看下 Flow<输入类型:In, 输出类型:Out, 物料:Mat> 内部源码定义:

1
2
3
4
5
6
7
8
9
/**
* 这里引出了 pekko 的 Graph 概念
* @param <In> 流输入产物
* @param <Out> 流输出产物
* @param <Mat> 物料值
*/
public final class Flow<In, Out, Mat> implements Graph<FlowShape<In, Out>, Mat> {
// 其他略
}

Graph 在数学当中主要用于描述离散对象之间关系的抽象数据结构, 其核心作用是将复杂的 对象 - 关系 场景建模成以下元素:

  • 节点(Vertex/Node)

  • 边(Edge)

  • 通过数学方法分析多对多关联从而解决问题

这部分抽象出来的定义的场景应用方向如下:

  1. 网络拓扑 - 计算机网络中的主机(节点)和网线(边), 社交网络中的用户(节点)和好友关系(边)

  2. 路径规划 - 城市(节点)和公路(边)、地图中的路口(节点)和道路(边)

  3. 依赖关系 - 项目中的任务(节点)和任务间的先后依赖(边)、代码中的函数(节点)和调用关系(边)

如果没有图结构, 这类 多对多 关系很难高效表达, 所以这种网状交叉模型就被引入到计算机科学当中.

Graph 最著名的应用场景就是判断两个节点(Vertex/Node)之间是否可达? 可达的最短路径是哪条?

而 Graph 的分类维度可以按照以下分类:

  • 按边的方向性划分

    • 无向图 - 边没有方向, 节点之间的关系是双向的(社交网络的 “好友关系”, 城市之中的 “道路连通”)
    • 有向图 - 边有方向, 节点之间的关系是单向的(代码的 “函数调用关系”, 任务的 “依赖顺序”)
  • 按边的权重属性划分

    • 无权图 - 边没有附加属性, 仅表示 “是否连接”
    • 加权图 - 边带有权重值, 用于表示关系的强度/成本/容量

这两个维度是后续选择图算法(比如最短路径、拓扑排序)的关键依据, 边的方向性可以和权重做组合:

图的分类 核心适用算法 算法解决的核心问题 典型工程场景
无向无权图 BFS/DFS 两点是否可达、最短路径(步数) 社交网络“好友链”查找、网络连通性检测
无向加权图 Dijkstra、Prim(最小生成树) 最短路径(成本)、最小连通成本 地图导航(无单行道)、电网布线优化
有向无权图 BFS/DFS、拓扑排序(Kahn) 依赖顺序、单向可达性 代码编译依赖、任务调度
有向加权图 Dijkstra、A*、Bellman-Ford 带成本的单向最短路径 地图导航(有单行道)、网络路由
有向无环图 拓扑排序、动态规划 依赖调度、路径权重累加 流水线任务编排、依赖包版本解析

回过头来说 Pekko Stream, 里面就是采用 Source → Flow → Sink 来实现流传递, 本质是工程化的有向无环图

接下来就是按照 Graph 理解这些代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

/**
* 空节点, 以下会直接跳出错误
* <p>
* 这里只有节点而没有边, 起点节点(Source)没有边连接到任何终点节点(Sink), 属于 "不可达的孤立节点", 拓扑无效
*/
void emptyVertex() {
// Source 没有连接到 Sink,拓扑不闭合
Source<String, NotUsed> source = Source.single("hello");
// 没有调用 to()/via() 连接 Sink,运行时会提示“拓扑未闭合”
source.run(materializer); // 编译/运行报错
}


/**
* 匹配错误
* <p>
* 边的两端节点 "数据类型不兼容", 相当于数学图中 "边连接了两个不匹配的节点"
*/
void matchError() {
// 错误示例:Flow 输入是 String,Source 输出是 Integer,类型不匹配
Source<Integer, NotUsed> source = Source.single(123);
Flow<String, String, NotUsed> flow = Flow.of(String.class).map(s -> s.toUpperCase());
source.via(flow).to(Sink.foreach(System.out::println)); // 编译报错
}

这就是 Pekko 将数学当中 Graph 引入计算机当中的实践, 这里只需要了解一下就行了, 后续如果要深入的话需要更多扩展资料.

数据通道

上面常常看到频繁 Source → Flow → Sink 说明的数据流向, 但是现实当中还需要数据输入输出可能是分开处理(异步读取/写入).

而 PekkoStream 考虑到这一点, 所以也支持拆分数据到单向空节点之中, 也就是拆分:

  • inbound: 入站数据

  • outbound: 出站数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.javadsl.*;
import org.apache.pekko.util.ByteString;
import org.slf4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public class PekkoStreamQueueExample {

/**
* 功能入口
*/
public static void main(String[] args) throws IOException {
// 创建唯一的运行时 Materializer
final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "demo");
final Materializer materializer = Materializer.createMaterializer(system);// 可以持有复用
final Logger logger = system.log();


// 入站数据处理 START -------------------------------------------------------------------------------

// 创建入站消息通道, 第二个参数代表队列满了的策略
// backpressure(): 自动回滚覆盖数据
// dropTail(): 删除队列尾数据
// dropHead(): 删除队列头数据
SourceQueueWithComplete<ByteString> inbound = Source.<ByteString>queue(
1024, // 队列长度
OverflowStrategy.backpressure() // 队列满了之后的策略, 背压策略会回滚最早消息覆盖
).via(Framing.delimiter( // 这里也可以采用 Framing.lengthField 方式
ByteString.fromString("\n"),// 数据分隔符
1024, // 最大缓冲区
FramingTruncation.ALLOW // 指定当输入数据流被截断无法处理时候策略: DISALLOW - 中断处理/ ALLOW - 直接丢弃
)).to(Sink.foreach(message -> {
// 捕获数据流转, 这里就可以考虑转发给特定 Actor
// 比如 getSelf().tell(new FrameReceived(frame), getSelf());
logger.info("Message received: {}", message);

})).run(materializer); // 挂载运行时


// 然后外部的 CMD/TCP/UDP/WebSocket 转发服务端数据就直接写入队列
// 下面的数据因为没有分隔符直接被过滤跳过
inbound.offer(ByteString.fromString("Hello World! ERROR!")) // 这里就是将数据推入队列
.whenComplete((res, throwable) -> {
if (Objects.nonNull(throwable)) {
logger.error("push queue error", throwable); // 写入队列失败
} else {
logger.info("push queue success"); // 写入队列成功
}
});

// 这里追加 \n 符号用于分隔, 这条数据才会被捕获
inbound.offer(ByteString.fromString("Hello World!Hello!!!\n")) // 这里就是将数据推入队列
.whenComplete((res, throwable) -> {
if (Objects.nonNull(throwable)) {
logger.error("push queue error", throwable); // 写入队列失败
} else {
logger.info("push queue success"); // 写入队列成功
}
});


// 入站数据处理 END -------------------------------------------------------------------------------


// 出站数据处理 START -------------------------------------------------------------------------------

// 其实思路也是一样, 只是反过来处理
// 还是定义处理队列, 只是队列输出的地方变成客户端的 Socket 句柄
SourceQueueWithComplete<ByteString> outbound = Source.<ByteString>queue(
1024, // 队列长度
OverflowStrategy.backpressure() // 队列满了之后的策略, 背压策略会回滚最早消息覆盖
).via(Framing.lengthField( // 这里采用 Framing.lengthField 方式
Integer.BYTES, // sizeof(int32) 长度作为数据长度单位即可, 一般等于4
0, // 长度字段的偏移, 一般长度都是放置于首位
256 * 1024, // 数据帧数量最大值
ByteOrder.BIG_ENDIAN // 网络序列是大端序列, 注意: 不声明默认采用小端序列, 会导致数据转化失败
)).to(Sink.foreach(message -> {
// 捕获数据流转, 这里就可以考虑转发给特定 Actor
// 比如 getSelf().tell(new FrameReceived(frame), getSelf());
logger.info("Frame message: {}", message);

})).run(materializer); // 挂载运行时

// 最后直接推入数据队列即可
String msgBody = "Hello World!Frame!";
byte[] msgBodyBytes = msgBody.getBytes(StandardCharsets.UTF_8);
int msgId = 10001;

// 构建数据二进制
// 构建二进制帧:[总长度(4字节)] + [msgId(4字节)] + [消息体]
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + Integer.BYTES + msgBodyBytes.length);
buffer.putInt(msgBodyBytes.length + Integer.BYTES); // 长度字段只需要排除首位 int32 内容长度字段, 也就是 length = id + body 长度即可
buffer.putInt(msgId);
buffer.put(msgBodyBytes);
ByteString bytes = ByteString.fromArray(buffer.array());

// 最后投敌过去
outbound.offer(bytes).whenComplete((res, throwable) -> {
if (Objects.nonNull(throwable)) {
logger.error("push queue error", throwable);
} else {
logger.info("push queue success");
}
});


// 出站数据处理 END -------------------------------------------------------------------------------


// 最后执行, 确认队列执行被消费, 一般退出的时候需要提醒快点消费
inbound.complete();
outbound.complete();
System.out.println("任意按键即可退出程序\n");
int ignore = System.in.read();
system.terminate();
}
}

对于构建的会话对象, 一般最好采用双队列处理, 对于效率能够提升不少.