Kafka 搭建日志系统

在大部分情况下, 常规的 MySQL/PostgreSQL 就足够常规业务的 CURD 操作, 当业务扩展出来就开始有瓶颈.

最能体现这种情况就是 日志系统, 数据库当中的系统日志具有以下特征:

  • 写入量极大且写入频繁: 查询比较少(就后台管理最多不超过100人), 但是写入量极大, 会出现单表超过50GB情况

  • 查询条件复杂: 日志查询通常是按时间范围、关键字、级别、服务名等维度的组合查询, 细致一点还有针对某些属性查询

  • 数据复用率低: 很多日志可能只需要查询半年或者90天数据, 其他时间段很少被查询到

  • 数据结构可能比较灵活: 日志内容常包含 JSON、自由文本等非结构化数据(如接口请求参数、异常堆栈信息)

传统数据库虽然也能做此类数据保存, 但是在查询方面卡顿会非常严重, 并且 CPU 直接暴涨超过 100%.

也基于这种情况而需要外部其他工具来辅助, 也就是日常见到的传统分层处理架构:

  • MySQL/PostgreSQL(冷存储): 负责冷数据落地

  • Kafka(消息队列): 削峰填谷, 承接高并发日志写入, 同时作为热数据缓冲区

  • 其他服务API: 负责接收客户端提交的日志数据, 投递到消息队列之中

Kafka 并不是作为数据库系统来使用, 而是以 JSON 格式记录日志文本记录好相关所需的核心元数据.

Kafka 原生查询能力弱, 可搭配 Kafka Streams 或 Flink 实现简单的实时过滤, 如果不想引入额外架构可以采用 Kafka Streams

本质上对于日志系统大部分情况只需要针对条件来做 区间/匹配/包含/不含 等功能即可, 都是相对比较简单的查询.

Kafka 的查询不再是称呼为 “查询”, 其实叫做 “过滤” 更加贴切

该方案适合中小规模日志场景(日写入量≤1000 万条,查询并发≤100QPS), 后续如果要扩大的大型日志场景(以日写入量过亿)就需要额外扩展.

强烈注意: 前期引入这部分方案不是个好选择, 盲目引入这种大数据架构会加大开发维护的复杂程度

基本上最开始都是写入到 MySQL/PostgreSQL 这类数据库, 就比如下面的 MySQL 用户行为日志上报表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 应用事件上报表
# 注意1: 事件上报是很特殊的情况, 建议分表隔离处理每个应用的事件
# 注意2: 单个应用的事件上报内容其实不算少, 请做好可能上报数据单日会突破50w条的准备
# 注意3: 建议采用 pino_app_report_20250723 格式, 只有当管理员选中指定某天日志才能做筛选, 否则数据库无法负载这么大查询量
create table if not exists pino_app_report
(
# 核心主要信息
id bigint unsigned not null auto_increment comment '数据主键',
action int unsigned not null comment '上报的事件ID',

# 账号关联信息
appid bigint unsigned not null comment '上报所属的应用ID',
uid bigint unsigned not null comment '用户主键, 在 user_info 表中的主键即是用户UID',

# 服务器相关信息
sid varchar(64) not null comment 'CP服务器ID标识',
sname varchar(64) not null comment 'CP服务器名称',

# 角色上报信息
role_id varchar(64) not null default '' comment '玩家在指定服务器当中的角色ID',
role_name varchar(64) not null default '' comment '玩家在指定服务器当中的角色名',
role_balance varchar(64) not null default '' comment '玩家游戏币余额',
role_level varchar(64) not null default '' comment '玩家在指定服务器当中的角色等级',
role_power varchar(64) not null default '' comment '玩家在指定服务器当中的角色战力',
role_gender tinyint unsigned not null default '0' comment '玩家角色性别 0 = 没有性别要素,1 = 男,2 = 女, 3 = 非二元性别',
role_vip varchar(64) not null default '' comment '玩家在指定服务器当中的VIP等级',
role_create_time bigint unsigned not null comment '角色创建时间戳, 以毫秒级UTC时间戳为单位',
role_level_up_time bigint unsigned not null comment '角色升级时间戳, 以毫秒级UTC时间戳为单位',

# 游戏相关 - 职位信息
profession_id varchar(64) not null default '' comment '玩家在游戏之中的职位ID',
profession_name varchar(64) not null default '' comment '玩家在游戏之中的职位名称',

# 游戏相关 - 工会信息
guild_id varchar(64) not null default '' comment '玩家工会或帮派ID',
guild_name varchar(64) not null default '' comment '玩家工会或帮派名称',
guild_master_id varchar(64) not null default '' comment '玩家工会或帮派最高长官ID',
guild_master_name varchar(64) not null default '' comment '玩家工会或帮派最高长官名称',

create_time bigint unsigned not null comment '上报时间, 毫秒级别的UTC时间戳',
create_ip varchar(64) not null comment '上报IP地址',

# 其他扩展信息
extra json not null default JSON_OBJECT() comment '额外扩展信息, 必须采用 {"xxx":"yyy"} 对象组, 有时候需要记录些特殊元数据',
primary key (id)
) comment '应用的上报信息表'
engine = InnoDB
charset = utf8mb4
collate = utf8mb4_unicode_ci;

