Pekko 消息交互 Actor

Pekko 消息交互

其他文章已经揭示过 Pekko 目前主流支持的网络流消息交互:

基本上需要的网络流处理, Pekko 都封装完成了, 所以也就不需要依赖第三方来处理这些.

TCP/UDP 文档我看目前没有跟进到最新强类型版本处理, 这里主要还是以 TCP 流处理为主

TCP/UDP 之类依赖只需要基础的 actor 即可, 无需其他多余依赖, 目前官网 TCP 样例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import java.net.InetSocketAddress;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.io.Tcp;
import org.apache.pekko.io.Tcp.Bound;
import org.apache.pekko.io.Tcp.CommandFailed;
import org.apache.pekko.io.Tcp.Connected;
import org.apache.pekko.io.Tcp.ConnectionClosed;
import org.apache.pekko.io.Tcp.Received;
import org.apache.pekko.io.TcpMessage;
import org.apache.pekko.util.ByteString;

static class Client extends AbstractActor {

final InetSocketAddress remote;
final ActorRef listener;

public static Props props(InetSocketAddress remote, ActorRef listener) {
return Props.create(Client.class, remote, listener);
}

public Client(InetSocketAddress remote, ActorRef listener) {
this.remote = remote;
this.listener = listener;

final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
tcp.tell(TcpMessage.connect(remote), getSelf());
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(
CommandFailed.class,
msg -> {
listener.tell("failed", getSelf());
getContext().stop(getSelf());
})
.match(
Connected.class,
msg -> {
listener.tell(msg, getSelf());
getSender().tell(TcpMessage.register(getSelf()), getSelf());
getContext().become(connected(getSender()));
})
.build();
}

private Receive connected(final ActorRef connection) {
return receiveBuilder()
.match(
ByteString.class,
msg -> {
connection.tell(TcpMessage.write((ByteString) msg), getSelf());
})
.match(
CommandFailed.class,
msg -> {
// OS kernel socket buffer was full
})
.match(
Received.class,
msg -> {
listener.tell(msg.data(), getSelf());
})
.matchEquals(
"close",
msg -> {
connection.tell(TcpMessage.close(), getSelf());
})
.match(
ConnectionClosed.class,
msg -> {
getContext().stop(getSelf());
})
.build();
}
}

之后启动获取地址的方式如下:

1
final ActorRef tcpManager = Tcp.get(getContext().getSystem()).manager();

目前强类型 typed 还没有对应文档说明怎么用, 还是推荐采用经典的 actor 声明, 这里面就涉及到转化功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 注意: 目前基本只需要强类型下转弱类型, 这个过程是不可逆的, 也就是只允许强转弱
// 下面就是核心的处理转化方式

/**
* 将强类型 ActorRef 下转为弱类型引用
*/
public <T> org.apache.pekko.actor.ActorRef convertActorRef(ActorRef<T> actorRef) {
return Adapter.toClassic(actorRef);
}

/**
* 将强类型 ActorContext 下转为弱类型引用
*/
public <T> org.apache.pekko.actor.ActorContext convertActorContext(ActorContext<T> actorContext) {
return Adapter.toClassic(actorContext);
}

/**
* 将强类型 ActorSystem 下转为弱类型引用
*/
public <T> org.apache.pekko.actor.ActorSystem convertActorSystem(ActorSystem<T> actorSystem) {
return Adapter.toClassic(actorSystem);
}

/**
* 将强类型 Scheduler 下转为弱类型引用
*/
public org.apache.pekko.actor.Scheduler convertActorScheduler(Scheduler scheduler) {
return Adapter.toClassic(scheduler);
}

这样 Pekko 就可以回滚成经典的 Actor 接口来处理对应所有功能, 但是需要注意以下问题:

  • 仅用于创建经典 Actor(如桥接器), 不要用 Classic Context 处理 Typed 消息, 只要 ActorRef 推送避免使用到 ActorContext

  • 避免在 Typed Actor 中过度使用 Classic Context, 否则会丧失 Typed 的类型安全优势

  • 禁止直接向转化后的 Classic ActorRef 发送原生 IO 消息, 也就是发送不是原本 Typed 声明类型的消息

  • 有些老模板只有经典的 Actor 实现, 如果强类型声明太过复杂的情况可以直接改由经典模式声明 Actor

