RabbitMQ 介绍

发布时间: 更新时间: 总字数:4134 阅读时间:9m 作者: IP上海 分享 网址
专栏文章
  1. RabbitMQ 集群配置
  2. RabbitMQ Shovel 配置 -- 消息在不同集群间转发
  3. 最简单的 RabbitMQ 监控及排错方法

RabbitMQ是重要的消息队列组件,OpenStack采用RabbitMQ做消息交互的重要组件之一。

常见 MQ 对比

ActiveMQRabbitMQRocketMQKafkaZeroMQ
单机吞吐量比RabbitMQ低2.6w/s(消息做持久化)11.6w/s17.3w/s29w/s
开发语言JavaErlangJavaScala/JavaC
主要维护者ApacheMozilla/SpringAlibabaApacheiMatix,创始人已去世
成熟度成熟成熟开源版本不够成熟比较成熟只有C、PHP等版本成熟
订阅形式点对点(p2p)、广播(发布-订阅)提供了4种:direct, topic, Headers和fanout。fanout就是广播模式基于 topic/messageTag 以及按照消息类型、属性进行正则匹配的发布订阅模式基于topic以及按照topic进行正则匹配的发布订阅模式点对点(p2p)
持久化支持少量堆积支持少量堆积支持大量堆积支持大量堆积不支持
顺序消息不支持不支持支持支持不支持
性能稳定性一般较差很好
集群方式支持简单集群模式,比如主-备,对高级集群模式支持不好。支持简单集群,复制模式,对高级集群模式支持不好。常用多对Master-Slave 模式,开源版本需手动切换Slave变成Master天然的‘Leader-Slave`无状态集群,每台服务器既是Master也是Slave不支持
管理界面一般较好一般

安装rabbitmq

sudo yum install rabbitmq-server -y

启动

sudo service rabbitmq-server start

常用端口

  • 4369 (epmd), 25672 (Erlang distribution)
  • 5672, 5671 (AMQP 0-9-1 without and with TLS)
  • 15672 (if management plugin is enabled)
  • 61613, 61614 (if STOMP is enabled)
  • 1883, 8883 (if MQTT is enabled)

概念

RabbitMQ 实现了 AMQP 0-9-1[Advanced Message Queuing Protocol],

其中重要的概念是 交换机[Exchange] 和 队列[Queue],Exchange 和 Queue 通过 绑定[Bind] 操作进行绑定。

生产者[Producer] 不需要关注 消息[Message] 实际被 路由[Routing] 到哪个队列,而 消费者[Consumer] 也不需要关注 Message 是从哪个 Exchange 接收的。

Exchange 分为 直连[direct]交换机、扇形[fanout]交换机、主题[topic]交换机、头[headers]交换机。

  • direct exchange 与 queue 绑定时需要指定一个 routing_key,消息被发送到 direct exchange 时也需要指定一个 routing_key,消息会被分发到具有相同 routing_key 绑定的 queue 中;

  • fanout exchange 是一个广播型的交换机,发送到 fanout exchange 的消息被分发到与之绑定的所有 queue 中;

  • topic exchange 是更复杂的 direct exchange,它的 routing_key 必须是以 . 分隔的几个字符串,一般是以多个维度分隔 (例如日志系统中以 “应用.级别” [shop.info]来制定),topic exchange 与 queue 绑定时可以指定通配的 routing_key,* 表示一个单词,# 表示任意数量的单词;

  • headers exchange 不通过 routing_key 来建立与 queue 的绑定,而是使用消息的属性来进行分发。

访问用户

guest用户被默认创建,默认在localhost可以用guest/guest来访问。

rabbitmqctl命令可以CRUD用户,见命令help

rabbitmqctl set_user_tags user_admin administrator
# rabbitmqctl set_user_tags user_admin monitoring policymaker
rabbitmqctl set_permissions -p / user_admin ".*" ".*" ".*"
vhosts=`rabbitmqctl list_vhosts | grep -v "Listing vhosts" | grep -v "/"`
for vhost in $vhosts; do
  rabbitmqctl set_permissions -p $vhost user_admin ".*" ".*" ".*"
done

角色

RabbitMQ的用户角色分类: none、management、policymaker、monitoring、administrator

none

不能访问 management plugin

management

用户可以通过AMQP做的任何事外加:

  • 列出自己可以通过AMQP登入的virtual hosts
  • 查看自己的virtual hosts中的queues, exchanges 和 bindings
  • 查看和关闭自己的channels 和 connections
  • 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动

policymaker

management可以做的任何事外加:

  • 查看、创建和删除自己的virtual hosts所属的policies和parameters

monitoring

management可以做的任何事外加:

  • 列出所有virtual hosts,包括他们不能登录的virtual hosts
  • 查看其他用户的connections和channels
  • 查看节点级别的数据如clustering和memory使用情况
  • 查看真正的关于所有virtual hosts的全局的统计信息

administrator

policymaker和monitoring可以做的任何事外加:

  • 创建和删除virtual hosts
  • 查看、创建和删除users
  • 查看创建和删除permissions
  • 关闭其他用户的connections

日志

位于/var/log/rabbitmq

ulimit设置

We recommend allowing for at least 65536 file descriptors for user rabbitmq in production environments. 4096 should be sufficient for most development workloads

验证ulimit

cat /proc/$RABBITMQ_BEAM_PROCESS_PID/limits 或者 rabbitmqctl status

其中RABBITMQ_BEAM_PROCESS_PID的值可以从rabbitmqctl status中获取

注意事项

  • pika是rabbitmq官方推荐的python客户端
  • queue没有边界,是一个无限大的缓存。
  • producer的代码逻辑:
    • 先获得connection对象
    • 然后就获得channel对象
    • 然后queue_declare创建queue
    • basic_publish,参数routing_key指定了queue的名字,exchange指定了exchange的名字
    • connection.close()会flush缓存,然后关闭连接。
  • consumer的代码逻辑:
    • 先获得connection对象
    • 然后就获得channel对象
    • 然后queue_declare创建queue(是幂等的,没事)
    • 定义callback函数
    • basic_consume,参数queue指定queue的名字,callback传入当作回调,no_ack指定是否要ack
    • start_consuming,开始无限循环的等待。
  • 如果向一个不存在的queue发送message,会被丢弃。所以需要创建queue(queue_declare)
  • 创建queue是幂等的,多次创建不要紧
  • 如果一个queue有多个consumers,rabbitmq默认会round-robin地发送给consumers,每个consumer接受到的总message大致相同
  • 在rabbitmq内部,一个message绝对不会直接发送给queue,而是先到exchange处理
  • Receiving messages from the queue需要注册一个callback函数
  • 为了防止consumer出现异常而导致message丢失,应该使用acknowledgments。
  • 如果consumer死了(connection断、channel断、tcp断)并且没有发回ack,rabbitmq将会把message放回queue中不会删除,如果该queue有其他consumer,那么会立即发送给其他consumer,这样保证message不丢失。
  • 如果consumer一直不ack,rabbitmq将会很危险,内存使用会越来越高。
  • queue_declare时,一定要设置durable为True,否则rabbitmq一重启queue就没了
  • ack是默认开启的
  • 虽然可以将queue设置为持久化(durable),但也不能完全保证,因为rabbitmq不是每个message都fsync的,而是先到cache中,不一定被落到磁盘上
  • rabbitmq只管把queue中的messages往多个consumers扔,但不知道哪个consumer很忙,哪个很闲,可以使用channel.basic_qos(prefetch_count=1),意思是告诉rabbitmq,在consumer ack一个message之前,不要再给这个consumer发message,缺点是:如果consumers都很忙,rabbitmq中的message将堆积。
  • 如果发送时没指定exchange的名字,则会发给routing_key对应的名字
  • queue_bind方法可以做exchange和queues之间的bind操作
  • sudo rabbitmqctl list_exchanges 结果显示的amq.开头的是默认创建的。
  • consumer和rabbitmq之间是长连接。
  • 如果queue中有10条消息,2个consumers,如果其中一个consumer停掉又启动的话,也不会接收到现有的消息,现有的消息已被第1个consumer承包,为了避免这个情况出现,可以设置prefetch大小,这样每个consumer只能承包这么多的消息,而不是所有消息,好让其他的consumer也能处理现有message,如果有一个consumer hang住,并已占据一些message的话,可以重启这个consumer即可。
  • 如果多个consumer监听同一个queue,会round-robin收到,如果想一个message同时被多个consumer收到,必须有多个对应的queue,exchange的类型设置为fanout
  • durable和exclusive如果同时用的话,会如何?
  • durable为false时,如果broker重启,queue则会消失。
  • 什么情况下用topic?openstack中,由于compute node会变化,producer可以写死exchange类型为topic,动态的通过routing_key来指定。
  • 何时通过rest api,何时通过rabbitmq?如果team与team中间不可信,则需要通过rest api,rest api有权限验证机制。
  • rabbitmq是cpu、内存密集型的

HA

  • 每台机器都要有相同的cookie!
  • HA分为service HA和Message HA
  • Message HA可以通过mirror queue实现,也可以通过共享存储实现

exchange的类型

exchange会决定一个message到底是发往特定的queue,还是所有queue。这是通过exchange type的设置来决定,有4种:

  • direct
    • producer设置exchange的routing key,consumer bind时指定该routing key。可以做到通过同一个exchange来达到不同的queue ,direct比fanout粒度更细一点
  • topic
    • 发送给topic的routing_key不能随意写,有规则,例如stock.usd.nyse,可以通过通配符的方式来分类。
      • *表示一个word
      • #表示0或多个word
  • headers
  • fanout
    • exchange将它收到的messages扔到后端所有的queue中

exchange_declare用来创建exchange

plugins

rabbitmq支持很多plugins

  • rabbitmq-plugins enable plugin-name
  • rabbitmq-plugins disable plugin-name
  • rabbitmq-plugins list

例如: 启动管理界面,重启rabbitmq后,访问 http://ip:15672

sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmq-plugins enable rabbitmq_management_visualiser
Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数