上报数据日志指南

常规上报数据需要实现以下功能:

  • 削峰 - 避免高并发写入过载, 保证大规模数据入库的时候不会直接让存储过程崩溃, 避免瞬时高并发、流量突增造成数据写入异常
  • 压缩 - 降低存储数据的占用, 日志数据本身属于很尴尬的数据, 过期之后很少去查询到, 但是统计要让其后续保存待用
  • 分页 - 大量数据需要被前端做页面数据处理, 大数据分页的时候 OFFSET + LIMIT 会有大量性能问题

高并发网络流量突发请求上来的大数据会瞬间引发数据锁抢占, 直接让数据库承载不了这部分数据写入, 卡死整个数据接口

这部分最通用的方案: 消息队列 + 列数据库, 而这方面目前的通用选型就是 Kafka + ClickHouse

Kafka + ClickHouse 是经过业内验证过, 最为高效且便捷的技术栈

Kafka 和 ClickHouse 的搭建流程可以参照其他文章说明, 这里是提出可用的技术方案:

  • Maven: 后台的包管理方案, 后端不要搞什么 Gradle 破坏性更新频繁来做包维护

  • Java17: 高版本支持 record 特性, 可以节约时间避免写入样板代码(我不喜欢 lombok 这种)

  • Quarkus: 底层容器化/网络接口/数据库ORM方面的支持

  • Kafka: 网络接口现在投递到 Kafka 消息队列而不是直接数据入库

  • ClickHouse: 专门存放不可变的日志数据, 方便做聚合统计

  • MySQL/PostgreSql: 专门存放可变并且强一致性的数据, 除非数据确定不可变否则默认都是入库至此

注: Kafka 并不是必须(引入会加大复杂度), 这是作为请求削峰的中间件, 如果并发压力没这么大可以直接写入 ClickHouse

那么就需要将 Java 项目区分成以下项目, 而非集成在一起:

  • message - 消息队列后台常驻任务, 用于消费 Kafka 数据验证完成调配入库(区分投递到列数据库还是行数据库)

  • web - 对外暴露接口, 用于接收客户端通用 JSON 请求, 清洗掉异常数据之后就投递到消息队列

这部分还有个好处就是业务隔离起来, web 方案只负责将消息投递到消息队列, 其他数据库入库则另外服务来处理, 避免泄漏连带一起暴露

目前设计简单的数据上报表格式如下, 这里采用 MySQL 格式处理可以方便简单查看到注释说明(这是早期 MYSQL 方案时候留下的表):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
create table if not exists pino_app_report
(
# 核心主要信息
uuid CHAR(36) not null comment '数据主键, 采用UUID来做标识, 如果有相同 UUID 就覆盖该行数据',
action int unsigned not null comment '上报的事件ID',

# 账号关联信息
appid bigint unsigned not null comment '上报所属的应用ID',
uid bigint unsigned not null comment '用户主键, 在 user_info 表中的主键即是用户UID',

# 服务器相关信息
sid varchar(64) not null comment 'CP服务器ID标识',
sname varchar(64) not null comment 'CP服务器名称',

# 角色上报信息
role_id varchar(64) not null default '' comment '玩家在指定服务器当中的角色ID',
role_name varchar(64) not null default '' comment '玩家在指定服务器当中的角色名',
role_balance varchar(64) not null default '' comment '玩家游戏币余额',
role_level varchar(64) not null default '' comment '玩家在指定服务器当中的角色等级',
role_power varchar(64) not null default '' comment '玩家在指定服务器当中的角色战力',
role_gender tinyint unsigned not null default '0' comment '玩家角色性别 0 = 没有性别要素,1 = 男,2 = 女, 3 = 非二元性别',
role_vip varchar(64) not null default '' comment '玩家在指定服务器当中的VIP等级',
role_create_time bigint unsigned not null comment '角色创建时间戳, 以毫秒级UTC时间戳为单位',
role_level_up_time bigint unsigned not null comment '角色升级时间戳, 以毫秒级UTC时间戳为单位',

# 游戏相关 - 职位信息
profession_id varchar(64) not null default '' comment '玩家在游戏之中的职位ID',
profession_name varchar(64) not null default '' comment '玩家在游戏之中的职位名称',

# 游戏相关 - 工会信息
guild_id varchar(64) not null default '' comment '玩家工会或帮派ID',
guild_name varchar(64) not null default '' comment '玩家工会或帮派名称',
guild_master_id varchar(64) not null default '' comment '玩家工会或帮派最高长官ID',
guild_master_name varchar(64) not null default '' comment '玩家工会或帮派最高长官名称',

create_time bigint unsigned not null comment '上报时间, 毫秒级别的UTC时间戳',
create_ip varchar(64) not null comment '上报IP地址',

# 其他扩展信息
environment json not null default JSON_OBJECT() comment '额外扩展信息, 必须采用 {"xxx":"yyy"} 对象组, 有时候需要记录些特殊元数据',
primary key (uuid)
) comment '应用的上报信息表'
engine = InnoDB
charset = utf8mb4
collate = utf8mb4_unicode_ci;