这就是常规的客户端上报日志表, 这部分数据就是比较常规的日志表, 在量级比较小的情况通过 API 过滤异常数据之后直接写入数据库是没什么问题.

注意: Kafka 虽然作为数据队列, 但是提取之后的数据并不会直接销毁, 而是保存在本地不会去主动视为已使用去抛弃

后面就是规模上来时候可以把 Kafka 穿插到其中处理, 也就是需要其他服务器构建专门的 Kafka 消息队列服务, API 推送到 Kafka 队列.

Kafka 部署

注意: Kafka 服务会生成大量本地日志文件, 所以对于硬盘占用十分庞大, 必须要准备个大容量空间的扩展硬盘.

这里需要区分 Kafka 的版本, 因为在 2.8 之后不再集成内部 ZooKeeper 组件改由内部 KRaft 模式启动服务并且要求最低 Java17

官网手册: Kafka Doc

这里采用 Linux 的二进制处理即可, 需要手动配置环境处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 创建 kafka 专门系统用户来管理(不允许登陆), 用于管理全部相关操作
sudo useradd -r -s /sbin/nologin kafka
sudo apt install openjdk-17-jre # 安装最低依赖

# 注意: 这部分的 /data 是我外置的空间硬盘
# 最好将数据统一规划到外部扩展硬盘空间之中
sudo mkdir -p /data/kafka
sudo chown -R kafka:kafka /data/kafka

# 之后下载二进制包解压, 这里选择 2.13-4.1.1 版本, 先下载到 tmp 目录
# 如果下载速度慢可以用阿里云镜像
# wget https://downloads.apache.org/kafka/4.1.1/kafka_2.13-4.1.1.tgz -O /tmp/kafka.tgz # 官网
wget https://mirrors.aliyun.com/apache/kafka/4.1.1/kafka_2.13-4.1.1.tgz -O /tmp/kafka.tgz # 阿里云镜像