所以这样处理时候的 TCP 服务端只需要按照下面这样编写即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.typed.*;
import org.apache.pekko.actor.typed.javadsl.*;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.io.Tcp;
import org.apache.pekko.io.TcpMessage;
import org.apache.pekko.util.ByteString;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Objects;

/**
* TCP Actor 服务
*/
public class PekkoTcpExample {

/**
* 服务请求入口
*/
public static void main(String[] args) throws Exception {
// 创建初始化默认的 ActorSystem Boot
final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "tcp-example");

// 经典版本初始化 Actor
// 不能直接采用 classicSystem.actorOf, 会导致创建的层级挂载于顶层 Actor 树
final ActorRef<Void> server = system.systemActorOf(Behaviors.setup(context -> {
// 这里创建个 Actor-Typed 运行层, 内部采用经典 Actor 构建, 自定提升为该层级上层, 也就是 /tcp-service 节点
org.apache.pekko.actor.ActorContext classicContext = Adapter.toClassic(context);
classicContext.actorOf(PekkoTcpActor.props(
org.apache.pekko.actor.ActorRef.noSender(),
"127.0.0.1",
18881,
128
));
return Behaviors.empty();
}), "tcp-service", Props.empty());

// 这里采用任意输入回车中断监听, 如果未做任何键盘操作则是保持运行
System.out.println("Server online at tcp://localhost:18880/\nPress RETURN to stop...");
int ignore = System.in.read();
system.terminate();
}


/**
* Pekko TCP 服务 Actor
*/
public static class PekkoTcpActor extends AbstractActor {


/**
* TCP 服务原生的弱类型句柄
*/
final org.apache.pekko.actor.ActorRef manager;

/**
* TCP 监听的 Actor 句柄
*/
final org.apache.pekko.actor.ActorRef handler;

/**
* 日志句柄
*/
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);


/**
* 监听地址
*/
final String hostname;

/**
* 监听端口
*/
final int port;

/**
* 监听队列值
*/
final int backlog;


/**
* 私有构建
*/
private PekkoTcpActor(org.apache.pekko.actor.ActorRef manager, String hostname, int port, int backlog) {
this.manager = manager;
this.handler = Tcp.get(getContext().getSystem()).manager();
this.hostname = hostname;
this.port = port;
this.backlog = backlog;
}

/**
* 静态构建
*/
public static org.apache.pekko.actor.Props props(org.apache.pekko.actor.ActorRef manager, String hostname, int port, int backlog) {
return org.apache.pekko.actor.Props.create(PekkoTcpActor.class, () -> new PekkoTcpActor(manager, hostname, port, backlog));
}

/**
* 启动初始化
*/
@Override
public void preStart() {
// 通知启动 TCP 监听服务
handler.tell(TcpMessage.bind(getSelf(), new InetSocketAddress(hostname, port), backlog), getSelf());
}

/**
* 退出处理
*/
@Override
public void postStop() {
// 通知关闭 TCP 的句柄运行
handler.tell(TcpMessage.unbind(), getSelf());
}

/**
* 消息拦截
*/
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Tcp.Bound.class, msg -> {
logger.info("TCP Listen:{}", msg.localAddress());
if (Objects.nonNull(manager)) {
manager.tell(msg, getSelf());
}
})
.match(Tcp.CommandFailed.class, msg -> {
logger.error("TCP Listen Failed, Address:{}:{}, already in use?", hostname, port);
getContext().stop(getSelf()); // 绑定失败直接停止 Actor
})
.match(Tcp.Connected.class, connected -> {
logger.info("New Connect, REMOTE:{} → LOCAL:{}",
connected.remoteAddress(), connected.localAddress());

if (Objects.nonNull(manager)) {
manager.tell(connected, getSelf());
}


// 动态转发构建会话, 将 Connected 会话移交给其他 Actor 处理
// 也就是当有会话连接访问过来的时候, 会默认动态创建 Actor 然后把通知 TCP 内部将其中消息处理功能转给创建的 Actor
final org.apache.pekko.actor.ActorRef handler =
getContext().actorOf(org.apache.pekko.actor.Props.create(SimplisticHandler.class));
// getSender().tell(TcpMessage.register(handler), getSelf()); // 这里不需要外部来注册 TCP 消息接管, 而是采用内部 Actor 自行处理

handler.tell(connected, getSender()); // 通知连接, getSender() 代表的是维护 TCP 服务的 Actor
}).build();
}
}

