Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。本文介绍 Kafka 的安装部署与使用方法。
介绍
Apache Kafka 是分布式发布-订阅消息系统。它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分。Kafka 是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
Apache Kafka 与传统消息系统相比,有以下不同:
- 它被设计为一个分布式系统,易于向外扩展;
- 它同时为发布和订阅提供高吞吐量;
- 它支持多订阅者,当失败时能自动平衡消费者;
- 它将消息持久化到磁盘,因此可用于批量消费,例如 ETL,以及实时应用程序。
使场景化
- 流式 ETL 中间件、流分析
- 实时事件处理
- 实时协作
- 物联网设备之间的通信,一般使用 MQTT
功能
发布订阅
生产者(producer) 生产消息(数据流),并将其发送到指定的主题(topic)中,也可将其发送到topic指定分区(partition)中
消费者(consumer)从指定topic中获取消息,然后来处理消息
流处理(Stream Process) 将输入topic转换数据流到输出topic
连接器(Connector) 将数据从应用程序(源系统)中导入到 kafka,或者从 kafka 导出数据到应用程序
消息模型
队列:同名的消费者组员瓜分消息
发布订阅:广播消息给多个消费者组
基础概念
消息记录(record) 由一个key,一个value和一个时间戳构成
- 消息最终存储在主题下的分区中
- 在生产者中称为
生产者记录(ProducerRecord)
- 在消费者中称为
消费者记录(ConsumerRecord)
- 消息的时效性:Kafka 集群保存所有的消息,直到它们过期,无论消息是否被消费,在一个可配置的时间段内,Kafka 集群保留所有发布的消息
- Kafka 的性能是和数据量无关的常量级的,所以保留大量的数据并不是问题
生产者(producer) 生产者用于发布(publish)消息
消费者(consumer) 消费者用于订阅(subscribe)消息
消费者组(consumer group) 相同group.id的消费者将视为同一个消费者组
- 每个消费者都需要设置一个组 id,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费
主题(topic) 消息的一种 逻辑分组,用于对消息分门别类,相同主题的消息放在一个 队列 中
分区(partition) 消息的一种 物理分组
- 一个主题被拆成多个分区,每一个分区就是一个顺序的、不可变的消息队列,并且可以持续添加,分区中的每个消息都被分配了一个唯一的 id,称之为
偏移量(offset),在每个分区中偏移量都是唯一的
偏移量(offset) 分区中的每个消息都有一个唯一 id,称之为偏移量,它代表已经消费的位置。可以自动或者手动提交偏移量(即自动或者手动标记一条消息是否已经被成功消费)
代理(broker) 一台 kafka 服务器称之为一个broker
副本(replica) 副本只是一个分区(partition)的备份
领导者(leader) 负责给定分区的所有读取和写入的节点
- 每个分区都有一个服务器充当 Leader,
producer 和 consumer 只跟 leader 交互
追随者(follower) 跟随领导者指令的节点被称为Follower
- 当领导失败时,一个追随者将自动成为新的领导者
- 追随者作为正常消费者,拉取消息并更新其自己的数据存储,
replica 中的一个角色,从 leader 中复制数据
安装
Kafka 是无状态的,使用 ZooKeeper 来维护集群状态。ZooKeeper 用于管理和协调 Kafka 代理,安装部署见Zookeeper 安装使用
kafka 单节点
下载地址:https://kafka.apache.org/downloads 下载 kafka_2.11-1.0.0.tgz 安装包。
解压:
tar -zxvf kafka_2.11-1.0.0.tgz
cd /opt/kafka_2.11-1.0.0/
修改 kafka-server 的配置文件
vim /opt/kafka/config/server.properties
修改其中的:
broker.id=1
log.dir=/data/kafka/logs-1
使用 kafka-server-start.sh 启动 kafka 服务:
bin/kafka-server-start.sh config/server.properties
systemctl 启动配置 /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
[Service]
Type=simple
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
docker 部署
docker run -d --name broker apache/kafka:latest
# 使用
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
kafka 集群
Kafka 支持两种模式的集群搭建:
- 在单机上运行多个 broker 实例来实现集群
- 在多台机器上搭建集群
下面介绍下如何实现单机多 broker 实例集群,其实很简单,只需要如下配置即可。
利用单节点部署多个 broker。 不同的 broker 设置不同的 id,监听端口及日志目录。 例如:
cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties
vim config/server-2.properties 和 config/server-3.properties,修改 :
broker.id=2
listeners = PLAINTEXT://your.host.name:9093
log.dir=/data/kafka/logs-2
和
broker.id=3
listeners = PLAINTEXT://your.host.name:9094
log.dir=/data/kafka/logs-3
启动 Kafka 服务:
bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &
至此,单机多 broker 实例的集群配置完毕。
分别在多个节点按上述方式安装 Kafka,配置启动多个 Zookeeper 实例。
假设三台机器 IP 地址是 : 10.0.0.1, 10.0.0.2, 10.0.0.3
分别配置多个机器上的 Kafka 服务,设置不同的 broker id,zookeeper.connect 设置如下:
vim config/server.properties
里面的 zookeeper.connect
修改为:
broker.id=1/2/3
zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181
使用
创建 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic
查看 topic 列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看 topic 详情
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic test_topic --describe
列出了 test_topic 的 parition 数量、replica因子 以及每个 partition 的 leader、replica 信息
Kafka Topic 配置解释
| 配置项 |
作用 |
默认值 (常见) |
关键点 |
cleanup.policy |
日志清除策略。决定如何处理旧的日志段。 |
delete |
可选值:delete(按时间和大小删除)或 compact(日志压缩,保留每个 Key 的最新消息)。 |
compression.type |
消息压缩类型。指定生产者发送消息到 Topic 时使用的压缩算法。 |
producer |
可选值:none、gzip、snappy、lz4、zstd 或 producer(使用生产者自身的配置)。影响带宽和 CPU 消耗。 |
delete.retention.ms |
墓碑记录保留时间(仅用于 compact 策略)。定义了日志压缩器在删除一个 Tombstone(Key 为非空、Value 为 Null 的删除标记)之前,必须等待的最短时间。 |
86400000 (24 小时) |
确保消费者有足够的时间看到删除操作。 |
file.delete.delay.ms |
文件删除延迟。一个日志段被标记为删除后,Kafka 等待多久才在文件系统上执行实际删除操作。 |
60000 (1 分钟) |
提供 I/O 缓冲和安全机制。 |
flush.messages |
日志同步阈值(消息数)。日志追加多少条消息后强制将数据同步(fsync)到磁盘。 |
9223372036854775807 (极高) |
在现代 Kafka 版本中,通常推荐禁用(设置为极高值),依赖操作系统和 Broker 级别的同步配置,因为它会严重影响性能。 |
flush.ms |
日志同步阈值(时间)。日志追加时间达到多少毫秒后强制将数据同步(fsync)到磁盘。 |
9223372036854775807 (极高) |
在现代 Kafka 版本中,通常推荐禁用(设置为极高值),原因同上。 |
follower.replication.throttled.replicas |
跟随者复制限流配置。指定在复制数据时要限制速度的跟随者副本列表。格式:<partition_id>:<broker_id>。 |
空 |
用于集群内进行 Broker 维护或移动分区时,避免复制操作占用过多网络带宽。 |
index.interval.bytes |
索引间隔。在日志文件中,每隔多少字节创建一个稀疏索引条目(Offset Index)。 |
4096 (4KB) |
影响查找消息时的性能(查找更快)和索引文件的大小(文件更大)。 |
leader.replication.throttled.replicas |
领导者复制限流配置。指定在复制数据时要限制速度的领导者副本列表。格式:<partition_id>:<broker_id>。 |
空 |
用于集群内进行 Broker 维护或移动分区时,避免复制操作占用过多网络带宽。 |
max.compaction.lag.ms |
最大压缩延迟(仅用于 compact 策略)。如果日志段的最旧消息年龄超过此值,即使没有达到 min.cleanable.dirty.ratio,也会强制进行日志压缩,以防止数据保留时间过长。 |
9223372036854775807 (极高) |
用于保证高龄数据能最终被压缩清理。 |
max.message.bytes |
最大消息大小。Broker 能够接受的单个消息(Record Batch)的最大字节数。 |
1048576 (1MB) |
必须 小于或等于 Broker 级别的 message.max.bytes 配置。 |
message.downconversion.enable |
消息格式降级。是否允许 Broker 在向使用旧版本协议的消费者发送数据时,将消息格式降级到消费者支持的版本。 |
true |
通常保持 true 以保持兼容性。 |
message.format.version |
Topic 消息格式版本。指定 Topic 使用的消息版本格式。 |
Broker 版本 |
影响消息的存储格式和特性(如时间戳、首部)。推荐与 Broker 版本保持一致或设置为最新的 Broker 版本。 |
message.timestamp.difference.max.ms |
时间戳最大差异。当 message.timestamp.type 为 CreateTime 时,允许生产者提供的时间戳与 Broker 当前时间的最大时间差异。 |
9223372036854775807 (极高) |
用于防止生产者发送具有错误或极大偏差的时间戳。 |
message.timestamp.type |
消息时间戳类型。指定消息中的时间戳是基于创建时间还是追加时间。 |
CreateTime |
可选值:CreateTime(生产者设置)或 LogAppendTime(Broker 设置)。 |
min.cleanable.dirty.ratio |
最小可清理脏数据比例(仅用于 compact 策略)。表示一个日志段中,脏数据(即有更新版本的 Key)占总数据量的最小比例。只有当脏数据比例超过此值时,日志压缩线程才会开始清理。 |
0.5 |
越小意味着更频繁的压缩,但 CPU 消耗更高。 |
min.compaction.lag.ms |
最小压缩延迟(仅用于 compact 策略)。消息在被压缩之前,必须等待的最短时间。 |
0 |
用于确保新写入的消息在短时间内不会被清除,给消费者留出时间处理最新数据。 |
min.insync.replicas |
最小同步副本数。当生产者设置 acks 为 all 时,成功写入消息所需的最少同步副本数(ISR)。 |
1 |
这是一个持久性保障。如果当前 ISR 小于此值,生产者将收到错误,写入失败。 |
preallocate |
预分配文件。是否在创建新的日志段文件时预先分配磁盘空间。 |
false |
设置为 true 可以减少文件系统碎片,但可能会稍微增加日志段切换时的延迟。 |
retention.bytes |
日志最大保留大小。Topic 对应的所有分区日志文件总共允许占用的最大字节数。 |
-1 (无限) |
基于大小的保留策略。当超过此值时,最旧的日志段将被删除。 |
retention.ms |
日志最大保留时间。日志段在被删除之前可以保留的最大毫秒数。 |
604800000 (7 天) |
基于时间的保留策略。达到此时间后,最旧的日志段将被删除。 |
segment.bytes |
单个日志段文件大小。单个日志段文件(.log 文件)的最大字节数。 |
1073741824 (1GB) |
影响日志文件的管理和日志清理的粒度。太小会导致文件过多,太大则清理不及时。 |
segment.index.bytes |
单个索引文件大小。单个日志段索引文件(.index 文件)的最大字节数。 |
10485760 (10MB) |
影响单个日志段能包含的最大消息数。 |
segment.jitter.ms |
日志段切换抖动时间。在达到 segment.ms 时间后,实际切换日志段时的随机延迟范围(最大值)。 |
0 |
用于分散日志段切换的 I/O 压力。 |
segment.ms |
日志段切换时间。Kafka 在强制切换到一个新的日志段之前,等待的最大毫秒数。 |
604800000 (7 天) |
与 segment.bytes 共同决定何时切换新的日志段。 |
unclean.leader.election.enable |
非同步副本选举(ULA)。当所有同步副本(ISR)都不可用时,是否允许选择一个不同步的副本作为新的领导者。 |
false |
如果设置为 true,可能导致数据丢失,但可以保证分区在极端情况下仍然可用(可用性优先于持久性)。 |
查看 consumer 列表
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --list
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
查看 consumer 详情
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --group test --describe
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group console-consumer-1 --describe
其中依次展示 group 名称、消费的 topic 名称、partition id、consumer group 最后一次提交的 offset、最后提交的生产消息 offset、消费 offset 与生产 offset 之间的差值、当前消费 topic-partition 的 group 成员 id(不一定包含 hostname)
consumer 和 producer 的关系
当 Kafka 消费者数量大于分区数量时,会产生以下几个主要影响:
空闲消费者: 由于 Kafka 的每个分区在同一个消费者组内只能被一个消费者消费,当消费者数量超过分区数量时,必然会有部分消费者处于空闲状态,不会接收到任何消息。
资源浪费: 这些空闲的消费者会占用系统资源(如内存、CPU、网络连接等),但却不产生实际价值,造成资源浪费。
重平衡开销: 当消费者组成员发生变化时,Kafka 会触发重平衡(rebalance)过程。消费者数量过多会使重平衡过程更加复杂,可能增加重平衡的时间和系统开销。
连接开销: 每个消费者都需要与 Kafka 集群建立连接,过多的消费者会增加 Kafka broker 的连接负担。
最佳实践是让消费者数量小于或等于分区数量,通常建议消费者数量与分区数量相等,以达到最佳的并行处理效果。如果需要提高消费能力,更适合的方式是增加分区数量,而不是盲目增加消费者数量。
检测
数据上报
bin/kafka-topics.sh --list --zookeeper $zkaddr
监控 topic 数据:
bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181/common_kafka --topic $topic
127.0.0.1:2181/common_kafka 来自 kafka/config/server.properties
可视化
kafka_exporter
监控
- smartloli/EFAK A easy and high-performance monitoring system, for comprehensive monitoring and management of kafka cluster.
编程
Python
pip install kafka-python
import time
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable
# Create a Kafka Producer on Localhost with Default Port
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Json data
# producer = KafkaProducer(bootstrap_servers='localhost:9092',
# value_serializer=lambda m: json.dumps(m).encode('utf-8'))
# Create a New Topic to Send Messages
topic = 'Test_Topic_1'
# Retry logic to handle connection issues
max_retries = 3
current_retry = 0
while current_retry < max_retries:
try:
for i in range(5):
message = f'Message {i}'
producer.send(topic, message.encode('utf-8'))
print(f'Sent: {message}')
# If the messages are sent successfully, break the loop
break
except NoBrokersAvailable as e:
print(f"Error: {e}")
print("Retrying in 5 seconds...")
time.sleep(5)
current_retry += 1
# Close the Producer and Close the connections
producer.close()
from kafka import KafkaConsumer
# Create a Kafka Consumer
consumer = KafkaConsumer(
'Test_Topic_1', # Topic Name
bootstrap_servers='localhost:9092', #Host and Port
group_id='your_consumer_group' # Group ID
)
# Continuously listen / poll for New Messages
for message in consumer:
print(f'Received: {message.value.decode("utf-8")}')
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'])
# 获取当前主题的最新偏移量
print(consumer.position(TopicPartition(topic='test', partition=0)))
# 重置偏移量,从第1个偏移量消费
consumer.seek(TopicPartition(topic='test', partition=0), 1)
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
多线程示例
import threading
import time
from kafka import KafkaConsumer
import json
# Kafka 配置
KAFKA_BROKERS = ['localhost:9092']
TOPIC_NAME = 'your_topic'
GROUP_ID = 'your_consumer_group'
def consume_messages(thread_id):
"""
每个线程的执行函数,创建一个独立的 Kafka 消费者实例
"""
consumer = KafkaConsumer(
TOPIC_NAME,
group_id=GROUP_ID,
bootstrap_servers=KAFKA_BROKERS,
auto_offset_reset='earliest', # 从最早的消息开始消费
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
print(f"线程 {thread_id} 开始消费...")
try:
for message in consumer:
print(f"线程 {thread_id} 收到消息: "
f"分区: {message.partition}, "
f"偏移量: {message.offset}, "
f"值: {message.value}")
# 这里可以添加处理消息的逻辑
time.sleep(1) # 模拟处理消息的时间
except Exception as e:
print(f"线程 {thread_id} 发生错误: {e}")
finally:
consumer.close()
if __name__ == '__main__':
threads = []
num_consumers = 10
for i in range(num_consumers):
thread = threading.Thread(target=consume_messages, args=(i,))
threads.append(thread)
thread.start()
# 主线程可以做其他事情,或者等待所有线程结束(通常不会)
# for thread in threads:
# thread.join()
print("所有消费者线程已启动。")
多进程示例
import multiprocessing
import time
from kafka import KafkaConsumer
import json
# Kafka 配置
KAFKA_BROKERS = ['localhost:9092']
TOPIC_NAME = 'your_topic'
GROUP_ID = 'your_consumer_group'
def consume_messages(process_id):
"""
每个进程的执行函数,创建一个独立的 Kafka 消费者实例
"""
consumer = KafkaConsumer(
TOPIC_NAME,
group_id=GROUP_ID,
bootstrap_servers=KAFKA_BROKERS,
auto_offset_reset='earliest',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
print(f"进程 {process_id} 开始消费...")
try:
for message in consumer:
print(f"进程 {process_id} 收到消息: "
f"分区: {message.partition}, "
f"偏移量: {message.offset}, "
f"值: {message.value}")
# 这里可以添加处理消息的逻辑
time.sleep(1) # 模拟处理消息的时间
except Exception as e:
print(f"进程 {process_id} 发生错误: {e}")
finally:
consumer.close()
if __name__ == '__main__':
processes = []
num_consumers = 10
for i in range(num_consumers):
process = multiprocessing.Process(target=consume_messages, args=(i,))
processes.append(process)
process.start()
# 等待所有子进程完成
for process in processes:
process.join()
print("所有消费者进程已启动并运行。")
F&Q
启动失败问题
错误日志:
ARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/data/...-0/00000000000000108309.index) has non-zero size but the last offset is 108309 which is no larger than the base offset 108309.}. deleting /data/...-0/00000000000000108309.timeindex, /data/...-0/00000000000000108309.index and rebuilding index
解决方法:收到删除对应的 index,后重启服务。
kafka topic 文件 .log.deleted 没有删除怎么处理
- Kafka 的日志清理任务可能因为高负载或内部错误而延迟执行,一般重启后,可立即删除。