# 解压内部内容到 /data/kafka, 并且设置 /data/kafka/bin 内部脚本可执行
sudo -u kafka tar -zxvf /tmp/kafka.tgz --strip-components=1 -C /data/kafka
sudo chmod +x /data/kafka/bin/*.sh # 赋予执行权限
sudo -u kafka /data/kafka/bin/kafka-server-start.sh --version # 查看版本号

# 之后就是需要生成本地的集群ID, 哪怕是单机部署也是要构建本地单独的集群ID唯一标识
KAFKA_CLUSTER_ID="$(/data/kafka/bin/kafka-storage.sh random-uuid)" # 生成唯一标识
echo "KAFKA_CLUSTER_ID=\"$KAFKA_CLUSTER_ID\""|sudo -u kafka tee /data/kafka/KAFKA_CLUSTER_ID # 将他写入本地文件, 最好写成 ENV 环境变量模式才方便加载


# 格式化本地日志目录
sudo -u kafka /data/kafka/bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c /data/kafka/config/server.properties
# 至此就是构建环境完成, 接下来就是准备搭建处理
# --standalone 代表创建单节点配置
# -t 声明节点标识
# -c 指定配置目录


# 如果出现 cluster-id 不匹配需要先执行以下操作
# sudo rm -rf /tmp/kraft-combined-logs # 删除 KRaft 元数据目录
# sudo rm -rf /data/kafka/data # 删除 Kafka 数据目录
# 正式环境不要这样操作, 而是要手动去修改数值让 KAFKA_CLUSTER_ID 匹配
# 需要去看 /data/kafka/config/server.properties 的 log.dirs 配置, 这个数据核心目录, 默认的分配集群ID在这个目录下面配置文件
# 默认为 /tmp/kraft-combined-logs 目录, 那么内部修改集群ID则是修改以下文件
sudo vim /tmp/kraft-combined-logs/meta.properties # 修改内部的 cluster.id 即可


# 需要创建修改服务器的启动单元
sudo touch /etc/systemd/system/kafka-server.service
sudo vim /etc/systemd/system/kafka-server.service

/etc/systemd/system/kafka-server.service 系统单元如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
[Unit]
Description=Kafka Service
Documentation=https://kafka.apache.org/
After=network.target

[Service]
Type=simple
User=kafka
Group=kafka

# 加载根目录和读取配置
# Environment 指定启动 Jre
# EnvironmentFile 内部变量如下:
# - KAFKA_CLUSTER_ID: 具体集群ID
WorkingDirectory=/data/kafka
EnvironmentFile=/data/kafka/KAFKA_CLUSTER_ID
ExecStart=/data/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties
ExecStop=/bin/kill -TERM $MAINPID
Restart=on-failure
RestartSec=5
LimitNOFILE=65536
PrivateTmp=true

# 成功退出
# 0 是 Linux 进程的标准正常退出码
# 143 对应 SIGTERM 信号的退出码(128+15), 即 Kafka 优雅停止的退出码
SuccessExitStatus=0 143

[Install]
WantedBy=multi-user.target

最后运行启动即可:

1
2
3
sudo systemctl daemon-reload # 刷新系统单元
sudo systemctl start kafka-server # 启动 kafka 服务
sudo systemctl status kafka-server # 查看 kafka 服务

至此 Kafka 服务就已经启动, 实际上可以理解为挂起消息队列服务而已; 而且目前启动会直接报错, 需要修改配置:

以下为 /data/kafka/config/server.properties 文件的具体配置和说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# 需要理解以下相关配置

# 定义节点的角色
# KRaft 模式下 Kafka 节点有两种角色:
# - broker(处理消息生产 / 消费)
# - controller(管理集群元数据、分区副本分配)
# 一般保持默认设置即可, 大规模集群才需要部署时可分离角色
process.roles=broker,controller


# 集群内所有节点的 node.id 必须唯一(建议从 0 或 1 开始编号)
# 单机保持默认即可, 初始化的时候会默认重载修改
node.id=1


# KRaft 核心控制器集群的启动引导地址, 也就是建立集群节点服务地址, 单机部署保持默认即可
# 集群部署需列出所有 controller 节点的 IP:端口(如 1@192.168.1.10:9093,2@192.168.1.11:9093)
# 也就是 {node.id}@{其他服务器IP}:{其他服务器端口},... 来加入集群
controller.quorum.bootstrap.servers=localhost:9093

# KRaft 指定控制器通信使用的监听器名称
# 与下方 listeners 中的 CONTROLLER://:9093 绑定, 明确控制器节点之间通过 CONTROLLER 监听器通信
# 必须与 listener.security.protocol.map 中的键名一致, 不可随意命名
controller.listener.names=CONTROLLER


# 定义 broker 之间通信使用的监听器
# 集群内 broker 之间同步数据、副本复制时, 使用 PLAINTEXT 监听器(需与 listeners 中的名称一致)
# 生产环境若开启 SSL/SASL 认证, 需改为对应的监听器名称(如 SASL_SSL)
inter.broker.listener.name=PLAINTEXT

# 定义节点监听的网络地址和协议
# 格式: 监听器名称://IP:端口, 多个监听器用逗号分隔
# PLAINTEXT://:9092:客户端(生产者 / 消费者)通过 9092 端口使用明文协议通信
# CONTROLLER://:9093:控制器节点之间通过 9093 端口通信, 专用于 KRaft 共识机制
listeners=PLAINTEXT://:9092,CONTROLLER://:9093


# 对外暴露的通信地址, 比较核心的配置之一
# 核心作用: 客户端发起请求实际连接的地址(而非 Kafka 节点的监听地址)
# 单机测试填 localhost 即可; 生产环境必须改为服务器的公网 / 内网 IP(如 PLAINTEXT://192.168.1.100:9092), 否则客户端会因解析不到地址无法连接
# CONTROLLER 的暴露地址仅在控制器节点跨机器部署时需要配置, 单机可填 localhost, 也是和上面同理
# 一般正式都是内网交换数据而不会暴露到公网
advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093


# 映射监听器名称与安全协议, 一般不需要修改
# 定义每个监听器使用的安全协议, 如 CONTROLLER:PLAINTEXT 表示 CONTROLLER 监听器使用明文协议
# 一般外网才需要配置安全认证设置, 内网验证可能对性能有所损失
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# 网络处理线程数
# 负责处理网络请求的线程数(如接收客户端连接、响应请求)
# 生产环境可设为 CPU核心数 + 1(如 4 核 CPU 设为 5), 避免线程过多导致上下文切换
num.network.threads=3

# IO 处理线程数
# 负责处理磁盘 IO 的线程数(如消息写入磁盘、日志段滚动)
# 生产环境建议设为 CPU核心数 * 2(如 4 核 CPU 设为 8), Kafka 是 IO 密集型服务,需足够的 IO 线程
num.io.threads=8

# 网络发送的缓冲区大小, 默认 100KB
# 若传输大消息可适当调大(如 1MB)
socket.send.buffer.bytes=102400

# 网络接收缓冲区大小, 默认 100KB
# 若传输大消息可适当调大(如 1MB)
socket.receive.buffer.bytes=102400

# 客户端请求的最大字节数, 默认 100MB
# 限制单个请求的大小, 防止超大请求压垮服务, 不过单次提交超过 100MB 的情况一般比较少
socket.request.max.bytes=104857600

# 日志(数据)存储目录, 比较核心的配置之一, 默认会将数据放置在 /tmp 之中
# Kafka 的消息数据、KRaft 元数据都存储在该目录, 不要默认放在 /tmp 之中, 而是要放置在我们扩展的硬盘里面
# 虽然起名 log.dirs, 但是内部是很核心的元数据
# 若有多块硬盘, 可配置多个目录(用逗号分隔), Kafka 会自动负载均衡写入
# log.dirs=/tmp/kraft-combined-logs
# 这里我的外部扩展硬盘为 /data, 相关数据处于 /data/kafka, 内部的日志数据放置在 /data/kafka/store 之中
log.dirs=/data/kafka/store

# 默认分区数, 新建主题时的默认分区数, 默认为 1
# 生产环境建议设为 3-12(根据业务并发度),分区数越多, 消息分发的并行度越高, 但过多分区会增加集群元数据管理开销
num.partitions=1

# 每个数据目录的日志恢复线程数, 默认为 1
# Kafka 启动时, 会用该线程数恢复日志(如重新加载未刷盘的消息)
# 若有多个数据目录(log.dirs 配置多个), 可设为 2-4, 加快启动时的日志恢复速度
num.recovery.threads.per.data.dir=1

# 日志数据最长留存时间, 单位为小时, 比较核心的配置之一
# 超过该时间的日志段会被标记为待清理, 由 Kafka 的日志清理线程定期删除(或压缩)
# 一般互联网日志基本上可以视为 月度(30天)/季度(90天)/年度(365天) 这三个周期, 一般为了做好留存都会保留前后两个年份做统计对比
# 而且大部分数据都会被落地 MySQL/PostgreSQL 之中冷藏等待后续需要的时候检索, 所以删除其实没什么关系(主要数据已经入库)
# 一般都有数据库入库保存, 所以 Kafka 只需要保存 30 天(720小时)数据即可, 后续其他统计日志则是在落地数据库当中去检索
# 超长期留存会导致日志段数量暴增, Kafka 清理线程的扫描和删除操作会消耗大量 CPU/IO 资源, 影响服务性能
# Kafka 日志按分区存储, 若主题分区数较多(如 30 个分区), 365 天的日志会占用 TB 级磁盘空间, 远不如数据库的压缩存储高效
log.retention.hours=720

# 日志清理策略, 默认为 delete(直接删除)
# 也可设为 compact(日志压缩,仅保留最新消息)
# 若为 compact, log.retention.hours 控制 "非活跃日志" 的留存时间
# 如果没有做 MySQL/PostgreSQL 入库就需要保存压缩文件, 有入库则删除从而释放磁盘空间
# 默认不需要去设置


# 日志大小限制, 比较核心的配置之一, 默认不会被启用(注释则不会采用空间清理机制)
# 针对单个分区的日志大小限制配置, 超过这部分容量也会直接触发日志清理(时间或大小, 满足其一即触发清理)
# 这里推荐以下方式选择:
# ------------------------------------------------------------------------------------------------------------------------------
# 业务场景 单分区日产生日志量 log.retention.bytes 推荐值 说明
# 低流量日志(如后台操作日志) 500MB / 天 16106127360(15GB) 30 天总产生 15GB,预留少量冗余应对流量波动
# 中流量日志(如网站访问日志) 2GB / 天 64424509440(60GB) 30 天总产生 60GB,避免因流量突增提前触发大小清理
# 高流量日志(如 APP 埋点日志) 5GB / 天 161061273600(150GB) 若磁盘容量不足,可适当降低log.retention.hours(如改为 15 天)
# ------------------------------------------------------------------------------------------------------------------------------
# log.retention.bytes=1073741824
# 这个配置见仁见智, 如果对磁盘空间有所信息可以不启用; 因为如果设置有所偏差的情况下, 很容易出现还没数据库入库之后就被清理的事故, 但是如果完全不限制则会出现硬盘占用满直接系统异常

修改完成之后重新初始化配置, 最好重新修复相关节点引导:

1
2
3
4
5
6
7
8
9
10
# 需要先关闭 systemd 服务重新生成 UUID
KAFKA_CLUSTER_ID="$(/data/kafka/bin/kafka-storage.sh random-uuid)" # 生成唯一标识
echo "KAFKA_CLUSTER_ID=\"$KAFKA_CLUSTER_ID\""|sudo -u kafka tee /data/kafka/KAFKA_CLUSTER_ID

# 构建全新配置初始化, 会去 /data/kafka/store 之中生成配置
sudo -u kafka /data/kafka/bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c /data/kafka/config/server.properties


sudo systemctl start kafka-server # 启动服务
sudo systemctl status kafka-server # 确认状态

如果没有问题即可, 后续就是准备创建订阅的消息频道的功能.

Topic 部署

上面的流程只是简单的挂起服务而已, 后面就是构建消息队列的具体功能, 首先就是创建频道(Topic)来监听推送过来的事件:

Topic 是 Kafka 中消息的分类/频道, 所有消息都必须发送到指定 Topic, 消费者也只能订阅 Topic 来消费消息

每个 Topic 会被分成多个分区(Partition), 分区是 Kafka 并行处理和数据存储的最小单位;
单机部署时副本数只能为 1, 集群部署可配置多副本提高可靠性.

这里先说下具体的相关命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 确认目前的的 kafka 服务内部有多少 Topic
# localhost:9092 就是 kafka-server 的地址
/data/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# 之后就是创建 Topic 服务
# 注意: 服务器重启后不需要重新执行创建 Topic 的命令, 因为 Kafka 的 Topic 元数据(包括 Topic 名称、分区数、副本数、配置等)会持久化存储在 KRaft 模式的元数据日志中
# 这里按照上面的配置创建个推送事件 topic( 玩家事件上报: app_report )
# --create: 声明创建行为
# --topic {名称}: 事件 Topic 名
# --partitions {分区数}: 设置 Topic 的消息分区数, 并发高的场景设为 6-12, 分区越多并行消费能力越强
# --replication-factor {副本数}: 设置 Topic 消息副本数量, 集群设为3(2个副本节点做处理, 1个容灾处理), 单机只能为 1
# --bootstrap-server {kafka地址}: 连接的 Kafka 服务地址
/data/kafka/bin/kafka-topics.sh \
--create \
--topic app_report \
--partitions 2 \
--replication-factor 1 \
--bootstrap-server localhost:9092

# 这里会有错误提示
# WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
# 新版本的 Kafka 对 Topic 名称中包含点(.)或下划线(_)的命名冲突提醒, 核心原因是 Kafka 早期的指标(Metric)命名机制会将 Topic 名称中的点和下划线视为等效, 可能导致监控指标混乱
# 内部会将 . 和 _ 视为相同类型字符, 从而出现数据归类异常, 新版本推荐采用 - 符号连接, 比如 xx-yy-zz 这样方式
# 所以建议适应新版本应该采用使用连字符(-)替代, 但是不修改也是没有问题, 这仅仅是对可能出现的异常提醒而已
# 这里重新构建消息通道为 app-report, 采用这种方式之后就不会弹出 Due to limitations in metric names 警告
/data/kafka/bin/kafka-topics.sh \
--create \
--topic app-report \
--partitions 2 \
--replication-factor 1 \
--bootstrap-server localhost:9092
# 成功的时候会提示 Created topic app-report. 代表创建成功


# 修改扩容对应 Topic
# --alter: 代表是修改行为
# 注意: 分区数只能增加, 不能减少, 所以 partitions 必须确认值是否满足
/data/kafka/bin/kafka-topics.sh \
--alter \
--topic app-report \
--partitions 6 \
--bootstrap-server localhost:9092


# 覆盖修改默认系统的留存事件为 15 天
# --add-config: 代表修改不停机系统默认配置
/data/kafka/bin/kafka-configs.sh \
--alter \
--topic app-report \
--bootstrap-server localhost:9092 \
--add-config retention.ms=1296000000


# 查看最后对应节点的详情信息
# --describe: 代表查看详情
/data/kafka/bin/kafka-configs.sh \
--describe \
--topic app-report \
--bootstrap-server localhost:9092


# 删除 Topic
# 注意: 一定要谨慎操作, 删除后数据不可恢复
# 除非万不得已, 否则不要去主动删除内部数据, 而是要让系统过期自动去清理!
/data/kafka/bin/kafka-topics.sh \
--delete \
--topic app_report \
--bootstrap-server localhost:9092

如果出现丢失的 Topic 或者无法使用的情况, 可以排查下是否以下问题:

  • 检测是否 log.dirs 丢失, 切忌不要将 log.dirs 目录丢在临时目录

  • cluster-id 不匹配导致元数据加载失败, 需要同步 cluster-id 一致

  • Kafka 服务未正常启动, 确实系统单元的 systemd 是否正确启动

如果没问题就可以通过查看列表确认消息队列已经完成, 现在就是准备构建 消费者(consumer)生产者(producer) 数据链路.

生产和消费

  • 生产者(producer): 将客户端上报的数据消息过滤清理掉非法部分, 然后放入 Topic 等待被消费

  • 消费者(consumer): 创建数据通道监听对应 Topic, 有数据的时候提取这部分数据落地写入数据库

这里最简单的就是 API 接口接收到客户端数据过滤以下直接导入到 Topic 即可, 数据投递流程就如下顺序:

  1. 客户端上报数据

  2. API接口接收到数据(清洗+生产消息)

  3. API投递消息到 Kafka Topic

  4. 后台异步消费者任务接收到事件

  5. 后台异步消费者将数据同步落地到数据库/数仓

API 接收并清洗的部分可以用很多方式实现, 比如 PHP/Golang/Java 之类都支持 Kafka 直接投递, 这里不同语言都有实现不需要做说明.

这边已经先做好部分数据, 这部分测试数据可以先用着(注意: 文本文件导入必须每行存放一条 JSON 数据):

1
2
3
4
5
6
7
8
9
10
{"action":1001,"appid":10001,"uid":100000,"sid":"srv_001","sname":"烈焰服务器","role_id":"role_100000_123","role_name":"玩家100000_龙","role_balance":"56890","role_level":"25","role_power":"56000","role_gender":1,"role_vip":"3","role_create_time":1735027200000,"role_level_up_time":1735030800000,"profession_id":"prof_002","profession_name":"战士","guild_id":"guild_101","guild_name":"天下第一盟","guild_master_id":"role_001","guild_master_name":"盟主大人","create_time":1735200000000,"create_ip":"192.168.1.100","extra":{"device":"iPhone 15","network":"5G","game_version":"1.2.0","channel":"官方"}}
{"action":1002,"appid":10002,"uid":100001,"sid":"srv_002","sname":"冰霜服务器","role_id":"role_100001_456","role_name":"玩家100001_虎","role_balance":"129800","role_level":"48","role_power":"125000","role_gender":2,"role_vip":"7","role_create_time":1734940800000,"role_level_up_time":1734944400000,"profession_id":"prof_001","profession_name":"法师","guild_id":"guild_102","guild_name":"义薄云天","guild_master_id":"role_002","guild_master_name":"副盟主","create_time":1735200600000,"create_ip":"10.0.0.5","extra":{"device":"Android 14","network":"4G","game_version":"2.0.5","channel":"应用宝"}}
{"action":1003,"appid":10003,"uid":100002,"sid":"srv_003","sname":"雷霆服务器","role_id":"role_100002_789","role_name":"玩家100002_狮","role_balance":"8900","role_level":"12","role_power":"18000","role_gender":0,"role_vip":"0","role_create_time":1735113600000,"role_level_up_time":1735114200000,"profession_id":"prof_003","profession_name":"刺客","guild_id":"","guild_name":"","guild_master_id":"","guild_master_name":"","create_time":1735201200000,"create_ip":"203.0.113.20","extra":{"device":"iPad Pro","network":"Wi-Fi","game_version":"1.5.3","channel":"华为"}}
{"action":1004,"appid":10001,"uid":100003,"sid":"srv_001","sname":"烈焰服务器","role_id":"role_100003_234","role_name":"玩家100003_豹","role_balance":"999999","role_level":"88","role_power":"999999","role_gender":3,"role_vip":"12","role_create_time":1734854400000,"role_level_up_time":1734858000000,"profession_id":"prof_002","profession_name":"战士","guild_id":"guild_101","guild_name":"天下第一盟","guild_master_id":"role_001","guild_master_name":"盟主大人","create_time":1735201800000,"create_ip":"172.16.0.8","extra":{"device":"PC-Windows","network":"Ethernet","game_version":"2.1.0","channel":"小米"}}
{"action":1005,"appid":10002,"uid":100004,"sid":"srv_002","sname":"冰霜服务器","role_id":"role_100004_567","role_name":"玩家100004_龙","role_balance":"34567","role_level":"66","role_power":"78000","role_gender":1,"role_vip":"5","role_create_time":1734940800000,"role_level_up_time":1734944400000,"profession_id":"prof_001","profession_name":"法师","guild_id":"guild_102","guild_name":"义薄云天","guild_master_id":"role_002","guild_master_name":"副盟主","create_time":1735202400000,"create_ip":"192.168.1.101","extra":{"device":"iPhone 15","network":"5G","game_version":"1.8.9","channel":"官方"}}
{"action":1001,"appid":10003,"uid":100005,"sid":"srv_003","sname":"雷霆服务器","role_id":"role_100005_890","role_name":"玩家100005_虎","role_balance":"0","role_level":"1","role_power":"1000","role_gender":2,"role_vip":"0","role_create_time":1735200000000,"role_level_up_time":1735200000000,"profession_id":"prof_003","profession_name":"刺客","guild_id":"","guild_name":"","guild_master_id":"","guild_master_name":"","create_time":1735203000000,"create_ip":"10.0.0.5","extra":{"device":"Android 14","network":"4G","game_version":"1.0.0","channel":"应用宝"}}
{"action":1002,"appid":10001,"uid":100006,"sid":"srv_001","sname":"烈焰服务器","role_id":"role_100006_111","role_name":"玩家100006_狮","role_balance":"78900","role_level":"36","role_power":"45000","role_gender":0,"role_vip":"2","role_create_time":1735027200000,"role_level_up_time":1735030800000,"profession_id":"prof_002","profession_name":"战士","guild_id":"guild_101","guild_name":"天下第一盟","guild_master_id":"role_001","guild_master_name":"盟主大人","create_time":1735203600000,"create_ip":"203.0.113.20","extra":{"device":"iPad Pro","network":"Wi-Fi","game_version":"1.3.5","channel":"华为"}}
{"action":1003,"appid":10002,"uid":100007,"sid":"srv_002","sname":"冰霜服务器","role_id":"role_100007_222","role_name":"玩家100007_豹","role_balance":"56789","role_level":"52","role_power":"62000","role_gender":3,"role_vip":"4","role_create_time":1734940800000,"role_level_up_time":1734944400000,"profession_id":"prof_001","profession_name":"法师","guild_id":"guild_102","guild_name":"义薄云天","guild_master_id":"role_002","guild_master_name":"副盟主","create_time":1735204200000,"create_ip":"172.16.0.8","extra":{"device":"PC-Windows","network":"Ethernet","game_version":"2.0.1","channel":"小米"}}
{"action":1004,"appid":10003,"uid":100008,"sid":"srv_003","sname":"雷霆服务器","role_id":"role_100008_333","role_name":"玩家100008_龙","role_balance":"890000","role_level":"95","role_power":"900000","role_gender":1,"role_vip":"10","role_create_time":1734854400000,"role_level_up_time":1734858000000,"profession_id":"prof_003","profession_name":"刺客","guild_id":"","guild_name":"","guild_master_id":"","guild_master_name":"","create_time":1735204800000,"create_ip":"192.168.1.100","extra":{"device":"iPhone 15","network":"5G","game_version":"2.2.3","channel":"官方"}}
{"action":1005,"appid":10001,"uid":100009,"sid":"srv_001","sname":"烈焰服务器","role_id":"role_100009_444","role_name":"玩家100009_虎","role_balance":"23456","role_level":"78","role_power":"85000","role_gender":2,"role_vip":"8","role_create_time":1735027200000,"role_level_up_time":1735030800000,"profession_id":"prof_002","profession_name":"战士","guild_id":"guild_101","guild_name":"天下第一盟","guild_master_id":"role_001","guild_master_name":"盟主大人","create_time":1735205400000,"create_ip":"192.168.1.101","extra":{"device":"Android 14","network":"4G","game_version":"1.9.2","channel":"应用宝"}}

这里假设默认行为 action 如下:

  • 1001: 注册

  • 1002: 登录

  • 1003: 升级

  • 1004: 充值

  • 1005: 退出

可以利用 kafka 客户端来手动写入 app-report 的 topic 之中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 可以直接手动复制粘贴插入数据
# 输入以下命令进入交互界面, 之后复制上面的数据粘贴输入即可
/data/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic app-report


# 可以通过文件直接导入
# 这部分数据已经放入在 /tmp/test.json 之中
/data/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic app-report < /tmp/test.json


# 最后确认数据是否写入成功
# 需要注意: --from-beginning 代表拉取全部, 正式环境一定要小心执行此操作
/data/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic app-report --from-beginning


# 正式环境需要指定采用最大消息量
# max-messages {最小消息量}: 仅消费1条消息后退出
# --offset latest: 从最新的偏移量开始消费(不回溯历史数据)
/data/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic app-report \
--max-messages 1 \
--offset latest
# 以上命令是会失败的, 因为 topic 有分区特性, 也就是数据会按照特定规则散列保存在不同分区(可以理解为数据分片机制)


# 基于分区特性, 那么就需要获取 topic 有多少个分区, 从而才能从不同分区导出最后的分片数据
/data/kafka/bin/kafka-topics.sh --describe --topic app-report --bootstrap-server localhost:9092
# 以上确定内部消息保存散列的分区, 找到 PartitionCount 关键字, 这就是分区数量, 我这边显示 TopicId: auf3k2y_Tl-_h-_F9fnlTA PartitionCount: 6

# 分区是以 0 做步进, 所以 PartitionCount=6, 代表分区是 0~5
# 这里通过 --partition 参数指定分区ID, 需要查询 PartitionCount 次数的分片来获取各自分片最新消息
# 默认发起消费请求是长轮训, 需要指定 --timeout-ms 来让其获取不到数据直接超时退出, --timeout-ms 3000 代表 3s 超时
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic app-report --partition 0 --max-messages 1 --offset latest --timeout-ms 3000
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic app-report --partition 1 --max-messages 1 --offset latest --timeout-ms 3000
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic app-report --partition 2 --max-messages 1 --offset latest --timeout-ms 3000
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic app-report --partition 3 --max-messages 1 --offset latest --timeout-ms 3000
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic app-report --partition 4 --max-messages 1 --offset latest --timeout-ms 3000
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic app-report --partition 5 --max-messages 1 --offset latest --timeout-ms 3000
# 上面还是取不出数据, 因为这些命令行执行的时候是长轮训等待最新数据, 而不会不回溯历史数据
# 我们之前的数据很早就添加, 已经是历史数据而非最新数据

这里其实仅仅作为消息生产和消费的模式, 其实内部有很多实现都没有扩展出来说, 但是作为使用者我们只需要这就是消息队列的实现.

而对于 Kafka 是默认是不支持类似于 SQL 那样的聚合查询操作, 但是有时候能看到有的项目支持 SQL 语法.

这时因为内部扩展集成类似 KSQLDB/FlinkSQL/SparkSQL 等大数据处理工具, 将 Kafka 功能封装成类似 SQL 语法功能.

不过我们主要是采用 producer-consumer 模式, 其他入库同步等操作异步都是由脚本语言后台执行, 所以不关心在命令行执行查询的情况.

而现在已经实现这部分生产者投递, 后面只需要对应语言消费提取出来指定事件写入到 MySQL/PostgreSQL 即可, 这部分后续最好采用分页分表落地.

比如 app_report_{应用ID}(对应APPID) 分表或者 app_report_{20251203}(年月日分表), app_report_{202512}(年月分表) 等不同分表方式

入库这部分见仁见智, 需要按照不同情况来处理写入, 另外还有个额外的点, 那就是数据库表的唯一标识.

其实更加建议入库的数据表 primary key 设置为 UUID, 可以避免迁移或者写入的ID异常, 并且可以避免重复写入的问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
create table if not exists pino_app_report
(
# 标识做好处理的成全局唯一, 可以入库和转移的时候可以规避很多 key 异常的问题
# 并且入库的时候最数据覆盖处理, 可以避免异常(分布式提交)的时候 id 被递增生成重复数据
# UUID 默认都是 36 字符长度, 所以直接采用这部分类型处理
# UUID 的无序性会导致数据库索引频繁分裂, 写入性能较差, 高版本才有递增连续
# 如果对性能方面比较讲究可以采用雪花ID来做标识
id CHAR(36) not null comment '数据主键, 采用UUID来做标识, 如果有相同 UUID 就覆盖该行数据',

# 其他略
primary key (id)
) comment '应用的上报信息表'
engine = InnoDB
charset = utf8mb4
collate = utf8mb4_unicode_ci;

当然如果能实现雪花ID就更好, 也能够提升不少性能; 高版本 UUID 才优化好无序的问题, 采用有序序列方便递增优化

这部分的唯一ID也是在 API 上报的时候由服务端那边生成并顺带投递到 Kafka 之中, 而 UUID 特性保证基本很难出现重复相同情况.

主要大部分情况下 API 服务端都是大规模上报的情况, 这部分推送到 kafka 数据量很大, 必须要多个消费者任务来加快消费避免消息丢失抛弃;
而多个消费者任务也就代表必须要保证 id 不要受到争抢递增ID的占用影响, 所以必须要采用非递增而又能并行插入保证标识唯一.

Kafka 只是作为辅助手段, 主要就是保证数据库冷入库处理, 所以要尽可能安全加速数据库的落地, 避免消息滞留在 Kafka 而被触发删除规则.

数据落地 - Java

注意: Kafka 是作为消息队列方案, 所以要求执行落地的程序是常驻任务, 所以不适合定时调用处理

而需要常驻内存使用就算推荐采用匹配的原生 Java 方案, 这样就可以对技术栈做重合复用.

不过对于运行常驻的 Java 程序不要和 Kafka 在同一服务器之中, 否则高峰时期会发生资源抢占的问题

这里采用 Quarkus 做基础的容器管理和启动架构, Maven 的依赖组件如下(这里复用最开始的 Java17 版本):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.pino.game</groupId>
<artifactId>pino-kafka</artifactId>
<version>1.0-SNAPSHOT</version>

<!-- 全局属性 -->
<properties>
<!-- 编译配置 -->
<compiler-plugin.version>3.14.1</compiler-plugin.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.release>17</maven.compiler.release>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<!-- quarkus 依赖 -->
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.30.4</quarkus.platform.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.5.4</surefire-plugin.version>

<!-- Kafka 依赖 -->
<kafka.client.version>3.2.1</kafka.client.version>
</properties>

<!-- 全局版本管理 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>${quarkus.platform.artifact-id}</artifactId>
<version>${quarkus.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<!-- 第三方包依赖配置 -->
<dependencies>

<!-- Quarkus 容器依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>

<!-- Quarkus Kafka 依赖 -->
<!-- 具体使用参照: https://quarkus.io/guides/kafka -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>

<!-- Kafka 客户端接口, 需要参照 quarkus-messaging-kafka 依赖的版本 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.client.version}</version>
</dependency>


<!-- Quarkus ORM 依赖, 后续选择指定数据库 JDBC 驱动 -->
<!-- 具体驱动列表参照: https://quarkus.io/guides/hibernate-orm#setting-up-and-configuring-hibernate-orm -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>

<!-- Quarkus JDBC 依赖: mariadb -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-mariadb</artifactId>
</dependency>


<!-- Quarkus 测试依赖 -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<!-- 公共编译配置 -->
<build>
<plugins>
<plugin>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.platform.version}</version>
<extensions>true</extensions>
<executions>
<execution>
<goals>
<goal>build</goal>
<goal>generate-code</goal>
<goal>generate-code-tests</goal>
<goal>native-image-agent</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${compiler-plugin.version}</version>
<configuration>
<parameters>true</parameters>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<argLine>--add-opens java.base/java.lang=ALL-UNNAMED</argLine>
<systemPropertyVariables>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<argLine>--add-opens java.base/java.lang=ALL-UNNAMED</argLine>
<systemPropertyVariables>
<native.image.path>${project.build.directory}/${project.build.finalName}-runner
</native.image.path>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>native</id>
<activation>
<property>
<name>native</name>
</property>
</activation>
<properties>
<quarkus.package.jar.enabled>false</quarkus.package.jar.enabled>
<skipITs>false</skipITs>
<quarkus.native.enabled>true</quarkus.native.enabled>
</properties>
</profile>
</profiles>
</project>

内部已经集成 Quarkus 自带的 Kafka 和 ORM 组件, 只需要处理执行内部消费者任务即可, 首先追加 application.properties 配置:

1
2
3
4
5
6
7
8
9
## 通用配置
quarkus.application.name=PinoKafka
## ========== Kafka配置 ==========
# Kafka服务地址
kafka.bootstrap.servers=localhost:9092
# 注意声明的 mp.messaging.incoming.$channel.bootstrap.servers 代表我们是作为生产者
# 但是我们仅仅指向作为消费者就不需要配置推送通道相关
# mp.messaging.incoming.app-report.bootstrap.servers=smallrye-kafka
# 其他略

之后就是创建消费者的启动任务, 这里直接依托 Quarkus 设置启动任务来执行即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import io.quarkus.runtime.Startup;
import io.smallrye.common.annotation.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
* Kafka 上报消息消费者,实现数据落地 MariaDB
*/
@Startup // 应用启动之后优先完成初始化
@ApplicationScoped
public class AppReportConsumer {

/**
* 日志对象
*/
private static final Logger logger = LoggerFactory.getLogger(AppReportConsumer.class);

/**
* 消费Kafka主题:app-report
*/
@Incoming("app-report") // 消费的 topic
// @Blocking // 标记为阻塞操作,避免占用Reactive线程池(已经保证了 UUID 唯一性, 可以不需要)
// @Transactional // 事务管理,确保批量入库原子性(对于写入操作 ORM 强制需要事务安全)
public void consumeMessages(List<String> messages) {
logger.info("接收到Kafka批量消息,数量:%d", messages.size());

// TODO:需要通过 JSON 解析内部数据, 这里最好定义好具体 DTO 对象用来转化JSON
// 这部分采用自带 Jackson JSON 处理工具即可

// 这里仅做展示后续自己编写入库代码
messages.forEach(msg -> logger.info("Message = {}", msg));

// todo: 数据入库处理
}
}

这边入库代码我会简略编写, 不然代码占据太多篇幅观感也不好, 具体就是消费 Kafka 的消息入库而已

最后测试启动之后动态推入数据, 确认 PinoKafka 程序是否能够接收到消息队列推送的消息.