/**
* 简单的动态 TCP 会话
*/
static class SimplisticHandler extends AbstractActor {

/**
* 日志句柄
*/
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);


/**
* 消息拦截
*/
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Tcp.Connected.class, msg -> {
logger.info("Session Connected: {}", msg.remoteAddress());

// 通知系统的 TCP Actor, 将接管目前 TCP 会话的处理
getSender().tell(TcpMessage.register(getSelf()), getSelf());
})
.match(Tcp.Received.class, msg -> {
final ByteString data = msg.data();
logger.info("Message: {}", Arrays.toString(data.toArray()));

// 回显数据给客户端
getSender().tell(TcpMessage.write(data), getSelf());
})
.match(Tcp.ConnectionClosed.class, msg -> {
logger.info("Closed, Cause: {}", msg.getErrorCause());
getContext().stop(getSelf());
})
.build();
}
}
}

这里编写了个 echo 功能的系统服务, 具体直接用以下命令即可连接:

1
2
3
nc 127.0.0.1 18881
# 有的发行版内置 netcat, 而找不到 nc 指令就用以下命令
netcat 127.0.0.1 18881

客户端连接到服务端之后每个连接都会被动态创建 Actor 维护, 可以方便做些业务逻辑, 比如挂载自己编写鉴权模块等处理.

这里就是很标准的动态 Actor 托管设计, 建议如果要设计自己的 Actor 管理器最好学习理解

理解和设计

如果是设计对外暴露的网络消息传输功能, 那么不可避免的需要支持多个客户端同时连接到服务端并且保证非阻塞且线程安全.

如果将所有会话读写操作集中在单个 Actor 之中运行, 会导致完全无法发挥服务器的 CPU 性能.

单个 Actor 的执行流程是单线程运行的, 多个 Actor 才会运行多个 CPU 核心之中, 这也是 Actor 天然线程安全的关键

这里还需要结合流处理设计读写双队列就可以做到消息分帧操作, 结合起来就可以手写出不错的 Actor 服务端功能

需要重新整合出 SimplisticHandler 类来追加流处理功能(注意: 这里要用到 pekko-stream 相关)

双队列(inbound + outbound)设计可以参照关于 PekkoStream 相关文章, 很好实现的消息队列功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.typed.*;
import org.apache.pekko.actor.typed.javadsl.*;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.io.Tcp;
import org.apache.pekko.io.TcpMessage;
import org.apache.pekko.stream.*;
import org.apache.pekko.stream.javadsl.*;
import org.apache.pekko.util.ByteString;
import org.apache.pekko.util.ByteStringBuilder;

import java.net.InetSocketAddress;
import java.util.Objects;

/**
* TCP Actor 服务
*/
public class PekkoTcpExample {

/**
* 服务请求入口
*/
public static void main(String[] args) throws Exception {
// 创建初始化默认的 ActorSystem Boot
final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "tcp-example");

// 经典版本初始化 Actor
// 不能直接采用 classicSystem.actorOf, 会导致创建的层级挂载于顶层 Actor 树
final ActorRef<Void> server = system.systemActorOf(Behaviors.setup(context -> {
// 这里创建个 Actor-Typed 运行层, 内部采用经典 Actor 构建, 自定提升为该层级上层, 也就是 /tcp-service 节点
org.apache.pekko.actor.ActorContext classicContext = Adapter.toClassic(context);
classicContext.actorOf(PekkoTcpActor.props(
org.apache.pekko.actor.ActorRef.noSender(),
"127.0.0.1",
18881,
128
));
return Behaviors.empty();
}), "tcp-service", Props.empty());

// 这里采用任意输入回车中断监听, 如果未做任何键盘操作则是保持运行
System.out.println("Server online at tcp://localhost:18880/\nPress RETURN to stop...");
int ignore = System.in.read();
system.terminate();
}


