JavaPekkoPekko Protobuf 技巧
MeteorCatPekko 是支持将数据对象作为 ProtobufV2/V3 序列化转发, 而且官方集群数据传输方案实现了这部分功能.
序列化说明: https://pekko.apache.org/docs/pekko/current/serialization.html#serialization-of-pekkos-messages
目前官方有两套 Protobuf 序列化方案支持, 比较推荐采用 pekko-protobuf-v3:
如果没有客户端需求, 一般只有集群转发会引用, 而集群处理默认已经自动引入 protobuf 支持.
而如果客户端需要依赖 Protobuf 传输数据而你的 Actor 服务目前还用不到集群功能的时候就需要手动来引入类库:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
| <project> <properties> <protobuf.platform.artifact-id>protobuf-bom</protobuf.platform.artifact-id> <protobuf.platform.group-id>com.google.protobuf</protobuf.platform.group-id> <protobuf.platform.version>4.32.0</protobuf.platform.version>
<protobuf.plugin.group-id>org.xolstice.maven.plugins</protobuf.plugin.group-id> <protobuf.plugin.version>0.6.1</protobuf.plugin.version>
<os.plugin.group-id>kr.motd.maven</os.plugin.group-id> <os.plugin.version>1.6.2</os.plugin.version>
<protobuf.plugin.group-id>org.xolstice.maven.plugins</protobuf.plugin.group-id> <protobuf.plugin.version>0.6.1</protobuf.plugin.version>
<protobuf.compiler.artifact-id>protoc</protobuf.compiler.artifact-id> <protobuf.compiler.group-id>com.google.protobuf</protobuf.compiler.group-id> <protobuf.compiler.version>${protobuf.platform.version}</protobuf.compiler.version> </properties>
<dependencyManagement> <dependencies>
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>${protobuf.platform.version}</version> </dependency> </dependencies> </dependencyManagement>
<dependencies> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-actor_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency>
<dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-protobuf-v3_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency>
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> </dependency>
<dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-slf4j_${scala.binary.version}</artifactId> <version>${pekko.version}</version> </dependency>
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.9</version> </dependency>
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>2.0.9</version> <scope>runtime</scope> </dependency> </dependencies>
<build>
<extensions> <extension> <groupId>${os.plugin.group-id}</groupId> <artifactId>os-maven-plugin</artifactId> <version>${os.plugin.version}</version> </extension> </extensions>
<plugins>
<plugin> <groupId>${protobuf.plugin.group-id}</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>${protobuf.plugin.version}</version> <extensions>true</extensions> <configuration> <protocArtifact> ${protobuf.compiler.group-id}:${protobuf.compiler.artifact-id}:${protobuf.compiler.version}:exe:${os.detected.classifier} </protocArtifact>
<clearOutputDirectory>false</clearOutputDirectory> </configuration>
<executions> <execution> <id>compile</id> <phase>compile</phase> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin>
</plugins> </build> </project>
|
之后生成个 Protobuf 文件来测试下是否能够正确生成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
syntax = "proto3";
option java_package = "me.meteorcat.game.protobuf";
option java_multiple_files = true;
message EchoBytesCommand{ int32 tick = 1; bytes data = 2; }
message EchoStringCommand{ int32 tick = 1; string data = 2; }
|
最后跑个测试代码运行是否正确序列化即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import me.meteorcat.game.protobuf.EchoBytesCommand; import me.meteorcat.game.protobuf.EchoStringCommand;
import java.nio.charset.StandardCharsets; import java.util.Arrays;
public class PekkoProtobufMessage {
public static void main(String[] args) { String message = "Hello.World"; byte[] messageBytes = message.getBytes(StandardCharsets.UTF_8);
EchoStringCommand.Builder stringBuilder = EchoStringCommand.newBuilder(); stringBuilder.setTick(100); stringBuilder.setData(message);
EchoStringCommand stringCommand = stringBuilder.build(); byte[] stringBytes = stringCommand.toByteArray();
try { EchoStringCommand newStringCommand = EchoStringCommand.parseFrom(stringBytes);
System.out.printf("文本消息信息: %s, 结构体: {%s}%n", Arrays.toString(stringBytes), newStringCommand); } catch (InvalidProtocolBufferException e) { System.err.println(e.getMessage()); }
EchoBytesCommand.Builder bytesBuilder = EchoBytesCommand.newBuilder(); bytesBuilder.setTick(101); bytesBuilder.setData(ByteString.copyFrom(messageBytes));
EchoBytesCommand bytesCommand = bytesBuilder.build(); byte[] bytes = bytesCommand.toByteArray();
try { EchoBytesCommand newBytesCommand = EchoBytesCommand.parseFrom(bytes);
System.out.printf("二进制消息信息: %s, 结构体: {%s}%n", Arrays.toString(bytes), newBytesCommand); } catch (InvalidProtocolBufferException e) { System.err.println(e.getMessage()); }
} }
|
测试之后没什么大问题就代表成功引入 Protbuf, 接下来就是引入另外概念: 角色信息需要设置成 Protobuf 吗?
数据体 Protobuf
以前 Erlang 项目当中看到的情况, 以下就是玩家信息定义, 直接把玩家信息 record 定义成全局实体:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| -ifndef(__ERLANG_PLAYER_RECORD__). -define(__ERLANG_PLAYER_RECORD__, 1).
-record(player, { uid = 0, sid = 0, socket = ?null, pid = ?null, scene = ?null,
nickname = ?null, money = 0, lv = 1, exp = 0, attr = ?null,
net = ?null }).
-endif.
|
Erlang 的 record 相当于 Java 的 record, 也是作为数据载体负责数据管理, 在 Pekko + Protobuf 之中是可以直接声明 Proto:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| syntax = "proto3"; option java_package = "me.meteorcat.game.protobuf"; option java_multiple_files = true;
message PlayerEntity { int64 uid = 1; int64 sid = 2; int64 rid = 3; string uuid = 4; int64 online = 5;
string nickname = 100; int32 money = 101; int32 currency = 102; int32 stamina = 103;
int32 level = 200; int64 experience = 201;
int64 uptime = 1000; }
|
这样设计的玩家实体就注册成 Protobuf 对象, 也就是能直接用于 Actor 传递处理(我这里还是使用经典 Actor 而非 TypedActor):

