Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。本文介绍 Kafka 的安装部署与使用方法。
介绍
Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
Apache Kafka与传统消息系统相比,有以下不同:
- 它被设计为一个分布式系统,易于向外扩展;
- 它同时为发布和订阅提供高吞吐量;
- 它支持多订阅者,当失败时能自动平衡消费者;
- 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。
功能
发布订阅
生产者(producer)
生产消息(数据流),并将其发送到指定的主题(topic)
中,也可将其发送到topic
指定分区(partition)
中
消费者(consumer)
从指定topic
中获取消息,然后来处理消息
流处理(Stream Process)
将输入topic
转换数据流到输出topic
连接器(Connector)
将数据从应用程序(源系统)中导入到kafka,或者从kafka导出数据到应用程序
消息模型
队列
:同名的消费者组员瓜分消息
发布订阅
:广播消息给多个消费者组
基础概念
消息记录(record)
由一个key
,一个value
和一个时间戳
构成
- 消息最终存储在主题下的分区中
- 在生产者中称为
生产者记录(ProducerRecord)
- 在消费者中称为
消费者记录(ConsumerRecord)
- 消息的时效性:Kafka集群保存所有的消息,直到它们过期,无论消息是否被消费,在一个可配置的时间段内,Kafka集群保留所有发布的消息
- Kafka的性能是和数据量无关的常量级的,所以保留大量的数据并不是问题
生产者(producer)
生产者用于发布(publish)
消息
消费者(consumer)
消费者用于订阅(subscribe)
消息
消费者组(consumer group)
相同group.id
的消费者将视为同一个消费者组
- 每个消费者都需要设置一个组id,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费
主题(topic)
消息的一种逻辑分组
,用于对消息分门别类,相同主题的消息放在一个队列
中
分区(partition)
消息的一种物理分组
- 一个主题被拆成多个分区,每一个分区就是一个顺序的、不可变的消息队列,并且可以持续添加,分区中的每个消息都被分配了一个唯一的id,称之为
偏移量(offset)
,在每个分区中偏移量都是唯一的
偏移量(offset)
分区中的每个消息都有一个唯一id,称之为偏移量
,它代表已经消费的位置。可以自动或者手动提交偏移量(即自动或者手动标记一条消息是否已经被成功消费)
代理(broker)
一台kafka服务器称之为一个broker
副本(replica)
副本只是一个分区(partition)
的备份
领导者(leader)
负责给定分区的所有读取和写入的节点
- 每个分区都有一个服务器充当Leader,
producer
和 consumer
只跟 leader
交互
追随者(follower)
跟随领导者指令的节点被称为Follower
- 当领导失败时,一个追随者将自动成为新的领导者
- 追随者作为正常消费者,拉取消息并更新其自己的数据存储,
replica
中的一个角色,从 leader
中复制数据
安装
Kafka
是无状态的,使用 ZooKeeper
来维护集群状态。ZooKeeper
用于管理和协调 Kafka
代理,安装部署见Zookeeper 安装使用
kafka 单节点
下载地址:https://kafka.apache.org/downloads 下载 kafka_2.11-1.0.0.tgz 安装包。
解压:
tar -zxvf kafka_2.11-1.0.0.tgz
cd /usr/local/kafka_2.11-1.0.0/
修改 kafka-server 的配置文件
vim /usr/local/kafka/config/server.properties
修改其中的:
broker.id=1
log.dir=/data/kafka/logs-1
使用 kafka-server-start.sh 启动 kafka 服务:
bin/kafka-server-start.sh config/server.properties
kafka 集群
Kafka 支持两种模式的集群搭建:
- 在单机上运行多个 broker 实例来实现集群
- 在多台机器上搭建集群
下面介绍下如何实现单机多 broker 实例集群,其实很简单,只需要如下配置即可。
利用单节点部署多个 broker。 不同的 broker 设置不同的 id,监听端口及日志目录。 例如:
cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties
vim config/server-2.properties
和 config/server-3.properties
,修改 :
broker.id=2
listeners = PLAINTEXT://your.host.name:9093
log.dir=/data/kafka/logs-2
和
broker.id=3
listeners = PLAINTEXT://your.host.name:9094
log.dir=/data/kafka/logs-3
启动Kafka服务:
bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &
至此,单机多broker实例的集群配置完毕。
分别在多个节点按上述方式安装 Kafka,配置启动多个 Zookeeper 实例。
假设三台机器 IP 地址是 : 10.0.0.1, 10.0.0.2, 10.0.0.3
分别配置多个机器上的 Kafka 服务,设置不同的 broker id
,zookeeper.connect
设置如下:
vim config/server.properties
里面的 zookeeper.connect
修改为:
broker.id=1/2/3
zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181
使用
创建 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic
查看 topic 列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看 topic 详情
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic test_topic --describe
列出了 test_topic
的 parition
数量、replica因子
以及每个 partition
的 leader
、replica
信息
查看 consumer 列表
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --list
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
查看 consumer 详情
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --group test --describe
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group console-consumer-1 --describe
其中依次展示group名称、消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id(不一定包含hostname)
检测
数据上报
bin/kafka-topics.sh --list --zookeeper $zkaddr
监控 topic 数据:
bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181/common_kafka --topic $topic
127.0.0.1:2181/common_kafka
来自 kafka/config/server.properties
可视化
F&Q
启动失败问题
错误日志:
ARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/data/...-0/00000000000000108309.index) has non-zero size but the last offset is 108309 which is no larger than the base offset 108309.}. deleting /data/...-0/00000000000000108309.timeindex, /data/...-0/00000000000000108309.index and rebuilding index
解决方法:收到删除对应的index,后重启服务。