/**
* Pekko TCP 服务 Actor
*/
public static class PekkoTcpActor extends AbstractActor {


/**
* TCP 服务原生的弱类型句柄
*/
final org.apache.pekko.actor.ActorRef manager;

/**
* TCP 监听的 Actor 句柄
*/
final org.apache.pekko.actor.ActorRef handler;

/**
* 日志句柄
*/
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);


/**
* 监听地址
*/
final String hostname;

/**
* 监听端口
*/
final int port;

/**
* 监听队列值
*/
final int backlog;


/**
* 私有构建
*/
private PekkoTcpActor(org.apache.pekko.actor.ActorRef manager, String hostname, int port, int backlog) {
this.manager = manager;
this.handler = Tcp.get(getContext().getSystem()).manager();
this.hostname = hostname;
this.port = port;
this.backlog = backlog;
}

/**
* 静态构建
*/
public static org.apache.pekko.actor.Props props(org.apache.pekko.actor.ActorRef manager, String hostname, int port, int backlog) {
return org.apache.pekko.actor.Props.create(PekkoTcpActor.class, () -> new PekkoTcpActor(manager, hostname, port, backlog));
}

/**
* 启动初始化
*/
@Override
public void preStart() {
// 通知启动 TCP 监听服务
handler.tell(TcpMessage.bind(getSelf(), new InetSocketAddress(hostname, port), backlog), getSelf());
}

/**
* 退出处理
*/
@Override
public void postStop() {
// 通知关闭 TCP 的句柄运行
handler.tell(TcpMessage.unbind(), getSelf());
}

/**
* 消息拦截
*/
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Tcp.Bound.class, msg -> {
logger.info("TCP Listen:{}", msg.localAddress());
if (Objects.nonNull(manager)) {
manager.tell(msg, getSelf());
}
})
.match(Tcp.CommandFailed.class, msg -> {
logger.error("TCP Listen Failed, Address:{}:{}, already in use?", hostname, port);
getContext().stop(getSelf());
})
.match(Tcp.Connected.class, connected -> {
logger.info("New Connect, REMOTE:{} → LOCAL:{}",
connected.remoteAddress(), connected.localAddress());

if (Objects.nonNull(manager)) {
manager.tell(connected, getSelf());
}


// 动态转发构建会话, 将 Connected 会话移交给其他 Actor 处理
// 也就是当有会话连接访问过来的时候, 会默认动态创建 Actor 然后把通知 TCP 内部将其中消息处理功能转给创建的 Actor
final org.apache.pekko.actor.ActorRef handler =
getContext().actorOf(org.apache.pekko.actor.Props.create(SimplisticHandler.class));
// getSender().tell(TcpMessage.register(handler), getSelf()); // 这里不需要外部来注册 TCP 消息接管, 而是采用内部 Actor 自行处理
handler.tell(connected, getSender()); // 通知连接, getSender() 代表的是维护 TCP 服务的 Actor
}).build();
}
}

/**
* 简单的动态 TCP 会话
*/
static class SimplisticHandler extends AbstractActor {

/**
* 日志句柄
*/
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);


/**
* 流的运行时
*/
final Materializer materializer = Materializer.createMaterializer(getContext().getSystem());

/**
* 系统处理 TCP 会话的 Actor
*/
private org.apache.pekko.actor.ActorRef session;


/**
* 分帧出来的消息结构
*
* @param message
*/
record FrameMessage(ByteString message) {
}


/**
* 入站队列, 这里的参数先写死, 正确应是支持外部配置
* 使用 OverflowStrategy.backpressure()(背压策略), 队列满时不会直接丢弃数据, 而是向上游施加压力来保证数据完整性
*/
final SourceQueueWithComplete<ByteString> inbound = Source.<ByteString>queue(
1024, // 队列长度
OverflowStrategy.backpressure() // 队列满了之后的策略
).via(Framing.delimiter( // 这里也可以采用 Framing.lengthField 方式
ByteString.fromString("\n"),// 数据分隔符
1024, // 最大缓冲区
FramingTruncation.ALLOW // 指定当输入数据流被截断无法处理时候策略: DISALLOW - 中断处理/ ALLOW - 直接丢弃
)).to(Sink.foreach(message -> {
// 捕获数据流转, 这里就可以考虑转发给特定 Actor
// 比如 getSelf().tell(new FrameReceived(frame), getSelf());
logger.info("Message Request: {}", message);

// 这里的数据直接切分给自己 Actor 推送消息, 让其自行处理监听
getSelf().tell(new FrameMessage(message), getSender());
})).run(materializer);

