Kafka 安装使用

发布时间: 更新时间: 总字数:5554 阅读时间:12m 作者: IP上海 分享 网址

Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。本文介绍 Kafka 的安装部署与使用方法。

介绍

Apache Kafka 是分布式发布-订阅消息系统。它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分。Kafka 是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

Apache Kafka 与传统消息系统相比,有以下不同:

  • 它被设计为一个分布式系统,易于向外扩展;
  • 它同时为发布和订阅提供高吞吐量;
  • 它支持多订阅者,当失败时能自动平衡消费者;
  • 它将消息持久化到磁盘,因此可用于批量消费,例如 ETL,以及实时应用程序。

使场景化

  • 流式 ETL 中间件、流分析
  • 实时事件处理
  • 实时协作
  • 物联网设备之间的通信,一般使用 MQTT

功能

  • 发布订阅
    • 生产者(producer) 生产消息(数据流),并将其发送到指定的主题(topic)中,也可将其发送到topic指定分区(partition)
    • 消费者(consumer)从指定topic中获取消息,然后来处理消息
  • 流处理(Stream Process)输入topic转换数据流到输出topic
  • 连接器(Connector) 将数据从应用程序(源系统)中导入到 kafka,或者从 kafka 导出数据到应用程序

消息模型

  • 队列:同名的消费者组员瓜分消息
  • 发布订阅:广播消息给多个消费者组

基础概念

  • 消息记录(record) 由一个key,一个value和一个时间戳构成
    • 消息最终存储在主题下的分区中
      • 在生产者中称为生产者记录(ProducerRecord)
      • 在消费者中称为消费者记录(ConsumerRecord)
    • 消息的时效性:Kafka 集群保存所有的消息,直到它们过期,无论消息是否被消费,在一个可配置的时间段内,Kafka 集群保留所有发布的消息
    • Kafka 的性能是和数据量无关的常量级的,所以保留大量的数据并不是问题
  • 生产者(producer) 生产者用于发布(publish)消息
  • 消费者(consumer) 消费者用于订阅(subscribe)消息
  • 消费者组(consumer group) 相同group.id的消费者将视为同一个消费者组
    • 每个消费者都需要设置一个组 id,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费
  • 主题(topic) 消息的一种 逻辑分组,用于对消息分门别类,相同主题的消息放在一个 队列
  • 分区(partition) 消息的一种 物理分组
    • 一个主题被拆成多个分区,每一个分区就是一个顺序的、不可变的消息队列,并且可以持续添加,分区中的每个消息都被分配了一个唯一的 id,称之为偏移量(offset),在每个分区中偏移量都是唯一的
      • 偏移量(offset) 分区中的每个消息都有一个唯一 id,称之为偏移量,它代表已经消费的位置。可以自动或者手动提交偏移量(即自动或者手动标记一条消息是否已经被成功消费)
  • 代理(broker) 一台 kafka 服务器称之为一个broker
  • 副本(replica) 副本只是一个分区(partition)的备份
    • 副本从不读取或写入数据,用于防止数据丢失
  • 领导者(leader) 负责给定分区的所有读取和写入的节点
    • 每个分区都有一个服务器充当 Leader,producerconsumer 只跟 leader 交互
  • 追随者(follower) 跟随领导者指令的节点被称为Follower
    • 当领导失败时,一个追随者将自动成为新的领导者
    • 追随者作为正常消费者,拉取消息并更新其自己的数据存储,replica 中的一个角色,从 leader 中复制数据

安装

Kafka 是无状态的,使用 ZooKeeper 来维护集群状态。ZooKeeper 用于管理和协调 Kafka 代理,安装部署见Zookeeper 安装使用

kafka 单节点

下载地址:https://kafka.apache.org/downloads 下载 kafka_2.11-1.0.0.tgz 安装包。

解压:

tar -zxvf kafka_2.11-1.0.0.tgz
cd /opt/kafka_2.11-1.0.0/

修改 kafka-server 的配置文件

vim /opt/kafka/config/server.properties

修改其中的:

broker.id=1
log.dir=/data/kafka/logs-1

使用 kafka-server-start.sh 启动 kafka 服务:

bin/kafka-server-start.sh config/server.properties

systemctl 启动配置 /etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service

[Service]
Type=simple
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]

WantedBy=multi-user.target

docker 部署

docker run -d --name broker apache/kafka:latest

# 使用
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

kafka 集群

Kafka 支持两种模式的集群搭建:

  • 在单机上运行多个 broker 实例来实现集群
  • 在多台机器上搭建集群

下面介绍下如何实现单机多 broker 实例集群,其实很简单,只需要如下配置即可。

  • 单机多 broker 集群配置

利用单节点部署多个 broker。 不同的 broker 设置不同的 id,监听端口及日志目录。 例如:

cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties

vim config/server-2.propertiesconfig/server-3.properties,修改 :

broker.id=2
listeners = PLAINTEXT://your.host.name:9093
log.dir=/data/kafka/logs-2

broker.id=3
listeners = PLAINTEXT://your.host.name:9094
log.dir=/data/kafka/logs-3

启动 Kafka 服务:

bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &

至此,单机多 broker 实例的集群配置完毕。

  • 多机多 broker 集群配置

分别在多个节点按上述方式安装 Kafka,配置启动多个 Zookeeper 实例。

假设三台机器 IP 地址是 : 10.0.0.1, 10.0.0.2, 10.0.0.3

分别配置多个机器上的 Kafka 服务,设置不同的 broker idzookeeper.connect 设置如下:

vim config/server.properties

里面的 zookeeper.connect

修改为:

broker.id=1/2/3
zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181

使用

创建 topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic

查看 topic 列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

查看 topic 详情

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic test_topic --describe

列出了 test_topicparition 数量、replica因子 以及每个 partitionleaderreplica 信息

Kafka Topic 配置解释

配置项 作用 默认值 (常见) 关键点
cleanup.policy 日志清除策略。决定如何处理旧的日志段。 delete 可选值:delete(按时间和大小删除)或 compact(日志压缩,保留每个 Key 的最新消息)。
compression.type 消息压缩类型。指定生产者发送消息到 Topic 时使用的压缩算法。 producer 可选值:nonegzipsnappylz4zstdproducer(使用生产者自身的配置)。影响带宽和 CPU 消耗。
delete.retention.ms 墓碑记录保留时间(仅用于 compact 策略)。定义了日志压缩器在删除一个 Tombstone(Key 为非空、Value 为 Null 的删除标记)之前,必须等待的最短时间。 86400000 (24 小时) 确保消费者有足够的时间看到删除操作。
file.delete.delay.ms 文件删除延迟。一个日志段被标记为删除后,Kafka 等待多久才在文件系统上执行实际删除操作。 60000 (1 分钟) 提供 I/O 缓冲和安全机制。
flush.messages 日志同步阈值(消息数)。日志追加多少条消息后强制将数据同步(fsync)到磁盘。 9223372036854775807 (极高) 在现代 Kafka 版本中,通常推荐禁用(设置为极高值),依赖操作系统和 Broker 级别的同步配置,因为它会严重影响性能。
flush.ms 日志同步阈值(时间)。日志追加时间达到多少毫秒后强制将数据同步(fsync)到磁盘。 9223372036854775807 (极高) 在现代 Kafka 版本中,通常推荐禁用(设置为极高值),原因同上。
follower.replication.throttled.replicas 跟随者复制限流配置。指定在复制数据时要限制速度的跟随者副本列表。格式:<partition_id>:<broker_id> 用于集群内进行 Broker 维护或移动分区时,避免复制操作占用过多网络带宽。
index.interval.bytes 索引间隔。在日志文件中,每隔多少字节创建一个稀疏索引条目(Offset Index)。 4096 (4KB) 影响查找消息时的性能(查找更快)和索引文件的大小(文件更大)。
leader.replication.throttled.replicas 领导者复制限流配置。指定在复制数据时要限制速度的领导者副本列表。格式:<partition_id>:<broker_id> 用于集群内进行 Broker 维护或移动分区时,避免复制操作占用过多网络带宽。
max.compaction.lag.ms 最大压缩延迟(仅用于 compact 策略)。如果日志段的最旧消息年龄超过此值,即使没有达到 min.cleanable.dirty.ratio,也会强制进行日志压缩,以防止数据保留时间过长。 9223372036854775807 (极高) 用于保证高龄数据能最终被压缩清理。
max.message.bytes 最大消息大小。Broker 能够接受的单个消息(Record Batch)的最大字节数。 1048576 (1MB) 必须 小于或等于 Broker 级别的 message.max.bytes 配置。
message.downconversion.enable 消息格式降级。是否允许 Broker 在向使用旧版本协议的消费者发送数据时,将消息格式降级到消费者支持的版本。 true 通常保持 true 以保持兼容性。
message.format.version Topic 消息格式版本。指定 Topic 使用的消息版本格式。 Broker 版本 影响消息的存储格式和特性(如时间戳、首部)。推荐与 Broker 版本保持一致或设置为最新的 Broker 版本。
message.timestamp.difference.max.ms 时间戳最大差异。当 message.timestamp.typeCreateTime 时,允许生产者提供的时间戳与 Broker 当前时间的最大时间差异 9223372036854775807 (极高) 用于防止生产者发送具有错误或极大偏差的时间戳。
message.timestamp.type 消息时间戳类型。指定消息中的时间戳是基于创建时间还是追加时间 CreateTime 可选值:CreateTime(生产者设置)或 LogAppendTime(Broker 设置)。
min.cleanable.dirty.ratio 最小可清理脏数据比例(仅用于 compact 策略)。表示一个日志段中,脏数据(即有更新版本的 Key)占总数据量的最小比例。只有当脏数据比例超过此值时,日志压缩线程才会开始清理。 0.5 越小意味着更频繁的压缩,但 CPU 消耗更高。
min.compaction.lag.ms 最小压缩延迟(仅用于 compact 策略)。消息在被压缩之前,必须等待的最短时间。 0 用于确保新写入的消息在短时间内不会被清除,给消费者留出时间处理最新数据。
min.insync.replicas 最小同步副本数。当生产者设置 acksall 时,成功写入消息所需的最少同步副本数(ISR)。 1 这是一个持久性保障。如果当前 ISR 小于此值,生产者将收到错误,写入失败。
preallocate 预分配文件。是否在创建新的日志段文件时预先分配磁盘空间。 false 设置为 true 可以减少文件系统碎片,但可能会稍微增加日志段切换时的延迟。
retention.bytes 日志最大保留大小。Topic 对应的所有分区日志文件总共允许占用的最大字节数 -1 (无限) 基于大小的保留策略。当超过此值时,最旧的日志段将被删除。
retention.ms 日志最大保留时间。日志段在被删除之前可以保留的最大毫秒数 604800000 (7 天) 基于时间的保留策略。达到此时间后,最旧的日志段将被删除。
segment.bytes 单个日志段文件大小。单个日志段文件(.log 文件)的最大字节数。 1073741824 (1GB) 影响日志文件的管理和日志清理的粒度。太小会导致文件过多,太大则清理不及时。
segment.index.bytes 单个索引文件大小。单个日志段索引文件(.index 文件)的最大字节数。 10485760 (10MB) 影响单个日志段能包含的最大消息数。
segment.jitter.ms 日志段切换抖动时间。在达到 segment.ms 时间后,实际切换日志段时的随机延迟范围(最大值)。 0 用于分散日志段切换的 I/O 压力。
segment.ms 日志段切换时间。Kafka 在强制切换到一个新的日志段之前,等待的最大毫秒数 604800000 (7 天) segment.bytes 共同决定何时切换新的日志段。
unclean.leader.election.enable 非同步副本选举(ULA)。当所有同步副本(ISR)都不可用时,是否允许选择一个不同步的副本作为新的领导者。 false 如果设置为 true可能导致数据丢失,但可以保证分区在极端情况下仍然可用(可用性优先于持久性)。

