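By enabling the RabbitMQ Shovel plugin, messages can be forwarded from one cluster to another.
Overview
RabbitMQ Shovel plugin
The plugin consumes messages from a queue and forwards them to an exchange on another server, so messages can be relayed between different clusters.
Installation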
sudo rabbitmq-plugins enable rabbitmq_shovel
sudo rabbitmq-plugins enable rabbitmq_shovel_management
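Usage
RabbitMQ Shovel can be configured in two ways: Static Shovels and Dynamic Shovels. Because Dynamic Shovels have the problem of losing their Shovel configuration on restart, Static Shovels are recommended for production.
Static Shovels configuration
The goal is to forward the trove RabbitMQ queue in the demo environment (on compute-1) to the controller RabbitMQ cluster (on the controller nodes).
Edit /etc/rabbitmq/rabbitmq.config on compute-1 as follows: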
{rabbitmq_shovel,
[{shovels,
[%% A named shovel worker.
{trove_notifications_info_shovel,
[
%% List the source broker(s) from which to consume.
%%
{sources,
[%% URI(s) and pre-declarations for all source broker(s).
{brokers, ["amqp://"]},
{declarations, [
{'queue.declare', [ {queue, <<"notifications.info">>} ]},
{'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
{'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
]}
]},
%% List the destination broker(s) to publish to.
{destinations,
[%% URIs of the destination cluster nodes; one of them is used at a time.
{brokers, ["amqp://rabbit_user:rabbitmq_pwd@rabbitmq-1/%2f",
"amqp://rabbit_user:rabbitmq_pwd@rabbitmq-2/%2f",
"amqp://rabbit_user:rabbitmq_pwd@rabbitmq-3/%2f"]},
{declarations, [
%% {'queue.declare', [ {queue, <<"notifications.info">>} ]},
{'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]} %%,
%% {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
]}
]},
%% Name of the queue to shovel messages from.
%%
{queue, <<"notifications.info">>},
%% Optional prefetch count.
%%
%% {prefetch_count, 10},
{prefetch_count, 1000},
%% when to acknowledge messages:
%% - no_ack: never (auto)
%% - on_publish: after each message is republished
%% - on_confirm: when the destination broker confirms receipt
%%
%% {ack_mode, on_confirm},
{ack_mode, on_confirm},
%% Overwrite fields of the outbound basic.publish.
%%
{publish_fields, [{exchange, <<"trove">>}]},
%% {routing_key, <<"notifications.info">>}]},
%% Static list of basic.properties to set on re-publication.
%%
{publish_properties, [{delivery_mode, 2}]}
%% The number of seconds to wait before attempting to
%% reconnect in the event of a connection failure.
%%
%% {reconnect_delay, 2.5}
]} %% End of trove_notifications_info_shovel
]}
%% Rather than specifying some values per-shovel, you can specify
%% them for all shovels here.
%%
%% {defaults, [{prefetch_count, 0},
%% {ack_mode, on_confirm},
%% {publish_fields, []},
%% {publish_properties, [{delivery_mode, 2}]},
%% {reconnect_delay, 2.5}]}
]},
The condensed configuration, with annotations:
{rabbitmq_shovel,
[{shovels,
[{trove_notifications_info_shovel, %% shovel name
[
{sources, %% source broker settings
[{brokers, ["amqp://"]}, %% connect to the local broker; see http://www.rabbitmq.com/uri-spec.html
{declarations, [
{'queue.declare', [ {queue, <<"notifications.info">>} ]}, %% declare the queue
{'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]}, %% declare the exchange
{'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]} %% bind the queue to the exchange
]}
]},
{destinations, %% destination RabbitMQ cluster settings
[{brokers, ["amqp://rabbit_user:rabbitmq_pwd@rabbitmq-1/%2f",
"amqp://rabbit_user:rabbitmq_pwd@rabbitmq-2/%2f",
"amqp://rabbit_user:rabbitmq_pwd@rabbitmq-3/%2f"]}, %% list of destination cluster nodes; only one is in use at a time. %2f is the percent-encoded vhost "/". Either broker (single URI) or brokers (list) may be used.
{declarations, [
{'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]} %% declare the destination exchange
]}
]},
{queue, <<"notifications.info">>}, %% source queue to consume from
{prefetch_count, 1000}, %% maximum number of unacknowledged messages in flight; once it is reached, forwarding pauses until acks arrive
{ack_mode, on_confirm}, %% acknowledge each message only after the destination broker confirms receipt
{publish_fields, [{exchange, <<"trove">>}]}, %% exchange to publish to on the destination
{publish_properties, [{delivery_mode, 2}]} %% basic.properties set on forwarded messages; delivery_mode 2 marks them persistent
]}
]}
]},
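The %2f in the destination URIs is the percent-encoded form of the vhost name "/". As a minimal sketch, assuming Python is available on an admin host, the URI list could be generated as shown below. The helper name amqp_uri is hypothetical, and the node names and credentials are the placeholders used in the configuration above. Python emits the escape in upper case (%2F), which is equivalent under percent-encoding rules.

```python
from urllib.parse import quote


def amqp_uri(user, password, host, vhost="/"):
    """Build an amqp:// URI with user, password and vhost percent-encoded."""
    return "amqp://{}:{}@{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        quote(vhost, safe=""),
    )


# Placeholder node names taken from the configuration above.
nodes = ["rabbitmq-1", "rabbitmq-2", "rabbitmq-3"]
uris = [amqp_uri("rabbit_user", "rabbitmq_pwd", node) for node in nodes]
for uri in uris:
    print(uri)
```

Encoding the password as well matters when it contains characters such as "@" or "/", which would otherwise break URI parsing.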
Restart the service
systemctl restart rabbitmq-server.service
Dynamic Shovels
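Note: a Shovel configured this way disappears when the cluster is restarted.
Log in as a user with the administrator tag, then go to the management UI -> Admin -> Shovel Management -> Add a new shovel ->
Set the parameters as follows: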
name: trove_notification_info_shovel
Source:
URI: amqp://
Queue: notifications.info
Destination:
URI: amqp://monitor:xiexianbin.cn@192.168.128.103/%2f
Queue: notifications.info
Prefetch count: 1000 (default)
Reconnect delay: 0 (delay before reconnecting, in seconds)
Add forwarding headers: whether to add forwarding headers to shovelled messages
When enabled, the header looks like this:
x-shovelled:
shovelled-by: rabbit@rabbitmq-101
shovel-type: dynamic
shovel-name: trove_shovel_2
shovel-vhost: /
src-uri: amqp://
dest-uri: amqp://192.168.128.103/%2f
src-queue: notifications.info
dest-queue: notifications.info
Acknowledgement mode: On confirm
Auto-delete: never
Click save to create the shovel.
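A dynamic shovel can also be expressed as a JSON definition and set as a runtime parameter with rabbitmqctl set_parameter shovel. The sketch below only builds and prints that command and does not contact a broker. It assumes Python on the admin host, the function name dynamic_shovel_command is hypothetical, and the key names should be checked against the documentation for the RabbitMQ version in use. Prefetch count and auto-delete are left at their defaults.

```python
import json
import shlex


def dynamic_shovel_command(name, src_uri, src_queue, dest_uri, dest_queue,
                           ack_mode="on-confirm", add_forwarding_headers=True):
    """Return a rabbitmqctl command line that defines a dynamic shovel."""
    definition = {
        "src-uri": src_uri,
        "src-queue": src_queue,
        "dest-uri": dest_uri,
        "dest-queue": dest_queue,
        "ack-mode": ack_mode,
        "add-forwarding-headers": add_forwarding_headers,
    }
    # shlex.quote keeps the JSON intact when the command is pasted into a shell.
    return "rabbitmqctl set_parameter shovel {} {}".format(
        shlex.quote(name), shlex.quote(json.dumps(definition))
    )


# Values copied from the form fields listed above.
command = dynamic_shovel_command(
    "trove_notification_info_shovel",
    "amqp://", "notifications.info",
    "amqp://monitor:xiexianbin.cn@192.168.128.103/%2f", "notifications.info",
)
print(command)
```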
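High availability notes
- With Dynamic Shovels, the Shovel only needs to be configured on one node of the cluster. Because Dynamic Shovels have the problem of losing their Shovel configuration on restart, Static Shovels are recommended for production.
- With Static Shovels, the Shovel must be configured on every node of the cluster, and the service must be restarted.
- On the forwarding side, list every node of the destination cluster under destinations brokers; when forwarding, one available connection is chosen at random as the target.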
- After a shovel is added, a new queue such as amq.gen-yWaNGiAPlqtzpUD8YjqV6g appears under Queues.
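The random choice among destination brokers noted in the list above can be pictured with a small simulation. This is only an illustrative sketch in Python, not the plugin's actual implementation: pick_broker and the is_reachable callback are hypothetical names, and reachability is faked with a set of node names.

```python
import random


def pick_broker(brokers, is_reachable, rng=random):
    """Try brokers in random order and return the first reachable one, or None."""
    candidates = list(brokers)
    rng.shuffle(candidates)
    for uri in candidates:
        if is_reachable(uri):
            return uri
    return None


brokers = [
    "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-1/%2f",
    "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-2/%2f",
    "amqp://rabbit_user:rabbitmq_pwd@rabbitmq-3/%2f",
]
# Pretend rabbitmq-2 is down; only the other two nodes can be selected.
down = {"rabbitmq-2"}
chosen = pick_broker(brokers, lambda uri: not any(node in uri for node in down))
print(chosen)
```

Listing all cluster nodes therefore lets forwarding continue as long as at least one destination node accepts connections.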
Other configurations
Version 1: queue only
{rabbitmq_shovel,
[{shovels,
[%% A named shovel worker.
{trove_notifications_info_shovel,
[
%% List the source broker(s) from which to consume.
%%
{sources,
[%% URI(s) and pre-declarations for all source broker(s).
{brokers, ["amqp://"]},
{declarations, [
{'queue.declare', [ {queue, <<"notifications.info">>} ]}
]}
]},
%% List the destination broker(s) to publish to.
{destinations,
[%% A singular version of the 'brokers' element.
{broker, "amqp://rabbitmq_user:rabbitmq_pwd@192.168.128.103/%2f"},
{declarations, [
{'queue.declare', [ {queue, <<"notifications.info">>} ]}
]}
]},
%% Name of the queue to shovel messages from.
%%
{queue, <<"notifications.info">>},
%% Optional prefetch count.
%%
%% {prefetch_count, 10},
{prefetch_count, 1000},
%% when to acknowledge messages:
%% - no_ack: never (auto)
%% - on_publish: after each message is republished
%% - on_confirm: when the destination broker confirms receipt
%%
%% {ack_mode, on_confirm},
{ack_mode, on_confirm}
%% Overwrite fields of the outbound basic.publish.
%%
%% {publish_fields, [{exchange, <<"my_exchange">>},
%% {routing_key, <<"from_shovel">>}]},
%% Static list of basic.properties to set on re-publication.
%%
%% {publish_properties, [{delivery_mode, 2}]},
%% The number of seconds to wait before attempting to
%% reconnect in the event of a connection failure.
%%
%% {reconnect_delay, 2.5}
]} %% End of trove_notifications_info_shovel
]}
%% Rather than specifying some values per-shovel, you can specify
%% them for all shovels here.
%%
%% {defaults, [{prefetch_count, 0},
%% {ack_mode, on_confirm},
%% {publish_fields, []},
%% {publish_properties, [{delivery_mode, 2}]},
%% {reconnect_delay, 2.5}]}
]},
Version 2: exchange + queue, single destination node
{rabbitmq_shovel,
[{shovels,
[%% A named shovel worker.
{trove_notifications_info_shovel,
[
%% List the source broker(s) from which to consume.
%%
{sources,
[%% URI(s) and pre-declarations for all source broker(s).
{brokers, ["amqp://"]},
{declarations, [
{'queue.declare', [ {queue, <<"notifications.info">>} ]},
{'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
{'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
]}
]},
%% List the destination broker(s) to publish to.
{destinations,
[%% A singular version of the 'brokers' element.
{broker, "amqp://rabbitmq_user:rabbitmq_pwd@192.168.128.103/%2f"},
{declarations, [
{'queue.declare', [ {queue, <<"notifications.info">>} ]},
{'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
{'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
]}
]},
%% Name of the queue to shovel messages from.
%%
{queue, <<"notifications.info">>},
%% Optional prefetch count.
%%
%% {prefetch_count, 10},
{prefetch_count, 1000},
%% when to acknowledge messages:
%% - no_ack: never (auto)
%% - on_publish: after each message is republished
%% - on_confirm: when the destination broker confirms receipt
%%
%% {ack_mode, on_confirm},
{ack_mode, on_confirm},
%% Overwrite fields of the outbound basic.publish.
%%
{publish_fields, [{exchange, <<"trove">>},
{routing_key, <<"notifications.info">>}]},
%% Static list of basic.properties to set on re-publication.
%%
{publish_properties, [{delivery_mode, 2}]}
%% The number of seconds to wait before attempting to
%% reconnect in the event of a connection failure.
%%
%% {reconnect_delay, 2.5}
]} %% End of trove_notifications_info_shovel
]}
%% Rather than specifying some values per-shovel, you can specify
%% them for all shovels here.
%%
%% {defaults, [{prefetch_count, 0},
%% {ack_mode, on_confirm},
%% {publish_fields, []},
%% {publish_properties, [{delivery_mode, 2}]},
%% {reconnect_delay, 2.5}]}
]},
Version 3: exchange + queue, destination cluster
{rabbitmq_shovel,
[{shovels,
[%% A named shovel worker.
{trove_notifications_info_shovel,
[
%% List the source broker(s) from which to consume.
%%
{sources,
[%% URI(s) and pre-declarations for all source broker(s).
{brokers, ["amqp://"]},
{declarations, [
{'queue.declare', [ {queue, <<"notifications.info">>} ]},
{'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
{'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
]}
]},
%% List the destination broker(s) to publish to.
{destinations,
[%% URIs of the destination cluster nodes; one of them is used at a time.
{brokers, ["amqp://rabbitmq_user:rabbitmq_pwd@rabbitmq-1/%2f",
"amqp://rabbitmq_user:rabbitmq_pwd@rabbitmq-2/%2f",
"amqp://rabbitmq_user:rabbitmq_pwd@rabbitmq-3/%2f"]},
{declarations, [
{'queue.declare', [ {queue, <<"notifications.info">>} ]},
{'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
{'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
]}
]},
%% Name of the queue to shovel messages from.
%%
{queue, <<"notifications.info">>},
%% Optional prefetch count.
%%
%% {prefetch_count, 10},
{prefetch_count, 1000},
%% when to acknowledge messages:
%% - no_ack: never (auto)
%% - on_publish: after each message is republished
%% - on_confirm: when the destination broker confirms receipt
%%
%% {ack_mode, on_confirm},
{ack_mode, on_confirm},
%% Overwrite fields of the outbound basic.publish.
%%
{publish_fields, [{exchange, <<"trove">>},
{routing_key, <<"notifications.info">>}]},
%% Static list of basic.properties to set on re-publication.
%%
{publish_properties, [{delivery_mode, 2}]}
%% The number of seconds to wait before attempting to
%% reconnect in the event of a connection failure.
%%
%% {reconnect_delay, 2.5}
]} %% End of trove_notifications_info_shovel
]}
%% Rather than specifying some values per-shovel, you can specify
%% them for all shovels here.
%%
%% {defaults, [{prefetch_count, 0},
%% {ack_mode, on_confirm},
%% {publish_fields, []},
%% {publish_properties, [{delivery_mode, 2}]},
%% {reconnect_delay, 2.5}]}
]},
Version 4:
{rabbitmq_shovel,
[{shovels,
[%% A named shovel worker.
{trove_notifications_info_shovel,
[
%% List the source broker(s) from which to consume.
%%
{sources,
[%% URI(s) and pre-declarations for all source broker(s).
{brokers, ["amqp://"]},
{declarations, [
{'queue.declare', [ {queue, <<"notifications.info">>} ]},
{'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]},
{'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
]}
]},
%% List the destination broker(s) to publish to.
{destinations,
[%% A singular version of the 'brokers' element.
{broker, "amqp://monitor:xiexianbin.cn@192.168.128.103/%2f"},
{declarations, [
%% {'queue.declare', [ {queue, <<"notifications.info">>} ]},
{'exchange.declare', [ {exchange, <<"trove">>}, {type, <<"topic">>} ]} %% ,
%% {'queue.bind', [ {exchange, <<"trove">>}, {queue, <<"notifications.info">>}, {routing_key, <<"notifications.info">>} ]}
]}
]},
%% Name of the queue to shovel messages from.
%%
{queue, <<"notifications.info">>},
%% Optional prefetch count.
%%
%% {prefetch_count, 10},
{prefetch_count, 1000},
%% when to acknowledge messages:
%% - no_ack: never (auto)
%% - on_publish: after each message is republished
%% - on_confirm: when the destination broker confirms receipt
%%
%% {ack_mode, on_confirm},
{ack_mode, on_confirm},
%% Overwrite fields of the outbound basic.publish.
%%
%% {publish_fields, [{exchange, <<"trove">>},
%% {routing_key, <<"notifications.info">>}]},
{publish_fields, [{exchange, <<"trove">>}]},
%% Static list of basic.properties to set on re-publication.
%%
{publish_properties, [{delivery_mode, 2}]}
%% The number of seconds to wait before attempting to
%% reconnect in the event of a connection failure.
%%
%% {reconnect_delay, 2.5}
]} %% End of trove_notifications_info_shovel
]}
%% Rather than specifying some values per-shovel, you can specify
%% them for all shovels here.
%%
%% {defaults, [{prefetch_count, 0},
%% {ack_mode, on_confirm},
%% {publish_fields, []},
%% {publish_properties, [{delivery_mode, 2}]},
%% {reconnect_delay, 2.5}]}
]},