/**
* 出站队列, 这里不需要传入的数据过滤分帧, 而是获取到数据自行附加上结构
*/
final SourceQueueWithComplete<ByteString> outbound = Source.<ByteString>queue(
1024, // 队列长度
OverflowStrategy.backpressure() // 队列满了之后的策略, 背压策略会回滚最早消息覆盖
).to(Sink.foreach(message -> {

// 这里可以处理下消息, 比如追加字符串
ByteStringBuilder builder = ByteString.createBuilder();
builder.append(ByteString.fromString("Pekko Say: "));
builder.append(message);
builder.append(ByteString.fromString("\n"));
ByteString newMessage = builder.result();

logger.info("Message Request: {}", newMessage);

// 推送给对应的客户端
session.tell(TcpMessage.write(newMessage), getSelf());
})).run(materializer);


/**
* 消息拦截
*/
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Tcp.Connected.class, msg -> {
logger.info("Session Connected: {}", msg.remoteAddress());
this.session = getSender();

// 通知系统的 TCP Actor, 将接管目前 TCP 会话的处理
session.tell(TcpMessage.register(getSelf()), getSelf()); // 这里的注册其实也可以推送
})

.match(Tcp.Received.class, msg -> {
final ByteString data = msg.data();

// 写入入站消息队列
inbound.offer(data).whenComplete((result, throwable) -> {
if (Objects.nonNull(throwable)) {
logger.warning(throwable, "消息队列异常");
} else {
if (QueueOfferResult.enqueued().equals(result)) {
logger.debug("消息入队成功:{}字节", data.length());
} else if (QueueOfferResult.dropped().equals(result)) {
logger.warning("队列满,消息被丢弃:{}字节", data.length());
} else if (QueueOfferResult.closed().equals(result)) {
logger.warning("队列已关闭,消息入队失败:{}字节", data.length());
}
}
});

})
.match(FrameMessage.class, frameMessage -> {
ByteString message = frameMessage.message();
logger.info("Request Frame Message = {}", message.utf8String());


// 这里就是可以直接出站队列推送下消息, 比如 echo 返回
outbound.offer(frameMessage.message()).whenComplete((result, throwable) -> {
if (Objects.nonNull(throwable)) {
logger.warning(throwable, "消息队列异常");
} else {
if (QueueOfferResult.enqueued().equals(result)) {
logger.debug("消息入队成功:{}字节", message.length());
} else if (QueueOfferResult.dropped().equals(result)) {
logger.warning("队列满,消息被丢弃:{}字节", message.length());
} else if (QueueOfferResult.closed().equals(result)) {
logger.warning("队列已关闭,消息入队失败:{}字节", message.length());
}
}
});
})

.match(Tcp.ConnectionClosed.class, msg -> {
logger.info("Closed, Cause: {}", msg.getErrorCause());
// 关闭队列,释放资源
inbound.complete();
outbound.complete();
getContext().stop(getSelf());
})
.build();
}
}
}

这就是很简易的双队列消息推送机制, 抛弃了入站数据即出站数据的思路来支持异步队列操作, 外部要发消息通知会话 Actor 来发送.

后续可以扩展出来自行封装成需要的通用网络组件, 因为网络库基本上都是常用功能, 封装起来方便后续复用

实践过程当中发现很多 TypedActor 功能其实还没有实现, 只能转换成经典的 Actor 设计, 后来感觉经典版本 Actor 兼容性和支持度更好点

WebSocket 流

对于 TCP/UDP 做调试测试的时候很麻烦, 想测试消息推送的时候没办法做到客户端直接支持 Framing 分帧消息发送处理

所以接下来需要引入 WebSocket 网络做数据流传输, 相比于 TCP/UDP 客户端, WebSocket 客户端支持广泛(支持二进制转Base64传输)