这张表是作为参考, 后面的按情况来修改成 ClickHouse 使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
-- 对等设计成 ClickHouse 的表结构
-- 项目数据库最好设计成 {项目名}_{具体业务分类} 的设计
-- 后续如果重新创建项目才能互相隔离起来
CREATE DATABASE IF NOT EXISTS pino_logs;

-- 创建对应行为上报表
CREATE TABLE IF NOT EXISTS pino_logs.app_report
(
-- 核心主要信息
uuid UUID COMMENT '数据主键, UUID标识(ClickHouse自带有专门类型)',
action UInt32 COMMENT '上报的事件ID(无符号int对应UInt32)',

-- 账号关联信息
appid UInt64 COMMENT '上报所属的应用ID(bigint unsigned对应UInt64)',
uid UInt64 COMMENT '用户主键(外部的用户标识ID)',

-- 服务器相关信息
sid LowCardinality(String) COMMENT 'CP服务器ID标识(低基数,用LowCardinality优化)',
sname LowCardinality(String) COMMENT 'CP服务器名称(低基数优化)',

-- 角色上报信息
role_id String COMMENT '玩家角色ID',
role_name String COMMENT '玩家角色名',
role_balance String COMMENT '玩家游戏币余额',
role_level String COMMENT '角色等级',
role_power String COMMENT '角色战力',
role_gender UInt8 COMMENT '角色性别 0=无,1=男,2=女,3=非二元性别(tinyint unsigned对应UInt8)',
role_vip String COMMENT 'VIP等级(若为纯数字可改为UInt8)',
role_create_time UInt64 COMMENT '角色创建时间戳(毫秒级UTC数值)',
role_level_up_time UInt64 COMMENT '角色升级时间戳(毫秒级UTC数值)',

-- 游戏相关 - 职位信息
profession_id LowCardinality(String) COMMENT '职位ID(低基数优化)',
profession_name LowCardinality(String) COMMENT '职位名称(低基数优化)',

-- 游戏相关 - 工会信息
guild_id String COMMENT '工会/帮派ID',
guild_name String COMMENT '工会/帮派名称',
guild_master_id String COMMENT '工会最高长官ID',
guild_master_name String COMMENT '工会最高长官名称',

-- 上报基础信息
create_time UInt64 COMMENT '上报时间戳(毫秒级UTC数值)',
create_ip String COMMENT '上报IP地址',

-- 其他扩展信息(ClickHouse支持JSON格式)
environment JSON COMMENT '额外扩展信息'
) ENGINE = ReplacingMergeTree() -- 采用合并列引擎
PARTITION BY toDate(toDateTime64(create_time / 1000, 3)) -- 按日期分区(转换为日期用于分区)
ORDER BY (appid, uid, action, create_time, uuid) -- 按时间数值排序,加速范围查询
COMMENT '应用的上报信息表';

