JavaPekkoPekko Stream 使用
MeteorCatPekko Stream 是一套异步、非阻塞、支持背压(Backpressure)的数据流处理框架,
主要用来解决大数据/高并发场景下的资源管控问题, 避免数据加载时候的 OOM 问题.
pekko 需要依赖以下的组件(采用 Maven 来包管理):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| <dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency>
<dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-stream-testkit_${scala.binary.version}</artifactId> <version>${pekko.version}</version> <scope>test</scope> </dependency>
<dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-actor-typed_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency>
<dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-slf4j_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency>
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.9</version> </dependency>
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>2.0.9</version> <scope>runtime</scope> </dependency> </dependencies>
|
Stream 把所有数据处理抽象为以下基础构件:
| 构件 |
底层职责 |
| Source |
生产数据(从文件、网络、Actor、队列等读取),管理数据生成的异步线程/资源 |
| Sink |
消费数据(写入文件、发送网络请求、投递到 Actor、打印日志),处理消费后的资源释放 |
| Flow |
中转处理数据(过滤、转换、分帧、加解密),串联 Source 和 Sink 形成完整管道 |
比较典型的 PekkoStream 应用场景如下:
-
WebSocket 长连接:客户端和服务端的双向数据流动(Source=服务端推消息,Sink=接收客户端消息)
-
HTTP 流式响应:比如下载大文件(Source 读取文件块,流式返回给客户端,避免一次性加载到内存)
-
TCP/UDP 服务:Pekko TCP 底层基于 Stream 实现,处理高并发的 TCP 连接(比如游戏服务端的长连接)
-
文件处理:Pekko 支持集成内部文件处理包装成异步, 可以作为日志分析、数据清洗、格式转换, 处理 GB 级文件时避免 OOM
| 应用场景 |
核心优势 |
核心API/工具 |
| WebSocket 长连接 |
双向背压、连接隔离、Actor整合 |
Flow.fromSinkAndSource |
| HTTP 流式响应 |
内存可控、异步IO、断点续传 |
FileIO.fromPath、HttpEntity |
| TCP 服务 |
高并发、灵活解包、连接管控 |
Tcp.bind、Framing |
| 文件处理 |
异步读写、流式转换、结果聚合 |
FileIO、Source.fromInputStream、Framing.delimiter |
在大数据的情境下, 采用 Stream 处理效率极其优异, 能够做到资源占用和运行效率的平衡.
注意: Pekko Stream 和 Actor 是 "互补关系", Stream 负责批量/流式数据处理, Actor 负责状态管理/业务逻辑
这部分需要比较强的设计功底, 需要了解底层的处理方式才能结合起来使用, 所以必须深入理解 Pekko 的 Stream 概念.
简易流处理
这里模拟将 System.out/System.in 包装成 Source(System.out)/Sink(System.in) 来加深理解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| import org.apache.pekko.Done; import org.apache.pekko.actor.typed.ActorSystem; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.stream.*; import org.apache.pekko.stream.javadsl.*; import org.apache.pekko.util.ByteString;
import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.concurrent.CompletionStage;
public class PekkoStreamExample {
public static void main(String[] args) {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "SystemIO");
Materializer materializer = Materializer.createMaterializer(system);
InputStream inputStream = System.in;
Source<ByteString, ?> inputSource = StreamConverters.fromInputStream( () -> inputStream, 8192 ) .via(Framing.delimiter( ByteString.fromString("\n"), 1024, FramingTruncation.ALLOW )) .withAttributes(Attributes.inputBuffer(1, 1));
Flow<String, String, ?> processFlow = Flow.of(String.class) .filter(line -> !line.trim().isEmpty()) .map((line) -> { return "Robot: %s".formatted(line); });
SharedKillSwitch killSwitch = KillSwitches.shared("quit-signal");
Sink<String, CompletionStage<Done>> outputSink = Sink.foreach( line -> { if ("Robot: quit".equalsIgnoreCase(line)) { killSwitch.shutdown(); return; } System.out.println(line); } );
System.out.println("数据回显程序已启动"); System.out.println("输入 quit 即可退出程序\n");
CompletionStage<Done> done = inputSource .map(byteString -> byteString.decodeString(StandardCharsets.UTF_8)) .via(killSwitch.flow()) .via(processFlow) .toMat(outputSink, Keep.right()) .run(materializer);
done.whenComplete((result, exception) -> { if (Objects.nonNull(exception)) { System.err.println("数据流异常终止: " + exception.getMessage()); } else { System.out.println("数据流正常结束"); } system.terminate(); }); } }
|
这里的 map 是 pekko 自己实现类似 Java 系统库的 Stream 处理, 需要认识以下关键功能类:
-
Materializer: 流的抽象运行时, 必须要获取到该运行时才能将流处理任务推送到 ActorSystem 执行
-
CompletionStage: 异步任务结果的封装, 用于监听流执行最后的结果响应, 常用 Done(正常完成)/Exception(异常)获取自定义类型
-
SharedKillSwitch: 流的主动终止信号, 用来主动处理流生命周期, 但是流处理中断操作必须谨慎
物化值(Materialized Value) 是流处理的关键, 必须了解才能引出流交换过程暴露的资源/控制句柄, 也就是拦截获取流任务处理任务
比较通俗可以比喻 Pekko Stream 的拓扑 Source(输出) → Flow(管道) → Sink(输入) 当作现实的流水生产线:
-
产出物品: 生产线上被加工的原材料(比如字符串、二进制数据等)
-
物化值: 启动生产线后, 拿到的控制工具/资源句柄(比如电源开关、原材料仓库钥匙、生产进度看板)
-
在这个过程之中, 就可以针对物化值自主获取到需要的 Source(输出材料)/Sink(输入材料)/NotUsed(都不需要)/Both(全都需要)
然后就是关于 Keep.none/both/right/left, 这就是最后调控流执行的时候获取相关物料:
| 常量 |
作用 |
适用场景 |
调用后返回类型 |
Keep.none() |
不保留任何物化值 |
不需要监听流状态、不需要控制流 |
NotUsed |
Keep.left() |
保留左侧构件的物化值 |
需获取 Source 的资源句柄(如 TCP 端口) |
Source 的物化值类型 |
Keep.right() |
保留右侧构件的物化值 |
需获取 Sink 的结果(如 CompletionStage<Done>) |
Sink 的物化值类型 |
Keep.both() |
保留左右两侧的物化值 |
需同时控制 Source 和 Sink(如监听结果+持有连接) |
Pair<左物化值, 右物化值> |
Keep.* 相关值需要配合 XXX.toMat(xxx,{物料}) 来调用, 而 XXX.to(xxx) 等价于 XXX.toMat(xxx, Keep.none()),返回 NotUsed
一般来说运行 Flow 流过程之中需要指定 Materializer 来运转, 这需要 Keep.* 来提取并且指定 Sink/Source 运行时来执行流
构建流都需要传入 *.run(Materializer) 来做运行时执行处理, 测试例子如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public static void main(String[] args) { ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "demo"); Materializer materializer = Materializer.createMaterializer(system);
Source<String, ?> source = Source.single("hello pekko stream"); Sink<String, CompletionStage<Done>> sink = Sink.foreach(System.out::println);
CompletionStage<Done> sinkMat = source.toMat(sink, Keep.right()).run(materializer);
Object sourceMat = source.toMat(sink, Keep.left()).run(materializer);
var bothMat = source.toMat(sink, Keep.both()).run(materializer);
sinkMat.whenComplete((done, ex) -> { System.out.println("流执行完成"); system.terminate(); }); }
|
另外 Flow 就是关键的运行核心, 这里标识下这个类型的参数签名 Flow<输入类型:In, 输出类型:Out, 物料:Mat>
之后就是引入这个极具设计感的 图(Graph) 核心设计
Graph 设计
Graph 是数学当中的模型, Pekko 的流处理借用这种概念, 涉及到数据结构和算法相关, 如果只做开发应用可以略过这章节
Graph 是 Pekko 核心设计(借用数学分支), 所有的 Source、Flow、Sink, 以及它们组合成的复杂数据流拓扑, 本质上都是 Graph 具体实现.
首先查看下 Flow<输入类型:In, 输出类型:Out, 物料:Mat> 内部源码定义:
1 2 3 4 5 6 7 8 9
|
public final class Flow<In, Out, Mat> implements Graph<FlowShape<In, Out>, Mat> { }
|
Graph 在数学当中主要用于描述离散对象之间关系的抽象数据结构, 其核心作用是将复杂的 对象 - 关系 场景建模成以下元素:
-
节点(Vertex/Node)
-
边(Edge)
-
通过数学方法分析多对多关联从而解决问题
这部分抽象出来的定义的场景应用方向如下:
-
网络拓扑 - 计算机网络中的主机(节点)和网线(边), 社交网络中的用户(节点)和好友关系(边)
-
路径规划 - 城市(节点)和公路(边)、地图中的路口(节点)和道路(边)
-
依赖关系 - 项目中的任务(节点)和任务间的先后依赖(边)、代码中的函数(节点)和调用关系(边)
如果没有图结构, 这类 多对多 关系很难高效表达, 所以这种网状交叉模型就被引入到计算机科学当中.
Graph 最著名的应用场景就是判断两个节点(Vertex/Node)之间是否可达? 可达的最短路径是哪条?
而 Graph 的分类维度可以按照以下分类:
-
按边的方向性划分
- 无向图 - 边没有方向, 节点之间的关系是双向的(社交网络的 “好友关系”, 城市之中的 “道路连通”)
- 有向图 - 边有方向, 节点之间的关系是单向的(代码的 “函数调用关系”, 任务的 “依赖顺序”)
-
按边的权重属性划分
- 无权图 - 边没有附加属性, 仅表示 “是否连接”
- 加权图 - 边带有权重值, 用于表示关系的强度/成本/容量
这两个维度是后续选择图算法(比如最短路径、拓扑排序)的关键依据, 边的方向性可以和权重做组合:
| 图的分类 |
核心适用算法 |
算法解决的核心问题 |
典型工程场景 |
| 无向无权图 |
BFS/DFS |
两点是否可达、最短路径(步数) |
社交网络“好友链”查找、网络连通性检测 |
| 无向加权图 |
Dijkstra、Prim(最小生成树) |
最短路径(成本)、最小连通成本 |
地图导航(无单行道)、电网布线优化 |
| 有向无权图 |
BFS/DFS、拓扑排序(Kahn) |
依赖顺序、单向可达性 |
代码编译依赖、任务调度 |
| 有向加权图 |
Dijkstra、A*、Bellman-Ford |
带成本的单向最短路径 |
地图导航(有单行道)、网络路由 |
| 有向无环图 |
拓扑排序、动态规划 |
依赖调度、路径权重累加 |
流水线任务编排、依赖包版本解析 |
回过头来说 Pekko Stream, 里面就是采用 Source → Flow → Sink 来实现流传递, 本质是工程化的有向无环图
接下来就是按照 Graph 理解这些代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
void emptyVertex() { Source<String, NotUsed> source = Source.single("hello"); source.run(materializer); }
void matchError() { Source<Integer, NotUsed> source = Source.single(123); Flow<String, String, NotUsed> flow = Flow.of(String.class).map(s -> s.toUpperCase()); source.via(flow).to(Sink.foreach(System.out::println)); }
|
这就是 Pekko 将数学当中 Graph 引入计算机当中的实践, 这里只需要了解一下就行了, 后续如果要深入的话需要更多扩展资料.
数据通道
上面常常看到频繁 Source → Flow → Sink 说明的数据流向, 但是现实当中还需要数据输入输出可能是分开处理(异步读取/写入).
而 PekkoStream 考虑到这一点, 所以也支持拆分数据到单向空节点之中, 也就是拆分:
-
inbound: 入站数据
-
outbound: 出站数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| import org.apache.pekko.actor.typed.ActorSystem; import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.stream.Materializer; import org.apache.pekko.stream.OverflowStrategy; import org.apache.pekko.stream.javadsl.*; import org.apache.pekko.util.ByteString; import org.slf4j.Logger;
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import java.util.Objects;
public class PekkoStreamQueueExample {
public static void main(String[] args) throws IOException { final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "demo"); final Materializer materializer = Materializer.createMaterializer(system); final Logger logger = system.log();
SourceQueueWithComplete<ByteString> inbound = Source.<ByteString>queue( 1024, OverflowStrategy.backpressure() ).via(Framing.delimiter( ByteString.fromString("\n"), 1024, FramingTruncation.ALLOW )).to(Sink.foreach(message -> { logger.info("Message received: {}", message);
})).run(materializer);
inbound.offer(ByteString.fromString("Hello World! ERROR!")) .whenComplete((res, throwable) -> { if (Objects.nonNull(throwable)) { logger.error("push queue error", throwable); } else { logger.info("push queue success"); } });
inbound.offer(ByteString.fromString("Hello World!Hello!!!\n")) .whenComplete((res, throwable) -> { if (Objects.nonNull(throwable)) { logger.error("push queue error", throwable); } else { logger.info("push queue success"); } });
SourceQueueWithComplete<ByteString> outbound = Source.<ByteString>queue( 1024, OverflowStrategy.backpressure() ).via(Framing.lengthField( Integer.BYTES, 0, 256 * 1024, ByteOrder.BIG_ENDIAN )).to(Sink.foreach(message -> { logger.info("Frame message: {}", message);
})).run(materializer);
String msgBody = "Hello World!Frame!"; byte[] msgBodyBytes = msgBody.getBytes(StandardCharsets.UTF_8); int msgId = 10001;
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + Integer.BYTES + msgBodyBytes.length); buffer.putInt(msgBodyBytes.length + Integer.BYTES); buffer.putInt(msgId); buffer.put(msgBodyBytes); ByteString bytes = ByteString.fromArray(buffer.array());
outbound.offer(bytes).whenComplete((res, throwable) -> { if (Objects.nonNull(throwable)) { logger.error("push queue error", throwable); } else { logger.info("push queue success"); } });
inbound.complete(); outbound.complete(); System.out.println("任意按键即可退出程序\n"); int ignore = System.in.read(); system.terminate(); } }
|
对于构建的会话对象, 一般最好采用双队列处理, 对于效率能够提升不少.