RabbitMQ是重要的消息队列组件,OpenStack采用RabbitMQ做消息交互的重要组件之一。
常见 MQ 对比
| ActiveMQ | RabbitMQ | RocketMQ | Kafka | ZeroMQ |
---|
单机吞吐量 | 比RabbitMQ低 | 2.6w/s(消息做持久化) | 11.6w/s | 17.3w/s | 29w/s |
开发语言 | Java | Erlang | Java | Scala/Java | C |
主要维护者 | Apache | Mozilla/Spring | Alibaba | Apache | iMatix,创始人已去世 |
成熟度 | 成熟 | 成熟 | 开源版本不够成熟 | 比较成熟 | 只有C、PHP等版本成熟 |
订阅形式 | 点对点(p2p) 、广播(发布-订阅) | 提供了4种:direct, topic, Headers和fanout。fanout就是广播模式 | 基于 topic/messageTag 以及按照消息类型、属性进行正则匹配的发布订阅模式 | 基于topic以及按照topic进行正则匹配的发布订阅模式 | 点对点(p2p) |
持久化 | 支持少量堆积 | 支持少量堆积 | 支持大量堆积 | 支持大量堆积 | 不支持 |
顺序消息 | 不支持 | 不支持 | 支持 | 支持 | 不支持 |
性能稳定性 | 好 | 好 | 一般 | 较差 | 很好 |
集群方式 | 支持简单集群模式,比如主-备 ,对高级集群模式支持不好。 | 支持简单集群,复制 模式,对高级集群模式支持不好。 | 常用多对Master-Slave 模式,开源版本需手动切换Slave变成Master | 天然的‘Leader-Slave`无状态集群,每台服务器既是Master也是Slave | 不支持 |
管理界面 | 一般 | 较好 | 一般 | 无 | 无 |
安装rabbitmq
sudo yum install rabbitmq-server -y
启动
sudo service rabbitmq-server start
常用端口
- 4369 (epmd), 25672 (Erlang distribution)
- 5672, 5671 (AMQP 0-9-1 without and with TLS)
- 15672 (if management plugin is enabled)
- 61613, 61614 (if STOMP is enabled)
- 1883, 8883 (if MQTT is enabled)
概念
RabbitMQ 实现了 AMQP 0-9-1[Advanced Message Queuing Protocol],
其中重要的概念是 交换机[Exchange] 和 队列[Queue],Exchange 和 Queue 通过 绑定[Bind] 操作进行绑定。
生产者[Producer] 不需要关注 消息[Message] 实际被 路由[Routing] 到哪个队列,而 消费者[Consumer] 也不需要关注 Message 是从哪个 Exchange 接收的。
Exchange 分为 直连[direct]交换机、扇形[fanout]交换机、主题[topic]交换机、头[headers]交换机。
direct exchange 与 queue 绑定时需要指定一个 routing_key,消息被发送到 direct exchange 时也需要指定一个 routing_key,消息会被分发到具有相同 routing_key 绑定的 queue 中;
fanout exchange 是一个广播型的交换机,发送到 fanout exchange 的消息被分发到与之绑定的所有 queue 中;
topic exchange 是更复杂的 direct exchange,它的 routing_key 必须是以 . 分隔的几个字符串,一般是以多个维度分隔 (例如日志系统中以 “应用.级别” [shop.info]来制定),topic exchange 与 queue 绑定时可以指定通配的 routing_key,* 表示一个单词,# 表示任意数量的单词;
headers exchange 不通过 routing_key 来建立与 queue 的绑定,而是使用消息的属性来进行分发。
访问用户
guest用户被默认创建,默认在localhost可以用guest/guest来访问。
rabbitmqctl
命令可以CRUD用户,见命令help
rabbitmqctl set_user_tags user_admin administrator
# rabbitmqctl set_user_tags user_admin monitoring policymaker
rabbitmqctl set_permissions -p / user_admin ".*" ".*" ".*"
vhosts=`rabbitmqctl list_vhosts | grep -v "Listing vhosts" | grep -v "/"`
for vhost in $vhosts; do
rabbitmqctl set_permissions -p $vhost user_admin ".*" ".*" ".*"
done
角色
RabbitMQ的用户角色分类:
none、management、policymaker、monitoring、administrator
none
不能访问 management plugin
management
用户可以通过AMQP做的任何事外加:
- 列出自己可以通过AMQP登入的virtual hosts
- 查看自己的virtual hosts中的queues, exchanges 和 bindings
- 查看和关闭自己的channels 和 connections
- 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动
policymaker
management可以做的任何事外加:
- 查看、创建和删除自己的virtual hosts所属的policies和parameters
monitoring
management可以做的任何事外加:
- 列出所有virtual hosts,包括他们不能登录的virtual hosts
- 查看其他用户的connections和channels
- 查看节点级别的数据如clustering和memory使用情况
- 查看真正的关于所有virtual hosts的全局的统计信息
administrator
policymaker和monitoring可以做的任何事外加:
- 创建和删除virtual hosts
- 查看、创建和删除users
- 查看创建和删除permissions
- 关闭其他用户的connections
日志
位于/var/log/rabbitmq
ulimit设置
We recommend allowing for at least 65536 file descriptors for user rabbitmq in production environments. 4096 should be sufficient for most development workloads
验证ulimit
cat /proc/$RABBITMQ_BEAM_PROCESS_PID/limits 或者 rabbitmqctl status
其中RABBITMQ_BEAM_PROCESS_PID的值可以从rabbitmqctl status中获取
注意事项
- pika是rabbitmq官方推荐的python客户端
- queue没有边界,是一个无限大的缓存。
- producer的代码逻辑:
- 先获得connection对象
- 然后就获得channel对象
- 然后queue_declare创建queue
- basic_publish,参数routing_key指定了queue的名字,exchange指定了exchange的名字
- connection.close()会flush缓存,然后关闭连接。
- consumer的代码逻辑:
- 先获得connection对象
- 然后就获得channel对象
- 然后queue_declare创建queue(是幂等的,没事)
- 定义callback函数
- basic_consume,参数queue指定queue的名字,callback传入当作回调,no_ack指定是否要ack
- start_consuming,开始无限循环的等待。
- 如果向一个不存在的queue发送message,会被丢弃。所以需要创建queue(queue_declare)
- 创建queue是幂等的,多次创建不要紧
- 如果一个queue有多个consumers,rabbitmq默认会round-robin地发送给consumers,每个consumer接受到的总message大致相同
- 在rabbitmq内部,一个message绝对不会直接发送给queue,而是先到exchange处理
- Receiving messages from the queue需要注册一个callback函数
- 为了防止consumer出现异常而导致message丢失,应该使用acknowledgments。
- 如果consumer死了(connection断、channel断、tcp断)并且没有发回ack,rabbitmq将会把message放回queue中不会删除,如果该queue有其他consumer,那么会立即发送给其他consumer,这样保证message不丢失。
- 如果consumer一直不ack,rabbitmq将会很危险,内存使用会越来越高。
- queue_declare时,一定要设置durable为True,否则rabbitmq一重启queue就没了
- ack是默认开启的
- 虽然可以将queue设置为持久化(durable),但也不能完全保证,因为rabbitmq不是每个message都fsync的,而是先到cache中,不一定被落到磁盘上
- rabbitmq只管把queue中的messages往多个consumers扔,但不知道哪个consumer很忙,哪个很闲,可以使用channel.basic_qos(prefetch_count=1),意思是告诉rabbitmq,在consumer ack一个message之前,不要再给这个consumer发message,缺点是:如果consumers都很忙,rabbitmq中的message将堆积。
- 如果发送时没指定exchange的名字,则会发给routing_key对应的名字
- queue_bind方法可以做exchange和queues之间的bind操作
sudo rabbitmqctl list_exchanges
结果显示的amq.
开头的是默认创建的。- consumer和rabbitmq之间是长连接。
- 如果queue中有10条消息,2个consumers,如果其中一个consumer停掉又启动的话,也不会接收到现有的消息,现有的消息已被第1个consumer承包,为了避免这个情况出现,可以设置prefetch大小,这样每个consumer只能承包这么多的消息,而不是所有消息,好让其他的consumer也能处理现有message,如果有一个consumer hang住,并已占据一些message的话,可以重启这个consumer即可。
- 如果多个consumer监听同一个queue,会round-robin收到,如果想一个message同时被多个consumer收到,必须有多个对应的queue,exchange的类型设置为fanout
- durable和exclusive如果同时用的话,会如何?
- durable为false时,如果broker重启,queue则会消失。
- 什么情况下用topic?openstack中,由于compute node会变化,producer可以写死exchange类型为topic,动态的通过routing_key来指定。
- 何时通过rest api,何时通过rabbitmq?如果team与team中间不可信,则需要通过rest api,rest api有权限验证机制。
- rabbitmq是cpu、内存密集型的
HA
- 每台机器都要有相同的cookie!
- HA分为service HA和Message HA
- Message HA可以通过mirror queue实现,也可以通过共享存储实现
exchange的类型
exchange会决定一个message到底是发往特定的queue,还是所有queue。这是通过exchange type的设置来决定,有4种:
- direct
- producer设置exchange的routing key,consumer bind时指定该routing key。可以做到通过同一个exchange来达到不同的queue
,direct比fanout粒度更细一点
- topic
- 发送给topic的routing_key不能随意写,有规则,例如stock.usd.nyse,可以通过通配符的方式来分类。
- headers
- fanout
- exchange将它收到的messages扔到后端所有的queue中
exchange_declare用来创建exchange
plugins
rabbitmq支持很多plugins
- rabbitmq-plugins enable plugin-name
- rabbitmq-plugins disable plugin-name
- rabbitmq-plugins list
例如: 启动管理界面,重启rabbitmq后,访问 http://ip:15672
sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmq-plugins enable rabbitmq_management_visualiser