这部分也是代表结构升级变迁: 最开始采用常规数据库, 系统负载不了的时候及时切换到 ClickHouse.

之后就是 Kafka 也需要创建通用的消息队列 Topic, 我这边命名为 pino-app-report:

1
2
3
4
5
6
7
# 在 kafka 所在的服务之中创建 pino-app-report 消息队列, 用于接收对应的消息
/data/kafka/bin/kafka-topics.sh \
--create \
--topic pino-app-report \
--partitions 2 \
--replication-factor 1 \
--bootstrap-server localhost:9092

之后就是准备将其相关的中间件给 “拼合” 起来, 从而将整个服务流程跑通.

Web

首先是对外暴露接口的 Web 服务, 用于接收外部的 JSON 请求并投递到消息队列, POM 文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<!-- 基础属性 -->
<modelVersion>4.0.0</modelVersion>
<groupId>me.meteorcat.game</groupId>
<artifactId>pino-web</artifactId>
<version>1.0-SNAPSHOT</version>

<!-- 全局属性 -->
<properties>
<!-- 编译配置 -->
<compiler-plugin.version>3.14.1</compiler-plugin.version>
<maven.compiler.release>17</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<!-- quarkus 依赖 -->
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.30.5</quarkus.platform.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.5.4</surefire-plugin.version>

<!-- Kafka 依赖 -->
<kafka.client.version>3.2.1</kafka.client.version>
</properties>

<!-- 全局版本管理 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>${quarkus.platform.artifact-id}</artifactId>
<version>${quarkus.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<!-- 第三方包依赖配置 -->
<dependencies>

<!-- Quarkus 容器依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>

<!-- Quarkus REST 依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest</artifactId>
</dependency>

<!-- 核心 Validation 依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-hibernate-validator</artifactId>
</dependency>

<!-- Quarkus RestJSON 依赖, 支持高级的 record 特性 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-jackson</artifactId>
</dependency>


<!-- Quarkus Kafka 依赖 -->
<!-- 具体使用参照: https://quarkus.io/guides/kafka -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>

<!-- Kafka 客户端接口, 需要参照 quarkus-messaging-kafka 依赖的版本 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.client.version}</version>
</dependency>


<!-- Quarkus 测试依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>

<!-- Quarkus 测试 REST 依赖 -->
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<!-- 公共编译配置 -->
<build>
<plugins>
<plugin>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.platform.version}</version>
<extensions>true</extensions>
<executions>
<execution>
<goals>
<goal>build</goal>
<goal>generate-code</goal>
<goal>generate-code-tests</goal>
<goal>native-image-agent</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${compiler-plugin.version}</version>
<configuration>
<parameters>true</parameters>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<argLine>--add-opens java.base/java.lang=ALL-UNNAMED</argLine>
<systemPropertyVariables>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<argLine>--add-opens java.base/java.lang=ALL-UNNAMED</argLine>
<systemPropertyVariables>
<native.image.path>${project.build.directory}/${project.build.finalName}-runner
</native.image.path>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>native</id>
<activation>
<property>
<name>native</name>
</property>
</activation>
<properties>
<quarkus.package.jar.enabled>false</quarkus.package.jar.enabled>
<skipITs>false</skipITs>
<quarkus.native.enabled>true</quarkus.native.enabled>
</properties>
</profile>
</profiles>
</project>

直接复制过去即可, 基本工具最多也是版本方面需要修改, 还有启动 Application 入口而已:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package me.meteorcat.game;

import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.annotations.QuarkusMain;

/**
* 项目启动入口
* <p>
* 参考文档: <a href="https://cn.quarkus.io/guides/lifecycle">应用程序的初始化和终止</a>
*/
@QuarkusMain
public class PinoWebApplication {

/**
* 程序启动入口
*
* @param args 参数列表
*/
public static void main(String[] args) {
Quarkus.run(args);
}
}