查看 consumer 列表

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --list
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list

查看 consumer 详情

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --group test --describe
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group console-consumer-1 --describe

其中依次展示 group 名称、消费的 topic 名称、partition id、consumer group 最后一次提交的 offset、最后提交的生产消息 offset、消费 offset 与生产 offset 之间的差值、当前消费 topic-partition 的 group 成员 id(不一定包含 hostname)

consumer 和 producer 的关系

当 Kafka 消费者数量大于分区数量时,会产生以下几个主要影响:

  1. 空闲消费者: 由于 Kafka 的每个分区在同一个消费者组内只能被一个消费者消费,当消费者数量超过分区数量时,必然会有部分消费者处于空闲状态,不会接收到任何消息。
  2. 资源浪费: 这些空闲的消费者会占用系统资源(如内存、CPU、网络连接等),但却不产生实际价值,造成资源浪费。
  3. 重平衡开销: 当消费者组成员发生变化时,Kafka 会触发重平衡(rebalance)过程。消费者数量过多会使重平衡过程更加复杂,可能增加重平衡的时间和系统开销。
  4. 连接开销: 每个消费者都需要与 Kafka 集群建立连接,过多的消费者会增加 Kafka broker 的连接负担。

最佳实践是让消费者数量小于或等于分区数量,通常建议消费者数量与分区数量相等,以达到最佳的并行处理效果。如果需要提高消费能力,更适合的方式是增加分区数量,而不是盲目增加消费者数量。

检测

数据上报

bin/kafka-topics.sh --list --zookeeper $zkaddr

监控 topic 数据:

bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181/common_kafka --topic $topic

127.0.0.1:2181/common_kafka 来自 kafka/config/server.properties

可视化

kafka_exporter

监控

  • smartloli/EFAK A easy and high-performance monitoring system, for comprehensive monitoring and management of kafka cluster.

编程

Python

  • 安装
pip install kafka-python
  • 生成者
kafka producer ...
import time

from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable


# Create a Kafka Producer on Localhost with Default Port
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Json data
# producer = KafkaProducer(bootstrap_servers='localhost:9092',
#                          value_serializer=lambda m: json.dumps(m).encode('utf-8'))