参考网址: https://pekko.apache.org/docs/pekko/current/stream/stream-refs.html

直接搭建 WebSocket 框架测试样例即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.http.javadsl.ServerBinding;
import org.apache.pekko.http.javadsl.model.AttributeKeys;
import org.apache.pekko.http.javadsl.model.HttpRequest;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.model.StatusCodes;
import org.apache.pekko.http.javadsl.model.ws.BinaryMessage;
import org.apache.pekko.http.javadsl.model.ws.Message;
import org.apache.pekko.http.javadsl.model.ws.TextMessage;
import org.apache.pekko.http.javadsl.server.AllDirectives;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.CompletionStrategy;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.util.ByteString;
import org.apache.pekko.util.ByteStringBuilder;

import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;


/**
* Pekko WebSocket 数据帧实现
*/
public class PekkoFramedExample extends AllDirectives {


/**
* 服务请求入口
*/
public static void main(String[] args) throws Exception {
String hostname = "127.0.0.1";
int port = 18889;
String path = "/test";

// 创建初始化默认的 ActorSystem Boot
// 这里采用经典 Actor 而非强类型 Actor
ActorSystem system = ActorSystem.create("websockets");


// 启动 WebSocket 数据服务
ActorRef actorRef = system.actorOf(PekkoWebSocketActor.create(
hostname,
port,
path
));

// 这里采用任意输入回车中断监听, 如果未做任何键盘操作则是保持运行
System.out.printf("Server online at ws://%s:%d%s%n", hostname, port, path);
System.out.println("Press RETURN to stop...");
int ignore = System.in.read();

system.terminate();
}


/**
* 客户端连接完成
*/
public record Connected(
ActorRef outbound
) {

}


/**
* WebSocket 管理器
*/
public static class PekkoWebSocketActor extends AbstractActor {


/**
* 日志句柄
*/
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);


/**
* 监听地址
*/
final String hostname;

/**
* 监听端口
*/
final int port;

/**
* 监听路径
*/
final String path;


/**
* HTTP 句柄
*/
final Http http = Http.get(getContext().getSystem());

/**
* 物料对象
*/
final Materializer materializer = Materializer.createMaterializer(getContext().getSystem());

/**
* 绑定 HTTP 句柄
*/
private CompletionStage<ServerBinding> binding;


/**
* 私有构建
*/
private PekkoWebSocketActor(String hostname, int port, String path) {
this.hostname = hostname;
this.port = port;
this.path = path;
}

/**
* 静态构建
*/
public static Props create(String hostname, int port, String path) {
return Props.create(PekkoWebSocketActor.class, () -> new PekkoWebSocketActor(
hostname, port, path
));
}

/**
* 启动初始化
*/
@Override
public void preStart() {
logger.info("WebSocket Started: {}:{}{}", hostname, port, path);
this.binding = http
.newServerAt(hostname, port)
.bind(this::createRoute) // 构建请求检测
.whenComplete((res, throwable) -> {
// 检测异常, 如果异常立即退出 Actor
if (Objects.nonNull(throwable)) {
logger.error(throwable, "异常的 WebSocket");
getContext().stop(getSelf());
} else {
logger.info("启动服务成功");
}
});
}

/**
* 匹配路由访问和提升 WebSocket 请求
*/
private CompletionStage<HttpResponse> createRoute(HttpRequest request) {
// 匹配路由直接提升为 WebSocket 请求, 否则返回 404
if (request.getUri().path().equals(path)) {
return CompletableFuture.completedFuture(request
.getAttribute(AttributeKeys.webSocketUpgrade)
.map(upgrade -> upgrade.handleMessagesWith(createFlow()))
.orElse(HttpResponse.create()
.withStatus(StatusCodes.BAD_REQUEST)
.withEntity("Expected WebSocket request")
));
} else {
return CompletableFuture.completedFuture(HttpResponse
.create()
.withStatus(StatusCodes.NOT_FOUND));
}
}


/**
* 构建消息流管道
* 注意: 这里当会话连接之后就会调用, 也就是可以在这里动态构建 Actor
*/
private Flow<Message, Message, NotUsed> createFlow() {
// 动态创建 Actor
String name = UUID.randomUUID().toString();
ActorRef actorRef = getContext().actorOf(Props.create(
PekkoWebSocketSession.class,
PekkoWebSocketSession::new
), name);
logger.info("创建新 Actor: {}", actorRef.path());


// 父级 Actor 不用管太多消息交换, 仅仅作为服务状态维护
// 只提取 Sink(客户端上报数据), Source 只需要获取队列句柄移交给子 Actor 而不需要同步返回

// Source.actorRef 会基于 run 运行时在当前 Actor 启动个 Source 监听流
// 参考地址: https://pekko.apache.org/docs/pekko/current/stream/operators/Source/actorRef.html
Pair<ActorRef, Source<Message, NotUsed>> source = Source.<Message>actorRef(elem -> {
if (elem == Done.done()) {
return Optional.of(CompletionStrategy.immediately());
} else {
return Optional.empty();
}
}, elem -> Optional.empty(),
1024, // 队列长度
OverflowStrategy.dropHead() // 策略: 溢出时丢弃最旧的消息
).preMaterialize(materializer);
actorRef.tell(new Connected(source.first()), getSelf());


// 构建 sink
Sink<Message, CompletionStage<Done>> sinkBuilder = Sink.foreach(message -> actorRef.tell(message, getSelf()));
Sink<Message, NotUsed> sink = sinkBuilder.mapMaterializedValue(future -> {
future.whenComplete((done, throwable) -> {
this.createQuitComplete(done, throwable, actorRef);
getContext().stop(source.first()); // 关闭消息通道 Actor
});
return NotUsed.getInstance();
});


// 构建消息流通道
return Flow.fromSinkAndSource(sink, source.second())
// 监听 WebSocket 的会话状态
.watchTermination((ignore, future) -> {
future.whenComplete((done, throwable) -> {
this.createQuitComplete(done, throwable, actorRef);
getContext().stop(source.first()); // 关闭消息通道 Actor
});
return NotUsed.getInstance();
});
}

/**
* 构建退出的回调
*/
private void createQuitComplete(Done ignore, Throwable throwable, ActorRef actorRef) {
if (Objects.nonNull(throwable)) {
// 异常断开(如网络中断、超时)
logger.error(throwable, "WebSocket 异常断开:{}", actorRef.path());
} else {
// 正常断开(客户端主动关闭、服务端主动关闭)
logger.info("WebSocket 正常断开:{}", actorRef.path());
}

// 断开后停止 Actor 让其清理资源
getContext().stop(actorRef);
}

/**
* Actor 退出回调
*/
@Override
public void postStop() {
if (Objects.nonNull(binding)) {
binding.thenCompose(ServerBinding::unbind)
.thenAccept((done) -> logger.info("解除绑定 WebSocket 成功"));
}
}


/**
* 消息拦截
*/
@Override
public Receive createReceive() {
// 可以自定义做些操作
return receiveBuilder()
.build();
}
}