再配置 application.properties 用来添加 Kafka 和其他相关信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#####################################
## 基础信息
#####################################
quarkus.http.host=127.0.0.1
quarkus.http.port=18089
quarkus.hibernate-validator.fail-fast=true
quarkus.jackson.property-naming-strategy=SNAKE_CASE
quarkus.jackson.write-dates-as-timestamps=true
#####################################
## Kafka信息
#####################################
kafka.bootstrap.servers=localhost:9092
mp.messaging.outbound.pino-app-report.connector=smallrye-kafka
mp.messaging.outbound.pino-app-report.topic=pino-app-report

生成关联的生成者工具对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package me.meteorcat.game.form;

import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
import org.hibernate.validator.constraints.Range;

import java.io.Serializable;
import java.util.Map;
import java.util.UUID;

/**
* 数据上报的结构体, 用于提供请求验证
*
* @param action
* @param appid
* @param uid
* @param sid
* @param sname
* @param roleId
* @param roleName
* @param roleBalance
* @param roleLevel
* @param rolePower
* @param roleGender
* @param roleVip
* @param roleCreateTime
* @param roleLevelUpTime
* @param professionId
* @param professionName
* @param guildId
* @param guildName
* @param guildMasterId
* @param guildMasterName
* @param environment
*/
public record AppReportForm(

String uuid,

@NotNull
@Positive
Long action,

@NotNull
@Positive
Long appid,

@NotNull
@Positive
Long uid,

@NotNull
String sid,

@NotNull
String sname,

@NotNull
String roleId,

@NotNull
String roleName,

@NotNull
String roleBalance,

@NotNull
String roleLevel,

@NotNull
String rolePower,


@NotNull
@Range(min = 0, max = 3)
Byte roleGender,

@NotNull
String roleVip,

@NotNull
Long roleCreateTime,

@NotNull
Long roleLevelUpTime,

@NotNull
String professionId,

@NotNull
String professionName,


@NotNull
String guildId,

@NotNull
String guildName,

@NotNull
String guildMasterId,

@NotNull
String guildMasterName,


Long createTime,

String createIp,

@NotNull
Map<String, String> environment

) implements Serializable {

/**
* 填充构建
*/
public AppReportForm fill(String ip) {
return new AppReportForm(
UUID.randomUUID().toString(),
this.action,
this.appid,
this.uid,
this.sid,
this.sname,
this.roleId,
this.roleName,
this.roleBalance,
this.roleLevel,
this.rolePower,
this.roleGender,
this.roleVip,
this.roleCreateTime,
this.roleLevelUpTime,
this.professionId,
this.professionName,
this.guildId,
this.guildName,
this.guildMasterId,
this.guildMasterName,
System.currentTimeMillis(),
ip,
this.environment
);
}
}

最后就是暴露出来的 API 接口对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package me.meteorcat.game.controllers.app;

import jakarta.inject.Inject;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Response;
import me.meteorcat.game.form.AppReportForm;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;

/**
* 通用的数据上报接口
*/
@Path("/app/report")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class AppReportController {

/**
* 日志句柄
*/
final Logger logger = LoggerFactory.getLogger(AppReportController.class);


/**
* 消息队列的数据发送器
*/
@Inject
@Channel("pino-app-report")
Emitter<AppReportForm> emitter;


/**
* 提取IP地址
*/
private String fetchIpAddress(HttpHeaders headers) {
String ip = headers.getHeaderString("X-Real-IP");
if (ip == null || ip.isBlank()) ip = headers.getHeaderString("X-Forwarded-For");
return ip == null || ip.isBlank() ? "unknown" : ip;
}


/**
* 默认版本接口
*
* @return JSON
*/
@POST
@Path("v1")
public Response v1(@NotNull @Valid AppReportForm form, @Context HttpHeaders headers) {
// 填充构建推送数据结构
String ip = fetchIpAddress(headers);
form = form.fill(ip);
logger.info("form : {}", form);

try {
// 投递推送消息
emitter.send(form);

// 这里主要简要生成响应对象, 实际上响应类应该自定义成全局通用工具
return Response
.ok()
.entity(Map.of(
"status", 0,// 默认状态为 0 代表成功
"message", "SUCCESS", // 默认成功|错误消息
"data", Collections.emptyMap() // 默认数据对象为空对象组
))
.build();
} catch (Exception e) {
logger.error("上报接口异常", e);
return Response
.ok()
.entity(Map.of(
"status", 1,// 这里异常需要自定义
"message", "SUCCESS", // 默认成功|错误消息
"data", Collections.emptyMap() // 默认数据对象为空对象组
))
.build();
}
}
}

