目录

实战指南构建高可用生产级Kafka集群的完整教程

实战指南:构建高可用生产级Kafka集群的完整教程

https://i-blog.csdnimg.cn/img_convert/d5c972ce42e8b2c88ad860580cb3ef89.gif#pic_center

前文教程:

一、多节点 Kafka 集群部署

1. 环境准备

此处需注意,如已经安装测试过 Kafka 单节点,请在安装集群之前清空之前配置的目录

sudo rm -r /var/lib/zookeeper
sudo rm -r /var/lib/kafka-logs
sudo mkdir -p /var/lib/{zookeeper,kafka-logs}
sudo chown -R kafka:kafka /var/lib/{zookeeper,kafka-logs} /opt/kafka
  • 3台服务器 :假设 IP 为 192.168.1.101 , 192.168.1.102 , 192.168.1.103
  • 统一安装 Kafka :在每台服务器重复 之前的安装步骤,但是暂时不启动各个服务,确保路径一致(如 /opt/kafka
  • 同步配置 :确保所有节点配置文件的 zookeeper.connect 指向相同的 ZooKeeper 集群
2. ZooKeeper 集群配置
  1. 每台 ZooKeeper 节点配置

    config/zookeeper.properties

    # 基础时间单位(毫秒),默认 2000
    tickTime=2000
    
    # Leader 等待 Follower 初始连接的最长时间(tickTime 的倍数)
    initLimit=5
    
    # Leader 与 Follower 心跳同步的最大延迟时间(tickTime 的倍数)
    syncLimit=2
    
    # 其他原有配置保持不变
    dataDir=/var/lib/zookeeper
    clientPort=2181
    maxClientCnxns=0
    admin.enableServer=false
    # 集群节点列表(所有 ZooKeeper 节点需一致)
    server.1=192.168.1.101:2888:3888
    server.2=192.168.1.102:2888:3888
    server.3=192.168.1.103:2888:3888
  2. 创建 myid 文件 (每个节点唯一):

    # 在 192.168.1.101 上执行
    echo "1" > /var/lib/zookeeper/myid
    
    # 在 192.168.1.102 上执行
    echo "2" > /var/lib/zookeeper/myid
    
    # 在 192.168.1.103 上执行
    echo "3" > /var/lib/zookeeper/myid

    此处要确认 myid ,对于用户 kafka 有权限。

    https://i-blog.csdnimg.cn/img_convert/0babe8b5344048fd6efb53d8a9618bf0.png

  3. 启动 ZooKeeper 集群

    # 所有节点执行
    sudo systemctl start zookeeper
    # 检查集群状态(任一节点执行)
    echo srvr | nc <服务器IP> 2181 | grep Mode
    # 应输出 "leader" 或 "follower"

    https://i-blog.csdnimg.cn/img_convert/42907ab1495a01a3706825d83fd79098.jpeg

    https://i-blog.csdnimg.cn/img_convert/fe569e42dbdbd0245b912c373c50c357.jpeg

  4. 测试集群是否创建成功

    #模拟选举事件
    # 停止当前 Leader
    sudo systemctl stop zookeeper
    
    # 查看其他节点日志
    echo srvr | nc <服务器IP> 2181 | grep Mode

    https://i-blog.csdnimg.cn/img_convert/4faa656d5285cfdc998189afef812307.jpeg

    可以看到 Leader 已经从133转到了134节点。

    #数据同步验证
    #在Leader节点创建临时节点并写入数据data
    /opt/kafka/bin/zookeeper-shell.sh 192.168.6.134:2181 <<< "create -e /test-node 'data'"
    ## 在 Follower 节点检查数据
    /opt/kafka/bin/zookeeper-shell.sh 192.168.6.133:2181 <<< "get /test-node"

    https://i-blog.csdnimg.cn/img_convert/21efa5cc863147bd9b49ddb4eba53654.jpeg

    https://i-blog.csdnimg.cn/img_convert/08398ee4fe8d16ee6508e43ad8dd8e48.jpeg

3. Kafka 集群配置
  1. 修改每台 Kafka 节点的 server.properties

    此处的其他配置于上一个教程一样即可。

    # 节点1 (192.168.1.101)
    broker.id=1
    listeners=PLAINTEXT://192.168.1.101:9092
    advertised.listeners=PLAINTEXT://192.168.1.101:9092
    zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
    
    # 节点2 (192.168.1.102)
    broker.id=2
    listeners=PLAINTEXT://192.168.1.102:9092
    advertised.listeners=PLAINTEXT://192.168.1.102:9092
    zookeeper.connect=同上
    
    # 节点3 (192.168.1.103)
    broker.id=3
    listeners=PLAINTEXT://192.168.1.103:9092
    advertised.listeners=PLAINTEXT://192.168.1.103:9092
    zookeeper.connect=同上

    关键参数解释

    • advertised.listeners :客户端实际连接的地址(如果跨网络需配置公网IP或域名)
    • zookeeper.connect :所有 ZooKeeper 节点地址,用逗号分隔
  2. 启动 Kafka 集群

    # 所有节点执行
    sudo systemctl start kafka
    
    # 检查 Broker 是否注册到 ZooKeeper
    /opt/kafka/bin/zookeeper-shell.sh <服务器IP>:2181 ls /brokers/ids
    # 应输出 [1,2,3]

    https://i-blog.csdnimg.cn/img_convert/02e66f297d0bb3d8cd9dab19f98f5a4d.png

  3. 创建 Topic(验证集群)

    # 在任意节点执行
    /opt/kafka/bin/kafka-topics.sh --create \
      --topic cluster-test \
      --bootstrap-server 192.168.1.101:9092 \
      --partitions 3 \
      --replication-factor 3  # 副本数≤Broker数量
    
    # 查看 Topic 详情
    /opt/kafka/bin/kafka-topics.sh --describe \
      --topic cluster-test \
      --bootstrap-server 192.168.1.101:9092

    期望输出 :每个分区的 LeaderReplicas 分布在多个 Broker 上。


二、性能调优

1. JVM 参数优化

编辑 Kafka 启动脚本 bin/kafka-server-start.sh

# 修改这一行
export KAFKA_HEAP_OPTS="-Xms3G -Xmx3G -XX:MetaspaceSize=96m -XX:+UseG1GC"
  • Xms/Xmx :堆内存(建议不超过物理内存的 50%)
  • UseG1GC :G1 垃圾回收器(适合大内存)
2. Kafka 参数优化
# server.properties
num.network.threads=8  # 网络线程数(默认3)
num.io.threads=16      # IO 线程数(默认8)
log.flush.interval.messages=10000  # 累积多少消息后刷盘
log.flush.interval.ms=1000         # 最多等待多久刷盘

三、集群监控与管理

1. 使用 EFAK(kafka eagle)(可视化工具)
  1. 安装

    访问官网下载安装包:

    https://i-blog.csdnimg.cn/img_convert/c517ac734b0dd70e8b4152a5104bb7bc.png

    下载后上传至服务器解压

    tar -zvxf kafka-eagle-bin-3.0.1.tar.gz
    cd kafka-eagle-bin-3.0.1/
    tar -zxvf efak-web-3.0.1-bin.tar.gz
    mv efak-web-3.0.1 /opt/efak
  2. 配置 Mysql 用来储存元数据

    CREATE DATABASE ke_paco DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
    CREATE USER 'ke_paco'@'%' IDENTIFIED BY 'paco123';
    GRANT ALL PRIVILEGES ON ke_paco.* TO 'ke_paco'@'%';
    FLUSH PRIVILEGES; -- 刷新权限生效
  3. 设置环境变量

    vim /etc/profile

    写入如下内容,配置 jdkEFAK 的环境变量:

    export KE_HOME=/opt/efak
    export PATH=$PATH:$KE_HOME/bin
    
    export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
    export PATH=$JAVA_HOME/bin:$PATH
  4. 修改配置文件

    cd /opt/efak/conf
    vim system-config.properties

    配置 zookeeper 地址:

    efak.zk.cluster.alias=cluster1
    cluster1.zk.list=192.168.6.131:2181,192.168.6.133:2181,192.168.6.134:2181

    此处建议复制kafka配置文件的zookeeper配置。

    配置 Mysql :

    efak.driver=com.mysql.cj.jdbc.Driver
    efak.url=jdbc:mysql://<mysql ip>:3306/ke_paco?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    efak.username=ke_paco
    efak.password=paco123
  5. 开启JMX监控

    修改 Kafka 启动配置,参考如下,按自己需要修改:

    vi /opt/kafka/bin/kafka-server-start.sh
        export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70 -Djava.rmi.server.hostname=<改为当前节点IP> -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
        export JMX_PORT="9999"

    https://i-blog.csdnimg.cn/img_convert/ccd371e880afffff9c168dd5f2a424b8.png

  6. 启动EFAK

    如若Kakfa未作任何其他配置,则可直接启动

    $KE_HOME/bin/ke.sh start

    启动成功后如图:

    https://i-blog.csdnimg.cn/img_convert/8a6d93b3b51d307f31f958e6de152686.png

    访问UI界面:

    https://i-blog.csdnimg.cn/img_convert/fca2ad17511845eb20c3860a9dacd300.jpeg

    https://i-blog.csdnimg.cn/img_convert/bed0daf9a52c3396223ccf4e9b7237a3.jpeg

    https://i-blog.csdnimg.cn/img_convert/fe812ecec8b4539e65437fbd0c1d691a.png


四、灾难恢复验证

  1. 创建 Topic

    在 Kafka 集群中创建一个测试 Topic,用于后续的测试。

    # 创建 Topic,指定分区数和副本因子
    /opt/kafka/bin/kafka-topics.sh --create \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092 \
      --partitions 3 \
      --replication-factor 2
    
    # 查看 Topic 详情,确认创建成功
    /opt/kafka/bin/kafka-topics.sh --describe \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092

    输出示例

    https://i-blog.csdnimg.cn/img_convert/0672288a61343df474b070b9c2029a23.jpeg

  2. 写入消息

    向 Topic 中写入测试消息,用于验证数据持久化和消费。

    操作步骤

    # 使用 kafka-console-producer 写入消息
    echo "test message 1" | /opt/kafka/bin/kafka-console-producer.sh \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092
    
    # 写入多条消息
    for i in {1..10}; do
      echo "test message $i" | /opt/kafka/bin/kafka-console-producer.sh \
        --topic cluster-test \
        --bootstrap-server 192.168.6.131:9092
    done

    https://i-blog.csdnimg.cn/img_convert/eb34733f7d65aa6e325f82f3a716dd6d.jpeg

  3. 消费消息

    启动一个消费者(任意其他节点),验证消息是否成功写入。

    操作步骤

    # 使用 kafka-console-consumer 消费消息
    /opt/kafka/bin/kafka-console-consumer.sh \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092 \
      --from-beginning

    https://i-blog.csdnimg.cn/img_convert/b7b452e05d7dde7ba0a50fac9fa9aa59.jpeg

  4. 模拟 Broker 宕机

    停止一个 Broker,观察分区 Leader 是否切换。

    操作步骤

    # 停止 Broker 2
    sudo systemctl stop kafka  # 假设 Broker 2 的 ID 是 2
    
    # 查看 Topic 分区状态,确认 Leader 切换
    /opt/kafka/bin/kafka-topics.sh --describe \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092

    https://i-blog.csdnimg.cn/img_convert/b357cad48c7dbf98ce1bcffb56cae0fb.jpeg 说明

    • 停止 Broker 2 后,分区 0 的 Leader 仍然是 Broker 1,分区 1 的 Leader 从 Broker 2 切换到了 Broker 3。
  5. 验证数据持久化

    重启所有 Broker,验证消息是否仍然可以消费。

    操作步骤

    # 重启所有 Broker
    sudo systemctl restart kafka
    
    # 等待 Broker 重新启动(约 30 秒)
    sleep 30
    
    # 再次消费消息,验证数据是否保留
    /opt/kafka/bin/kafka-console-consumer.sh \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092 \
      --from-beginning

    输出示例

    https://i-blog.csdnimg.cn/img_convert/0ee2451637c38b01c7e9f497fae57923.jpeg

  6. 验证分区恢复

    确认停止的 Broker 重新加入集群后,分区状态恢复正常。

    操作步骤

    # 查看 Topic 分区状态
    /opt/kafka/bin/kafka-topics.sh --describe \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092

    https://i-blog.csdnimg.cn/img_convert/0706fe4bcb53389f4d4f5f124d64d66d.png


五、关键注意事项

  1. 网络防火墙

    • 开放 ZooKeeper 端口: 2181 (客户端), 2888 (节点间通信), 3888 (选举)
    • 开放 Kafka 端口: 9092 (明文), 999 9(JMX)
  2. 时间同步

    # 所有节点安装 NTP
    sudo yum install ntp -y && sudo systemctl enable --now ntpd
  3. 定期清理日志

    # 手动触发日志删除
    /opt/kafka/bin/kafka-log-dirs.sh \
      --bootstrap-server 192.168.1.101:9092 \
      --describe --topic-list cluster-test

通过以上步骤,您将获得一个高可用、安全的 Kafka 生产级集群,并具备基础的监控和容灾能力。