/**
* WebSocket 动态构建的会话对象
*/
public static class PekkoWebSocketSession extends AbstractActor {

/**
* 日志句柄
*/
final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);


/**
* 出站消息队列
*/
ActorRef outbound;


// todo: 如果需要可以追加出站和入站的消息通道

@Override
public void postStop() {
logger.info("Actor[{}] 已退出", getSelf().path());
}

@Override
public Receive createReceive() {
return receiveBuilder()
// 连接完成这里只需要处理入站消息队列即可, 外部消息队列已经传入
.match(Connected.class, command -> this.outbound = command.outbound)
.match(BinaryMessage.class, message -> {
logger.info("binary message = {}", message.getStrictData());


// 以二进制合并返回响应数据
ByteStringBuilder builder = ByteString.createBuilder();
builder.append(ByteString.fromString("Pekko Say: "));
builder.append(message.getStrictData());
outbound.tell(BinaryMessage.create(builder.result()), getSelf());
})
.match(TextMessage.class, message -> {
logger.info("text message = {}", message.getStrictText());

// 以字符串合并返回响应数据
outbound.tell(TextMessage.create("Pekko Say: %s".formatted(message)), getSelf());
})
.build();
}
}
}

这里就是直接做基础读写流操作, 目前没有引入 Framing 功能来做数据分帧