# Create a New Topic to Send Messages
topic = 'Test_Topic_1'

# Retry logic to handle connection issues
max_retries = 3
current_retry = 0
while current_retry < max_retries:
    try:
        for i in range(5):
            message = f'Message {i}'
            producer.send(topic, message.encode('utf-8'))
            print(f'Sent: {message}')
        # If the messages are sent successfully, break the loop
        break

    except NoBrokersAvailable as e:
        print(f"Error: {e}")
        print("Retrying in 5 seconds...")
        time.sleep(5)
        current_retry += 1

# Close the Producer and Close the connections
producer.close()
  • 消费者
kafka consumer ...
from kafka import KafkaConsumer

# Create a Kafka Consumer
consumer = KafkaConsumer(
    'Test_Topic_1', # Topic Name
    bootstrap_servers='localhost:9092', #Host and Port
    group_id='your_consumer_group' # Group ID
)

# Continuously listen / poll for New Messages
for message in consumer:
    print(f'Received: {message.value.decode("utf-8")}')
  • 手动设置偏移量
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'])
# 获取当前主题的最新偏移量
print(consumer.position(TopicPartition(topic='test', partition=0)))
# 重置偏移量,从第1个偏移量消费
consumer.seek(TopicPartition(topic='test', partition=0), 1)
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))

多线程示例

  • consumer.py
import threading
import time
from kafka import KafkaConsumer
import json

# Kafka 配置
KAFKA_BROKERS = ['localhost:9092']
TOPIC_NAME = 'your_topic'
GROUP_ID = 'your_consumer_group'

def consume_messages(thread_id):
    """
    每个线程的执行函数,创建一个独立的 Kafka 消费者实例
    """
    consumer = KafkaConsumer(
        TOPIC_NAME,
        group_id=GROUP_ID,
        bootstrap_servers=KAFKA_BROKERS,
        auto_offset_reset='earliest',  # 从最早的消息开始消费
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    print(f"线程 {thread_id} 开始消费...")
    try:
        for message in consumer:
            print(f"线程 {thread_id} 收到消息: "
                  f"分区: {message.partition}, "
                  f"偏移量: {message.offset}, "
                  f"值: {message.value}")
            # 这里可以添加处理消息的逻辑
            time.sleep(1) # 模拟处理消息的时间
    except Exception as e:
        print(f"线程 {thread_id} 发生错误: {e}")
    finally:
        consumer.close()

if __name__ == '__main__':
    threads = []
    num_consumers = 10

    for i in range(num_consumers):
        thread = threading.Thread(target=consume_messages, args=(i,))
        threads.append(thread)
        thread.start()

    # 主线程可以做其他事情,或者等待所有线程结束(通常不会)
    # for thread in threads:
    #     thread.join()

    print("所有消费者线程已启动。")

多进程示例

import multiprocessing
import time
from kafka import KafkaConsumer
import json

# Kafka 配置
KAFKA_BROKERS = ['localhost:9092']
TOPIC_NAME = 'your_topic'
GROUP_ID = 'your_consumer_group'

def consume_messages(process_id):
    """
    每个进程的执行函数,创建一个独立的 Kafka 消费者实例
    """
    consumer = KafkaConsumer(
        TOPIC_NAME,
        group_id=GROUP_ID,
        bootstrap_servers=KAFKA_BROKERS,
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    print(f"进程 {process_id} 开始消费...")
    try:
        for message in consumer:
            print(f"进程 {process_id} 收到消息: "
                  f"分区: {message.partition}, "
                  f"偏移量: {message.offset}, "
                  f"值: {message.value}")
            # 这里可以添加处理消息的逻辑
            time.sleep(1) # 模拟处理消息的时间
    except Exception as e:
        print(f"进程 {process_id} 发生错误: {e}")
    finally:
        consumer.close()

if __name__ == '__main__':
    processes = []
    num_consumers = 10

    for i in range(num_consumers):
        process = multiprocessing.Process(target=consume_messages, args=(i,))
        processes.append(process)
        process.start()

    # 等待所有子进程完成
    for process in processes:
        process.join()

    print("所有消费者进程已启动并运行。")

F&Q

启动失败问题

错误日志:

ARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/data/...-0/00000000000000108309.index) has non-zero size but the last offset is 108309 which is no larger than the base offset 108309.}. deleting /data/...-0/00000000000000108309.timeindex, /data/...-0/00000000000000108309.index and rebuilding index

解决方法:收到删除对应的 index,后重启服务。

kafka topic 文件 .log.deleted 没有删除怎么处理

  • Kafka 的日志清理任务可能因为高负载或内部错误而延迟执行,一般重启后,可立即删除。

参考

  1. 官网
本文总阅读量 次 本站总访问量 次 本站总访客数
Home Archives Categories Tags Statistics