By enabling the RabbitMQ Shovel plugins, messages can be forwarded from one cluster to another.
RabbitMQ Shovel plugin
sudo rabbitmq-plugins enable rabbitmq_shovel
sudo rabbitmq-plugins enable rabbitmq_shovel_management
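RabbitMQ Shovel can be configured in two ways: Static Shovels and Dynamic Shovels. Because Dynamic Shovels have a problem with losing their Shovel configuration on restart, Static Shovels are the recommended option for production.
Static Shovels in practice
Goal: forward the trove RabbitMQ queue of the demo environment (on compute-1) to the controller RabbitMQ cluster (on the controller nodes).
Edit /etc/rabbitmq/rabbitmq.config on compute-1 so that it contains the following: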
[
 {rabbitmq_shovel,
  [{shovels,
    [%% A named shovel worker.
     {trove_notifications_info_shovel,
      [%% List the source broker(s) from which to consume.
       {sources,
        [%% URI(s) and pre-declarations for all source broker(s).
         {brokers, ["amqp://"]},
         {declarations, [
           {'queue.declare', [ {queue, <<"notifications.info">>} ]},
           {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
           {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
         ]}
        ]},
       %% List the destination broker(s) to publish to.
       {destinations,
        [%% 'broker' is the singular form of 'brokers'; either may be used.
         {brokers, ["amqp://rabbit_user:rabbitmq_pwd@rabbitmq-1/%2f",
                    "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-3/%2f"]},
         {declarations, [
           %% {'queue.declare', [ {queue, <<"notifications.info">>} ]},
           {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]} %%,
           %% {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
         ]}
        ]},
       %% Name of the queue to shovel messages from.
       {queue, <<"notifications.info">>},
       %% Optional prefetch count.
       %% {prefetch_count, 10},
       {prefetch_count, 1000},
       %% When to acknowledge messages:
       %% - no_ack: never (auto)
       %% - on_publish: after each message is republished
       %% - on_confirm: when the destination broker confirms receipt
       {ack_mode, on_confirm},
       %% Overwrite fields of the outbound basic.publish.
       {publish_fields, [{exchange, <<"trove">>}]},
       %% {routing_key, <<"notifications.info">>}]},
       %% Static list of basic.properties to set on re-publication.
       {publish_properties, [{delivery_mode, 2}]}
       %% The number of seconds to wait before attempting to
       %% reconnect in the event of a connection failure.
       %% {reconnect_delay, 2.5}
      ]} %% End of trove_notifications_info_shovel
    ]}
   %% Rather than specifying some values per-shovel, you can specify
   %% them for all shovels here.
   %% {defaults, [{prefetch_count, 0},
   %%             {ack_mode, on_confirm},
   %%             {publish_fields, []},
   %%             {publish_properties, [{delivery_mode, 2}]},
   %%             {reconnect_delay, 2.5}]}
  ]}
].
The shovel definition, annotated:
{trove_notifications_info_shovel,   %% shovel name
 [{sources,                         %% message source
   [%% Use the local broker; see http://www.rabbitmq.com/uri-spec.html
    {brokers, ["amqp://"]},
    {declarations, [
      {'queue.declare', [ {queue, <<"notifications.info">>} ]},   %% declare the queue
      {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},   %% declare the exchange
      {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}   %% bind the queue to the exchange
    ]}
   ]},
  {destinations,                    %% destination RabbitMQ cluster
   [%% List of destination cluster nodes; only one is in use at a time.
    %% %2f is the percent-encoded vhost "/". Both 'broker' and 'brokers' are supported.
    {brokers, ["amqp://rabbit_user:rabbitmq_pwd@rabbitmq-1/%2f",
               "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-3/%2f"]},
    {declarations, [
      {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]}   %% declare the destination exchange
    ]}
   ]},
  {queue, <<"notifications.info">>},             %% queue to shovel messages from
  {prefetch_count, 1000},                        %% maximum number of unacknowledged messages; forwarding pauses once it is reached
  {ack_mode, on_confirm},                        %% acknowledge each message only after the destination confirms receipt
  {publish_fields, [{exchange, <<"trove">>}]},   %% exchange to publish to on the destination
  {publish_properties, [{delivery_mode, 2}]}     %% set delivery_mode 2 (persistent) on forwarded messages
 ]}
systemctl restart rabbitmq-server.service
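The destination URIs in the configuration above end in %2f, the percent-encoded form of the default vhost "/". As a minimal sketch of how such URIs can be assembled safely, the Python helper below percent-encodes each component with the standard library. The function name build_amqp_uri is hypothetical, and the credentials and host names are simply the placeholders used in the configuration above.

```python
from urllib.parse import quote


def build_amqp_uri(user: str, password: str, host: str, vhost: str = "/") -> str:
    """Return an amqp:// URI with user, password and vhost percent-encoded."""
    # safe="" makes quote() encode "/" too, so the default vhost becomes %2F.
    # Percent-encoding is case-insensitive, so %2F and %2f are equivalent.
    return "amqp://{}:{}@{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        quote(vhost, safe=""),
    )


if __name__ == "__main__":
    # One URI per destination cluster node, as in the 'brokers' list.
    for node in ("rabbitmq-1", "rabbitmq-3"):
        print(build_amqp_uri("rabbit_user", "rabbitmq_pwd", node))
```

Encoding the password as well avoids broken URIs when it contains characters such as "@" or "/".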
Dynamic Shovels
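Note: a Shovel configured this way disappears when the cluster is restarted.
Log in as a user with the administrator tag, then in the management UI go to Admin -> Shovel Management -> Add a new shovel and fill in:
name: trove_notification_info_shovel
URI (source): amqp://
Queue (source): notifications.info
URI (destination): amqp://monitor:xiexianbin.cn@
Queue (destination): notifications.info
Prefetch count: 1000 (the default)
Reconnect delay: 0 (delay before reconnecting)
Add forwarding headers: whether to add forwarding headers to shovelled messages
When enabled, the headers look like this: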
shovelled-by: rabbit@rabbitmq-101
shovel-type: dynamic
shovel-name: trove_shovel_2
shovel-vhost: /
src-uri: amqp://
dest-uri: amqp://
src-queue: notifications.info
dest-queue: notifications.info
Acknowledgement mode: On confirm
Auto-delete: never
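The form fields above correspond to a shovel runtime parameter, which can also be set without the UI. The following is a rough sketch only: the function names are hypothetical, only a subset of the form fields is included (the rest are left at their defaults), and the key names should be checked against the documentation of the installed RabbitMQ version. It builds the JSON definition, then prints an equivalent rabbitmqctl set_parameter command and the management HTTP API path for a PUT request.

```python
import json
import shlex
from urllib.parse import quote


def shovel_definition(src_uri, src_queue, dest_uri, dest_queue):
    """Build the value of a dynamic shovel runtime parameter."""
    return {
        "src-uri": src_uri,
        "src-queue": src_queue,
        "dest-uri": dest_uri,
        "dest-queue": dest_queue,
        "ack-mode": "on-confirm",       # "Acknowledgement mode: On confirm"
        "add-forward-headers": True,    # "Add forwarding headers"
    }


def rabbitmqctl_command(name, definition):
    """Return a shell-safe 'rabbitmqctl set_parameter shovel' command line."""
    return "rabbitmqctl set_parameter shovel {} {}".format(
        shlex.quote(name), shlex.quote(json.dumps(definition))
    )


def management_api_request(name, definition, vhost="/"):
    """Return (path, JSON body) for a PUT to the management HTTP API."""
    path = "/api/parameters/shovel/{}/{}".format(
        quote(vhost, safe=""), quote(name, safe="")
    )
    return path, json.dumps({"value": definition})


if __name__ == "__main__":
    # URIs are copied from the form above; the destination host is elided there.
    definition = shovel_definition(
        "amqp://", "notifications.info",
        "amqp://monitor:xiexianbin.cn@", "notifications.info",
    )
    print(rabbitmqctl_command("trove_notification_info_shovel", definition))
    print(management_api_request("trove_notification_info_shovel", definition)[0])
```

Keeping the definition in a script makes it easy to re-apply the shovel if it is ever lost.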
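- With Dynamic Shovels, the Shovel only needs to be configured on one node of the cluster. Because Dynamic Shovels have a problem with losing their Shovel configuration on restart, Static Shovels are the recommended option for production.
- With Static Shovels, the Shovel must be configured on every node of the cluster, and the service must be restarted.
- On the forwarding side, list every node of the destination cluster under destinations brokers; when forwarding, the shovel randomly picks one available connection as the target.
- After a shovel is added, a new queue appears under Queues.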
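The note about destinations brokers can be illustrated with a small simulation. This is only a sketch of the selection behaviour described above, not the plugin's actual code; pick_broker and the is_reachable callback are hypothetical names introduced for the example.

```python
import random
from typing import Callable, Optional, Sequence


def pick_broker(
    uris: Sequence[str],
    is_reachable: Callable[[str], bool],
    rng: Optional[random.Random] = None,
) -> Optional[str]:
    """Try the broker URIs in random order and return the first reachable one."""
    rng = rng or random.Random()
    candidates = list(uris)
    rng.shuffle(candidates)
    for uri in candidates:
        if is_reachable(uri):
            return uri
    # Nothing reachable: a real shovel would wait and then try again.
    return None


if __name__ == "__main__":
    brokers = [
        "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-1/%2f",
        "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-3/%2f",
    ]
    # Pretend rabbitmq-1 is down; the other node is always selected.
    print(pick_broker(brokers, lambda uri: "rabbitmq-1" not in uri))
```

Listing every node therefore gives the shovel somewhere to go when one destination node is unavailable.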
[
 {rabbitmq_shovel,
  [{shovels,
    [%% A named shovel worker.
     {my_first_shovel,
      [%% List the source broker(s) from which to consume.
       {sources,
        [%% URI(s) and pre-declarations for all source broker(s).
         {brokers, ["amqp://"]},
         {declarations, [
           {'queue.declare', [ {queue, <<"notifications.info">>} ]}
         ]}
        ]},
       %% List the destination broker(s) to publish to.
       {destinations,
        [%% A singular version of the 'brokers' element.
         {broker, "amqp://rabbitmq_user:rabbitmq_pwd@"},
         {declarations, [
           {'queue.declare', [ {queue, <<"notifications.info">>} ]}
         ]}
        ]},
       %% Name of the queue to shovel messages from.
       {queue, <<"notifications.info">>},
       %% Optional prefetch count.
       %% {prefetch_count, 10},
       {prefetch_count, 1000},
       %% When to acknowledge messages:
       %% - no_ack: never (auto)
       %% - on_publish: after each message is republished
       %% - on_confirm: when the destination broker confirms receipt
       {ack_mode, on_confirm}
       %% Overwrite fields of the outbound basic.publish.
       %% {publish_fields, [{exchange, <<"my_exchange">>},
       %%                   {routing_key, <<"from_shovel">>}]},
       %% Static list of basic.properties to set on re-publication.
       %% {publish_properties, [{delivery_mode, 2}]},
       %% The number of seconds to wait before attempting to
       %% reconnect in the event of a connection failure.
       %% {reconnect_delay, 2.5}
      ]} %% End of my_first_shovel
    ]}
   %% Rather than specifying some values per-shovel, you can specify
   %% them for all shovels here.
   %% {defaults, [{prefetch_count, 0},
   %%             {ack_mode, on_confirm},
   %%             {publish_fields, []},
   %%             {publish_properties, [{delivery_mode, 2}]},
   %%             {reconnect_delay, 2.5}]}
  ]}
].
Version 2: exchange + queue + single node
[
 {rabbitmq_shovel,
  [{shovels,
    [%% A named shovel worker.
     {my_first_shovel,
      [%% List the source broker(s) from which to consume.
       {sources,
        [%% URI(s) and pre-declarations for all source broker(s).
         {brokers, ["amqp://"]},
         {declarations, [
           {'queue.declare', [ {queue, <<"notifications.info">>} ]},
           {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
           {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
         ]}
        ]},
       %% List the destination broker(s) to publish to.
       {destinations,
        [%% A singular version of the 'brokers' element.
         {broker, "amqp://rabbitmq_user:rabbitmq_pwd@"},
         {declarations, [
           {'queue.declare', [ {queue, <<"notifications.info">>} ]},
           {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
           {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
         ]}
        ]},
       %% Name of the queue to shovel messages from.
       {queue, <<"notifications.info">>},
       %% Optional prefetch count.
       %% {prefetch_count, 10},
       {prefetch_count, 1000},
       %% When to acknowledge messages:
       %% - no_ack: never (auto)
       %% - on_publish: after each message is republished
       %% - on_confirm: when the destination broker confirms receipt
       {ack_mode, on_confirm},
       %% Overwrite fields of the outbound basic.publish.
       {publish_fields, [{exchange, <<"trove">>},
                         {routing_key, <<"notifications.info">>}]},
       %% Static list of basic.properties to set on re-publication.
       {publish_properties, [{delivery_mode, 2}]}
       %% The number of seconds to wait before attempting to
       %% reconnect in the event of a connection failure.
       %% {reconnect_delay, 2.5}
      ]} %% End of my_first_shovel
    ]}
   %% Rather than specifying some values per-shovel, you can specify
   %% them for all shovels here.
   %% {defaults, [{prefetch_count, 0},
   %%             {ack_mode, on_confirm},
   %%             {publish_fields, []},
   %%             {publish_properties, [{delivery_mode, 2}]},
   %%             {reconnect_delay, 2.5}]}
  ]}
].
Version 3: exchange + queue + cluster
[
 {rabbitmq_shovel,
  [{shovels,
    [%% A named shovel worker.
     {my_first_shovel,
      [%% List the source broker(s) from which to consume.
       {sources,
        [%% URI(s) and pre-declarations for all source broker(s).
         {brokers, ["amqp://"]},
         {declarations, [
           {'queue.declare', [ {queue, <<"notifications.info">>} ]},
           {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
           {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
         ]}
        ]},
       %% List the destination broker(s) to publish to.
       {destinations,
        [%% 'broker' is the singular form of 'brokers'; either may be used.
         {brokers, ["amqp://rabbitmq_user:rabbitmq_pwd@rabbitmq-1/%2f"
                    %% , URIs of the remaining cluster nodes go here
                   ]},
         {declarations, [
           {'queue.declare', [ {queue, <<"notifications.info">>} ]},
           {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
           {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
         ]}
        ]},
       %% Name of the queue to shovel messages from.
       {queue, <<"notifications.info">>},
       %% Optional prefetch count.
       %% {prefetch_count, 10},
       {prefetch_count, 1000},
       %% When to acknowledge messages:
       %% - no_ack: never (auto)
       %% - on_publish: after each message is republished
       %% - on_confirm: when the destination broker confirms receipt
       {ack_mode, on_confirm},
       %% Overwrite fields of the outbound basic.publish.
       {publish_fields, [{exchange, <<"trove">>},
                         {routing_key, <<"notifications.info">>}]},
       %% Static list of basic.properties to set on re-publication.
       {publish_properties, [{delivery_mode, 2}]}
       %% The number of seconds to wait before attempting to
       %% reconnect in the event of a connection failure.
       %% {reconnect_delay, 2.5}
      ]} %% End of my_first_shovel
    ]}
   %% Rather than specifying some values per-shovel, you can specify
   %% them for all shovels here.
   %% {defaults, [{prefetch_count, 0},
   %%             {ack_mode, on_confirm},
   %%             {publish_fields, []},
   %%             {publish_properties, [{delivery_mode, 2}]},
   %%             {reconnect_delay, 2.5}]}
  ]}
].
[
 {rabbitmq_shovel,
  [{shovels,
    [%% A named shovel worker.
     {my_first_shovel,
      [%% List the source broker(s) from which to consume.
       {sources,
        [%% URI(s) and pre-declarations for all source broker(s).
         {brokers, ["amqp://"]},
         {declarations, [
           {'queue.declare', [ {queue, <<"notifications.info">>} ]},
           {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
           {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
         ]}
        ]},
       %% List the destination broker(s) to publish to.
       {destinations,
        [%% A singular version of the 'brokers' element.
         {broker, "amqp://monitor:xiexianbin.cn@"},
         {declarations, [
           %% {'queue.declare', [ {queue, <<"notifications.info">>} ]},
           {'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]} %% ,
           %% {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
         ]}
        ]},
       %% Name of the queue to shovel messages from.
       {queue, <<"notifications.info">>},
       %% Optional prefetch count.
       %% {prefetch_count, 10},
       {prefetch_count, 1000},
       %% When to acknowledge messages:
       %% - no_ack: never (auto)
       %% - on_publish: after each message is republished
       %% - on_confirm: when the destination broker confirms receipt
       {ack_mode, on_confirm},
       %% Overwrite fields of the outbound basic.publish.
       %% {publish_fields, [{exchange, <<"trove">>},
       %%                   {routing_key, <<"notifications.info">>}]},
       {publish_fields, [{exchange, <<"trove">>}]},
       %% Static list of basic.properties to set on re-publication.
       {publish_properties, [{delivery_mode, 2}]}
       %% The number of seconds to wait before attempting to
       %% reconnect in the event of a connection failure.
       %% {reconnect_delay, 2.5}
      ]} %% End of my_first_shovel
    ]}
   %% Rather than specifying some values per-shovel, you can specify
   %% them for all shovels here.
   %% {defaults, [{prefetch_count, 0},
   %%             {ack_mode, on_confirm},
   %%             {publish_fields, []},
   %%             {publish_properties, [{delivery_mode, 2}]},
   %%             {reconnect_delay, 2.5}]}
  ]}
].
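All of the configurations above combine prefetch_count with ack_mode on_confirm: the shovel keeps at most prefetch_count messages unacknowledged at the source, and acknowledges a message only after the destination has confirmed it. The toy simulation below sketches that interaction under simplifying assumptions (no network, confirms arrive in order and in fixed-size batches). The function name and its parameters are hypothetical and do not model the plugin's internals.

```python
from collections import deque


def simulate_shovel(messages, prefetch_count, confirms_per_step):
    """Forward messages with a bounded window of unconfirmed deliveries.

    Returns (forwarded, peak), where forwarded lists the messages in the
    order they were confirmed and peak is the largest number of messages
    that were unacknowledged at the same time.
    """
    if prefetch_count < 1 or confirms_per_step < 1:
        raise ValueError("prefetch_count and confirms_per_step must be >= 1")
    pending = deque(messages)   # still in the source queue
    in_flight = deque()         # consumed and republished, not yet confirmed
    forwarded = []
    peak = 0
    while pending or in_flight:
        # Consume from the source until the prefetch window is full.
        while pending and len(in_flight) < prefetch_count:
            in_flight.append(pending.popleft())
        peak = max(peak, len(in_flight))
        # The destination confirms a batch; only then are the messages
        # acknowledged at the source, which frees room in the window.
        for _ in range(min(confirms_per_step, len(in_flight))):
            forwarded.append(in_flight.popleft())
    return forwarded, peak


if __name__ == "__main__":
    forwarded, peak = simulate_shovel(range(10), prefetch_count=3, confirms_per_step=2)
    print(forwarded, peak)
```

A larger prefetch_count lets more messages be in transit at once, at the cost of more redeliveries if the connection drops before they are confirmed.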