不过因为 WebSocket 本身是高级协议不好动底层, 所有都是有上级 WebSocket Actor 做管道转发过来, 数据量上来的时候可能有瓶颈.

这里最后扩展是在会话的 Actor 追加 inbound 入站的队列, 然后外部数据尽可能快填充到队列, 而不是数据到来直接运行业务逻辑.

避免业务和分帧处理在同一个运行时处理, 后面的会话 Actor 自行读取队列数据来做业务功能处理

强类型 TCP 服务

在查询官方文档之后, 确认其实强类型 TCP/UDP 是支持挂载的, 这边也测试下看看是否能够运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import org.apache.pekko.actor.typed.*;
import org.apache.pekko.actor.typed.javadsl.AbstractBehavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.actor.typed.javadsl.Receive;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.io.Tcp;
import org.apache.pekko.io.TcpExt;
import org.apache.pekko.io.TcpMessage;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
* PekkoTCPTyped 服务
*/
public class PekkoTcpTypedExample {
/**
* 服务入口
*
* @param args 参数列表
*/
public static void main(String[] args) throws IOException {

// 创建空的 ActorSystem, 后面任务手动附加
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "pekko-typed-stream");
ActorRef<Tcp.Message> handler = system.systemActorOf(TcpActorStream.create("127.0.0.1", 18889), "tcp-service", Props.empty());

system.getWhenTerminated().thenAccept(done -> {
handler.unsafeUpcast().tell(PostStop.instance());
});

// 等待随便输入退出
System.out.println("Press any key to stop...");
int ignore = System.in.read();
system.terminate();
}


/**
* 强类型 TCP 服务
* <p>
* 具体的监听命令可以查看包定义: /org/apache/pekko/pekko-actor_2.13/1.1.5/pekko-actor_2.13-1.1.5.jar!/org/apache/pekko/io/Tcp.class
*/
public static class TcpActorStream extends AbstractBehavior<Tcp.Message> {

/**
* 获取 TCP 监听句柄
*/
final TcpExt handler = Tcp.get(getContext().getSystem());

/**
* 日志对象
*/
final LoggingAdapter log = Logging.getLogger(getContext().getSystem().classicSystem(), this);

/**
* 监听地址
*/
final String hostname;

/**
* 监听端口
*/
final int port;

/**
* 构造方法
*/
private TcpActorStream(ActorContext<Tcp.Message> context, String hostname, int port) {
super(context);
this.hostname = hostname;
this.port = port;
handler.getManager().tell(TcpMessage.bind(
getContext().classicActorContext().self(),
new InetSocketAddress(hostname, port),
100
), getContext().classicActorContext().self());
}


/**
* 静态构造
*/
public static Behavior<Tcp.Message> create(String hostname, int port) {
return Behaviors
.setup(context -> new TcpActorStream(context, hostname, port));
}

@Override
public Receive<Tcp.Message> createReceive() {
return newReceiveBuilder()
// 退出信号
.onSignal(PostStop.class, (command) -> {
log.warning("Actor stopped");
handler.getManager().tell(TcpMessage.unbind(), getContext().classicActorContext().self());
return Behaviors.stopped();
})

.onMessage(Tcp.Bound.class, (command) -> {
log.info("Tcp bound {}", command.localAddress());
return Behaviors.same();
})

.onMessage(Tcp.Connected.class, (command) -> {
log.info("Connecting to {}", command.remoteAddress());

// 注册监听者, 这里就是 spawn 动态创建会话移交监听的地方
// 核心就是 TcpMessage.register 注册负责处理的 Actor
getContext().classicActorContext().sender().tell(
TcpMessage.register(getContext().classicActorContext().self()),
getContext().classicActorContext().self()
);

// 这里简单处理, 让单个 Actor 监听即可
return Behaviors.same();
})


.onMessage(Tcp.CommandFailed.class, (command) -> {
log.info("Connection failed by {}", command);
return Behaviors.same();
})

.onMessage(Tcp.Received.class, (command) -> {
log.info("Received {}", command);
return Behaviors.same();
})


.onAnyMessage((event) -> {
log.info("Other event: {}", event);
return Behaviors.same();
})

.build();
}
}
}