| import me.meteorcat.game.protobuf.PlayerEntity; import org.apache.pekko.actor.*; import org.apache.pekko.event.Logging; import org.apache.pekko.event.LoggingAdapter; import scala.concurrent.ExecutionContextExecutor;
import java.io.IOException; import java.time.Duration; import java.util.Objects;
public class PekkoProtobufExample {
public static void main(String[] args) throws IOException {
ActorSystem system = ActorSystem.create("pekko-protobuf");
ActorRef actorRef = system.actorOf(PekkoPlayerManager.props(), "player-manager");
System.out.println("Press RETURN to stop..."); int ignore = System.in.read(); system.terminate(); }
public static class PekkoPlayerManager extends AbstractActor {
final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final ExecutionContextExecutor dispatcher = getContext().getDispatcher();
final Scheduler scheduler = getContext().getSystem().getScheduler();
PlayerEntity entity;
final Cancellable store;
final Cancellable stamina;
private record StoreCommand() {
private static StoreCommand self;
public static StoreCommand getInstance() { if (Objects.isNull(self)) self = new StoreCommand(); return self; } }
private record StaminaCommand() { private static StaminaCommand self;
public static StaminaCommand getInstance() { if (Objects.isNull(self)) self = new StaminaCommand(); return self; } }
private PekkoPlayerManager(PlayerEntity entity, Duration storeDuration, Duration staminaDuration) { this.entity = entity; this.store = scheduler.scheduleAtFixedRate( storeDuration, storeDuration, getSelf(), StoreCommand.getInstance(), dispatcher, ActorRef.noSender());
this.stamina = scheduler.scheduleAtFixedRate( staminaDuration, staminaDuration, getSelf(), StaminaCommand.getInstance(), dispatcher, ActorRef.noSender()); }
public static Props props() {
PlayerEntity.Builder builder = PlayerEntity.newBuilder(); builder.setUid(10001); builder.setSid(1); builder.setRid(100); builder.setNickname("MeteorCat"); builder.setUptime(System.currentTimeMillis());
return Props.create(PekkoPlayerManager.class, () -> new PekkoPlayerManager( builder.build(), Duration.ofSeconds(30), Duration.ofSeconds(10) )); }
@Override public void preStart() { log.info("玩家挂载: {}", entity);
}
@Override public void postStop() { if (Objects.nonNull(store)) store.cancel(); if (Objects.nonNull(stamina)) stamina.cancel();
if (Objects.nonNull(entity)) storeEntity(StoreCommand.getInstance());
}
@Override public SupervisorStrategy supervisorStrategy() { return new OneForOneStrategy( 3, Duration.ofSeconds(10), throwable -> { log.warning(throwable, "玩家消息处理异常"); return SupervisorStrategy.restart(); } ); }
@Override public Receive createReceive() { return receiveBuilder() .match(StoreCommand.class, this::storeEntity) .match(StaminaCommand.class, this::staminaCommand) .build(); }
private void staminaCommand(StaminaCommand ignore) { if (this.entity.getStamina() >= 100) return; log.info("玩家体力增长");
PlayerEntity.Builder builder = entity.toBuilder(); builder.setStamina(this.entity.getStamina() + 1); this.entity = builder.build(); }
private void storeEntity(StoreCommand ignore) { log.info("保存玩家角色实体成功"); long now = System.currentTimeMillis(); long offset = now - entity.getUptime(); long online = offset + entity.getOnline();
PlayerEntity.Builder builder = entity.toBuilder(); builder.setUptime(now); builder.setOnline(online);
this.entity = builder.build(); log.info("保存实体成功: {}", this.entity); } } }
|
上面启动之后会创建自定义玩家内存实体, 并且实现了简单的游戏体力自增长和模拟实体保存数据库的功能
对于 Pekko 这样做的好处是如果后续升级到集群处理的时候, 直接就能支持集群的玩家角色属性传递(Pekko 集群底层采用 Protobuf)
Protobuf 生成的对象都是不可变的, 也就代表只允许被重新写入数据构建而不能针对某些字段单独修改.
注意: 玩家客户端只允许生成归于自身单独的 Actor, 否则会出现 Actor 数量上限问题, 单个 Actor 也不会只挂载 PlayerEntity 单一实体
而玩家 Actor 在生成时会实例化并挂载日常需要 manager/mgr(管理器) 相关玩家操作集, 其实也就是在 Actor 挂载的时候实例化自己的类功能调用.
比如挂载成就系统/战斗系统/养成系统等多种系统, 其实也就是在构造的时候 new {特定系统}Manager(entity) 挂载到当前 Actor.
这种情况都是将 Actor 视为玩家自己的线程操作, 只要遵循 Actor 调度规则的话不会出现线程数据异常的问题.
当然实际游戏服务端当中不会这么简单, 这里只是大概说明下作为服务端使用 Protobuf 传输消息和执行操作的过程.
如果可以的话, 尽可能把设计数据结构定义成 Protobuf, 这样有利于在客户端和服务端共享并同步数据, 这里提供战斗单位的结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| // 英雄阵营: 攻击方|防御方 enum HeroCamp{ attacker = 0; // 攻击方 defender = 1; // 防御方 }
// 战斗的英雄单位, 一般是应用在放置类卡牌游戏之中 // 序列序号 1~99 是基础信息 // 序列序号 100~999 是攻击战斗Buffer等属性信息 // 序号序号 1000~1999 是时装|模板属性 message HeroInfo{ int64 id = 1; // 英雄单位ID标识
// 英雄位置, 一般来说放置卡牌游戏都是类似于插槽1,2,3,4这样上阵英雄, 所以只需要数值即可 // 但是如果自走棋之类, 那么应该是棋盘的 (X,Y) 位置属性 int32 grid = 2; int32 level = 3; // 英雄卡牌等级, 如果英雄卡牌需要等级养成系统就需要定义('狗粮'机制), 这种需要和阶级区分(比如绿卡/蓝卡/紫卡)
// 下面两个需要合起来说明 // 属性值集合, 比如攻击速度/最大血量/当前血量/攻击力/暴击率等, 用 <{属性ID},{属性数值}> 做标识 map<int32, int32> initialized = 100; // 初始属性值 map<int32, int32> properties = 101; // 更新属性值 // 注意: 战斗单位的数值会受到大量情况影响, 比如 buffer 增减攻击力等情况, 然后3回合需要回滚数值的情况 // 所以需要记录英雄的初始化值方便恢复默认的数值
// 技能这里简化成 int32 标识行为, 实际上技能涵盖的属性值非常多的, 有时候需要单独设计成类定义 // 注意: 攻击动作其实也是0消耗技能, 也就是普通攻击也可以看作是无消耗且没有增益技能, 其他则是带有消耗和增益属性 int32 attack_normal = 200; // 普通攻击 // int32 skill_active = 201; // 特殊攻击|主动技能, 如果英雄单位简单(只有单个主动技能), 直接可以定义成 int32, 否则就需要定义成 list/map repeated int32 skills_active = 201; // 主动技能组 repeated int32 skills_passive = 202; // 被动技能组
// 这里还有些 buffer 相关的, 但是篇幅太多跳过
int32 tid = 1000; // 模板ID, 用户客户端调用外观装备的模板 ID }
// 英雄队伍信息 message HeroTeam{ HeroCamp camp = 1; // 英雄阵营 repeated HeroInfo heros = 2; // 上阵英雄列表 }
|
类似于这种结构就可以同步给客户端来同步玩家信息, 无论客户端还是服务端采用什么语言都可以按照这种逻辑同步战斗对局信息.
注意: 战斗中只同步变化的 HeroInfo 字段(如血量、Buff 剩余回合), 而非全量同步, 否则会导致大量无意义的数据传输