这里仅仅作为简略说明, 其实内部还有很多优化的地方, 但是目前已经能够测试跑通对应的请求接口:

1
2
3
4
# 测试推送给远程消息上报
curl --location 'http://127.0.0.1:18089/app/report/v1' \
--header 'Content-Type: application/json' \
--data '{"action": 1001, "appid": 1, "uid": 100001, "sid": "s101", "sname": "烈焰服务器", "role_id": "role_100001", "role_name": "战神归来", "role_balance": "99999.99", "role_level": "80", "role_power": "120000", "role_gender": 1, "role_vip": "8", "role_create_time": 1765000000000, "role_level_up_time": 1766721540192, "profession_id": "p001", "profession_name": "战士", "guild_id": "g1001", "guild_name": "天下第一公会", "guild_master_id": "100000", "guild_master_name": "公会会长", "environment": {"device": "iPhone15", "os": "iOS17", "network": "5G"}}'

可以去监控 Kafka 消息队列, 跑通之后就是要准备生成常驻消息队列的消费者进程.

Message

常驻任务用来监听消息队列从而入库处理, 这部分都是要单独准备单台服务器来处理, 这部分其实差别也不大:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<!-- 基础属性 -->
<modelVersion>4.0.0</modelVersion>
<groupId>me.meteorcat.game</groupId>
<artifactId>pino-message</artifactId>
<version>1.0-SNAPSHOT</version>

<!-- 全局属性 -->
<properties>
<!-- 编译配置 -->
<compiler-plugin.version>3.14.1</compiler-plugin.version>
<maven.compiler.release>17</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<!-- quarkus 依赖 -->
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.30.5</quarkus.platform.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.5.4</surefire-plugin.version>

<!-- Kafka 依赖 -->
<kafka.client.version>3.2.1</kafka.client.version>

<!-- clickhouse 依赖 -->
<clickhouse.version>0.8.0</clickhouse.version>
</properties>

<!-- 全局版本管理 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>${quarkus.platform.artifact-id}</artifactId>
<version>${quarkus.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<!-- 第三方包依赖配置 -->
<dependencies>

<!-- Quarkus 容器依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>

<!-- Quarkus JSON 解析器 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson</artifactId>
</dependency>

<!-- Quarkus Kafka 依赖 -->
<!-- 具体使用参照: https://quarkus.io/guides/kafka -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>

<!-- Kafka 客户端接口, 需要参照 quarkus-messaging-kafka 依赖的版本 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.client.version}</version>
</dependency>


<!-- ClickHouse 官方依赖: https://clickhouse.ac.cn/docs/en/integrations/java/jdbc-v2 -->
<!-- https://mvnrepository.com/artifact/com.clickhouse/clickhouse-jdbc -->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse.version}</version>
<classifier>shaded-all</classifier>
</dependency>

<!-- Quarkus 测试依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<!-- 公共编译配置 -->
<build>
<plugins>
<plugin>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.platform.version}</version>
<extensions>true</extensions>
<executions>
<execution>
<goals>
<goal>build</goal>
<goal>generate-code</goal>
<goal>generate-code-tests</goal>
<goal>native-image-agent</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${compiler-plugin.version}</version>
<configuration>
<parameters>true</parameters>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<argLine>--add-opens java.base/java.lang=ALL-UNNAMED</argLine>
<systemPropertyVariables>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<argLine>--add-opens java.base/java.lang=ALL-UNNAMED</argLine>
<systemPropertyVariables>
<native.image.path>${project.build.directory}/${project.build.finalName}-runner
</native.image.path>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>native</id>
<activation>
<property>
<name>native</name>
</property>
</activation>
<properties>
<quarkus.package.jar.enabled>false</quarkus.package.jar.enabled>
<skipITs>false</skipITs>
<quarkus.native.enabled>true</quarkus.native.enabled>
</properties>
</profile>
</profiles>
</project>

