Kafka 安装使用

发布时间: 更新时间: 总字数:2395 阅读时间:5m 作者: IP上海 分享 网址

Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。本文介绍 Kafka 的安装部署与使用方法。

介绍

Apache Kafka 是分布式发布-订阅消息系统。它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分。Kafka 是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

Apache Kafka 与传统消息系统相比,有以下不同:

  • 它被设计为一个分布式系统,易于向外扩展;
  • 它同时为发布和订阅提供高吞吐量;
  • 它支持多订阅者,当失败时能自动平衡消费者;
  • 它将消息持久化到磁盘,因此可用于批量消费,例如 ETL,以及实时应用程序。

使场景化

  • 流式 ETL 中间件、流分析
  • 实时事件处理
  • 实时协作
  • 物联网设备之间的通信,一般使用 MQTT

功能

  • 发布订阅
    • 生产者(producer) 生产消息(数据流),并将其发送到指定的主题(topic)中,也可将其发送到topic指定分区(partition)
    • 消费者(consumer)从指定topic中获取消息,然后来处理消息
  • 流处理(Stream Process)输入topic转换数据流到输出topic
  • 连接器(Connector) 将数据从应用程序(源系统)中导入到 kafka,或者从 kafka 导出数据到应用程序

消息模型

  • 队列:同名的消费者组员瓜分消息
  • 发布订阅:广播消息给多个消费者组

基础概念

  • 消息记录(record) 由一个key,一个value和一个时间戳构成
    • 消息最终存储在主题下的分区中
      • 在生产者中称为生产者记录(ProducerRecord)
      • 在消费者中称为消费者记录(ConsumerRecord)
    • 消息的时效性:Kafka 集群保存所有的消息,直到它们过期,无论消息是否被消费,在一个可配置的时间段内,Kafka 集群保留所有发布的消息
    • Kafka 的性能是和数据量无关的常量级的,所以保留大量的数据并不是问题
  • 生产者(producer) 生产者用于发布(publish)消息
  • 消费者(consumer) 消费者用于订阅(subscribe)消息
  • 消费者组(consumer group) 相同group.id的消费者将视为同一个消费者组
    • 每个消费者都需要设置一个组 id,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费
  • 主题(topic) 消息的一种 逻辑分组,用于对消息分门别类,相同主题的消息放在一个 队列
  • 分区(partition) 消息的一种 物理分组
    • 一个主题被拆成多个分区,每一个分区就是一个顺序的、不可变的消息队列,并且可以持续添加,分区中的每个消息都被分配了一个唯一的 id,称之为偏移量(offset),在每个分区中偏移量都是唯一的
      • 偏移量(offset) 分区中的每个消息都有一个唯一 id,称之为偏移量,它代表已经消费的位置。可以自动或者手动提交偏移量(即自动或者手动标记一条消息是否已经被成功消费)
  • 代理(broker) 一台 kafka 服务器称之为一个broker
  • 副本(replica) 副本只是一个分区(partition)的备份
    • 副本从不读取或写入数据,用于防止数据丢失
  • 领导者(leader) 负责给定分区的所有读取和写入的节点
    • 每个分区都有一个服务器充当 Leader,producerconsumer 只跟 leader 交互
  • 追随者(follower) 跟随领导者指令的节点被称为Follower
    • 当领导失败时,一个追随者将自动成为新的领导者
    • 追随者作为正常消费者,拉取消息并更新其自己的数据存储,replica 中的一个角色,从 leader 中复制数据

安装

Kafka 是无状态的,使用 ZooKeeper 来维护集群状态。ZooKeeper 用于管理和协调 Kafka 代理,安装部署见Zookeeper 安装使用

kafka 单节点

下载地址:https://kafka.apache.org/downloads 下载 kafka_2.11-1.0.0.tgz 安装包。

解压:

tar -zxvf kafka_2.11-1.0.0.tgz
cd /opt/kafka_2.11-1.0.0/

修改 kafka-server 的配置文件

vim /opt/kafka/config/server.properties

修改其中的:

broker.id=1
log.dir=/data/kafka/logs-1

使用 kafka-server-start.sh 启动 kafka 服务:

bin/kafka-server-start.sh config/server.properties

systemctl 启动配置 /etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service

[Service]
Type=simple
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]

WantedBy=multi-user.target

docker 部署

docker run -d --name broker apache/kafka:latest

# 使用
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

kafka 集群

Kafka 支持两种模式的集群搭建:

  • 在单机上运行多个 broker 实例来实现集群
  • 在多台机器上搭建集群

下面介绍下如何实现单机多 broker 实例集群,其实很简单,只需要如下配置即可。

  • 单机多 broker 集群配置

利用单节点部署多个 broker。 不同的 broker 设置不同的 id,监听端口及日志目录。 例如:

cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties

vim config/server-2.propertiesconfig/server-3.properties,修改 :

broker.id=2
listeners = PLAINTEXT://your.host.name:9093
log.dir=/data/kafka/logs-2

broker.id=3
listeners = PLAINTEXT://your.host.name:9094
log.dir=/data/kafka/logs-3

启动 Kafka 服务:

bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &

至此,单机多 broker 实例的集群配置完毕。

  • 多机多 broker 集群配置

分别在多个节点按上述方式安装 Kafka,配置启动多个 Zookeeper 实例。

假设三台机器 IP 地址是 : 10.0.0.1, 10.0.0.2, 10.0.0.3

分别配置多个机器上的 Kafka 服务,设置不同的 broker idzookeeper.connect 设置如下:

vim config/server.properties

里面的 zookeeper.connect

修改为:

broker.id=1/2/3
zookeeper.connect=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181

使用

创建 topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic

查看 topic 列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

查看 topic 详情

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic test_topic --describe

列出了 test_topicparition 数量、replica因子 以及每个 partitionleaderreplica 信息

查看 consumer 列表

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --list
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list

查看 consumer 详情

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --group test --describe
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group console-consumer-1 --describe

其中依次展示 group 名称、消费的 topic 名称、partition id、consumer group 最后一次提交的 offset、最后提交的生产消息 offset、消费 offset 与生产 offset 之间的差值、当前消费 topic-partition 的 group 成员 id(不一定包含 hostname)

检测

数据上报

bin/kafka-topics.sh --list --zookeeper $zkaddr

监控 topic 数据:

bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181/common_kafka --topic $topic

127.0.0.1:2181/common_kafka 来自 kafka/config/server.properties

可视化

kafka_exporter

编程

Python

  • 安装
pip install kafka-python
  • 生成者
kafka producer ...
import time

from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable


# Create a Kafka Producer on Localhost with Default Port
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Json data
# producer = KafkaProducer(bootstrap_servers='localhost:9092',
#                          value_serializer=lambda m: json.dumps(m).encode('utf-8'))


# Create a New Topic to Send Messages
topic = 'Test_Topic_1'

# Retry logic to handle connection issues
max_retries = 3
current_retry = 0
while current_retry < max_retries:
    try:
        for i in range(5):
            message = f'Message {i}'
            producer.send(topic, message.encode('utf-8'))
            print(f'Sent: {message}')
        # If the messages are sent successfully, break the loop
        break

    except NoBrokersAvailable as e:
        print(f"Error: {e}")
        print("Retrying in 5 seconds...")
        time.sleep(5)
        current_retry += 1

# Close the Producer and Close the connections
producer.close()
  • 消费者
kafka consumer ...
from kafka import KafkaConsumer

# Create a Kafka Consumer
consumer = KafkaConsumer(
    'Test_Topic_1', # Topic Name
    bootstrap_servers='localhost:9092', #Host and Port
    group_id='your_consumer_group' # Group ID
)

# Continuously listen / poll for New Messages
for message in consumer:
    print(f'Received: {message.value.decode("utf-8")}')

F&Q

启动失败问题

错误日志:

ARN Found a corrupted index file due to requirement failed: Corrupt index found, index file (/data/...-0/00000000000000108309.index) has non-zero size but the last offset is 108309 which is no larger than the base offset 108309.}. deleting /data/...-0/00000000000000108309.timeindex, /data/...-0/00000000000000108309.index and rebuilding index

解决方法:收到删除对应的 index,后重启服务。

参考

  1. 官网
Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数