这里不需要 Rest 接口相关, 只需要处理对应消息队列内容即可

这里因为 Quarkus 官方并没有集成对应 ClickHouse 驱动, 所以只能手动来安装文档初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

/// ---------------------------------------------
/// 属性配置类
/// ---------------------------------------------
import com.clickhouse.jdbc.ClickHouseDataSource;
import io.smallrye.config.ConfigMapping;


import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
/**
* 扩展的传递 Clickhouse 配置
*/
@ConfigMapping(prefix = "clickhouse")
public interface ClickHouseConfigurator {

/**
* JDBC 地址链接
*/
String jdbcUrl();

/**
* 配置属性
*/
Map<String, String> properties();


default ClickHouseDataSource getDataSource() throws SQLException {
Properties properties = new Properties();
properties.putAll(properties());
return new ClickHouseDataSource(jdbcUrl(), properties);
}

}


/// ---------------------------------------------
/// 工厂生成类
/// ---------------------------------------------
import com.clickhouse.jdbc.ClickHouseDataSource;
import jakarta.enterprise.context.ApplicationScoped;

import java.sql.SQLException;

@ApplicationScoped
public class ClickHouseFactory {

@ApplicationScoped
public ClickHouseDataSource getClickHouseDataSource(ClickHouseConfigurator configurator) throws SQLException {
return configurator.getDataSource();
}
}

之后简单处理下消费者任务功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package me.meteorcat.game;

import com.clickhouse.jdbc.ClickHouseDataSource;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

/**
* 消费者进程
*/
@Startup
@ApplicationScoped
public class PinoMessageConsumer {

/**
* 日志句柄
*/
private static final Logger logger = LoggerFactory.getLogger(PinoMessageConsumer.class);

/**
* 加载 clickHouse 驱动
*/
@Inject
ClickHouseDataSource clickHouseDataSource;


/**
* 消费Kafka主题:app-report
*
* @param messages 消息队列
*/
@Incoming("pino-app-report") // 消费的 topic
public void consumeMessages(List<String> messages) {
if (messages.isEmpty()) return;
logger.info("接收到 Kafka 批量消息, 数量:{}", messages.size());

// 投递写入到 MYSQL
try (Connection connection = clickHouseDataSource.getConnection()) {
connection.setAutoCommit(false); // 关闭自动提交
String insertSql = String.format("INSERT INTO app_report FORMAT JSONEachRow \n %s", String.join("\n", messages));
try (Statement statement = connection.createStatement()) {
statement.execute(insertSql); // 这里是有安全隐患的
}
connection.commit(); // 手动提交上去
} catch (SQLException e) {
logger.error("执行错误", e);
throw new RuntimeException(e);
}
}
}

更加推荐官方的 Java Client 而非 JDBC 来操作:https://clickhouse.com/docs/integrations/language-clients/java/client

一般来说执行 API 接口传输之后就能转发投递到 ClickHouse 内部, 消息队列过程写得比较粗糙, 不过影响具体的功能实现.

拼接 SQL 风险很大, 这里仅做示范, 正确方式是检查所有字段入库并过滤特殊符号(官方的 Client 底层有处理方法, JDBC 没有)

一般来说这里就足够跑完基础的业务流程, 打开 ClickHouse Client 查看数据是否完整即可, 这就是总体简单的日志消息结构对象.

后续就是细化这部分流程, 追加上特定的数据安全检查和异常拦截等, 总体逻辑上面差不多概括完成.