zeromq 使用介绍

发布时间: 更新时间: 总字数:1303 阅读时间:3m 作者: IP上海 分享 网址

ZeroMQ (也称为 ØMQ, 0MQ, 或 ZMQ) 是一个高性能的异步消息库,旨在用于可伸缩的分布式或并发应用程序。

介绍

ZeroMQ 提供了一个消息队列,但与传统的面向消息的中间件不同,ZeroMQ 的运行不需要专门的消息代理(message broker)。它的设计理念是零代理,因此得名 ZeroMQ。

ZeroMQ 的特点

  • 高性能 ZeroMQ 具有非常高的消息吞吐量和低延迟,使其成为构建高性能分布式系统的理想选择
  • 无代理 与传统的消息队列不同,ZeroMQ 不需要中央消息代理,应用程序直接通过 ZeroMQ 套接字进行通信
  • 多种消息模式 ZeroMQ 提供了多种套接字类型,每种类型都实现了特定的消息模式,例如:
    • 请求-响应 (Request-Reply) 用于客户端-服务器通信模式,类似于 RPC
    • 发布-订阅 (Publish-Subscribe) 用于一对多或多对多消息分发
    • 推-拉 (Push-Pull) 用于任务分发和负载均衡
    • 管道 (Pipeline) 用于构建多阶段的消息处理管道
  • 多传输协议 支持多种传输协议,包括 TCP、进程间通信 (IPC)、线程间通信 (Inproc) 和多播 (PGM)
  • 多语言支持 ZeroMQ 提供了多种编程语言的 API 绑定,包括 C、C++、Python、Java、Node.js、Go 等
  • 可伸缩性 通过其灵活的消息模式和无代理架构,ZeroMQ 可以轻松构建高度可伸缩的应用程序

ZeroMQ 消息模式示例

下面将通过 Python (使用 pyzmq 库) 演示几种常见的 ZeroMQ 消息模式。安装依赖:

# 26.4.0
pip install pyzmq

请求-响应 (Request-Reply) 模式

这种模式用于客户端向服务器发送请求并等待响应。

服务器端 (server.py)

import zmq
import time

def server():
    context = zmq.Context()
    # REP socket: replies to requests
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:5555")  # 绑定到所有可用接口的5555端口

    print("Server started on tcp://*:5555")

    while True:
        # Wait for next request from client
        message = socket.recv_string()
        print(f"Received request: {message}")

        # Do some work
        time.sleep(1)

        # Send reply back to client
        reply = f"World from {message}"
        socket.send_string(reply)
        print(f"Sent reply: {reply}")

if __name__ == "__main__":
    server()

客户端 (client.py)

import zmq

def client():
    context = zmq.Context()
    # REQ socket: sends requests and receives replies
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555") # 连接到服务器

    print("Client connected to tcp://localhost:5555")

    for request in range(5):
        print(f"Sending request {request}...")
        socket.send_string(f"Hello {request}")

        # Get the reply
        message = socket.recv_string()
        print(f"Received reply {request}: {message}")

if __name__ == "__main__":
    client()

运行方式

  1. 先运行 python server.py
  2. 再运行 python client.py

发布-订阅 (Publish-Subscribe) 模式

这种模式用于一对多或多对多消息分发,发布者发送消息,订阅者接收它们感兴趣的消息。

发布者 (publisher.py)

import zmq
import time
import random

def publisher():
    context = zmq.Context()
    # PUB socket: publishes messages
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5556") # 绑定到所有可用接口的5556端口

    print("Publisher started on tcp://*:5556")

    while True:
        temperature = random.randint(0, 50)
        humidity = random.randint(20, 90)

        # Send message with a topic prefix
        # Subscribers can filter messages based on this prefix
        socket.send_string(f"weather_data {temperature} {humidity}")
        print(f"Published: weather_data {temperature} {humidity}")

        # Send a different topic
        stock_price = random.uniform(100.0, 200.0)
        socket.send_string(f"stock_update AAPL {stock_price:.2f}")
        print(f"Published: stock_update AAPL {stock_price:.2f}")

        time.sleep(1)

if __name__ == "__main__":
    publisher()

订阅者 (subscriber.py)

import zmq

def subscriber():
    context = zmq.Context()
    # SUB socket: subscribes to messages
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5556") # 连接到发布者

    # Subscribe to "weather_data" messages
    socket.setsockopt_string(zmq.SUBSCRIBE, "weather_data")
    # Subscribe to "stock_update AAPL" messages (more specific filter)
    socket.setsockopt_string(zmq.SUBSCRIBE, "stock_update AAPL")

    print("Subscriber connected to tcp://localhost:5556 and subscribed to 'weather_data' and 'stock_update AAPL'")

    while True:
        message = socket.recv_string()
        print(f"Received: {message}")

if __name__ == "__main__":
    subscriber()

运行方式

  1. 先运行 python publisher.py
  2. 再运行 python subscriber.py (可以运行多个订阅者)

推-拉 (Push-Pull) 模式

这种模式通常用于任务分发和负载均衡,其中一个或多个 (Push) 节点将任务发送给一个或多个 (Pull) 节点进行处理。

任务分发器 (ventilator.py - Push)

import zmq
import time

def ventilator():
    context = zmq.Context()
    # PUSH socket: pushes messages to a set of PULL sockets
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:5557") # 绑定到所有可用接口的5557端口

    print("Ventilator started on tcp://*:5557")
    print("Press Enter to start sending tasks...")
    input()

    for task_nbr in range(10):
        task_message = f"Task {task_nbr}"
        socket.send_string(task_message)
        print(f"Sent: {task_message}")
        time.sleep(0.1) # Simulate some work

    print("All tasks sent.")
    time.sleep(1) # Give workers time to finish
    socket.close()
    context.term()

if __name__ == "__main__":
    ventilator()

工作节点 (worker.py - Pull)

import zmq
import time

def worker():
    context = zmq.Context()
    # PULL socket: pulls messages from a set of PUSH sockets
    socket = context.socket(zmq.PULL)
    socket.connect("tcp://localhost:5557") # 连接到任务分发器

    print("Worker started and connected to tcp://localhost:5557")

    while True:
        message = socket.recv_string()
        print(f"Received and processing: {message}...")
        time.sleep(0.5) # Simulate processing time
        print(f"Finished processing: {message}")

if __name__ == "__main__":
    worker()

运行方式

  1. 先运行 python ventilator.py
  2. 再运行一个或多个 python worker.py
  3. ventilator.py 的控制台中按回车开始发送任务。

总结

ZeroMQ 提供了一个强大的、灵活的消息层,可以用来构建各种分布式和并发应用程序,从简单的客户端-服务器应用到复杂的任务分发系统。它的无代理架构和多种消息模式使其成为传统消息队列的有力替代品,尤其适用于需要高性能和低延迟的场景。

参考

  1. https://zeromq.org/
  2. https://en.wikipedia.org/wiki/ZeroMQ
  3. https://zh.wikipedia.org/zh-cn/%C3%98MQ
  4. https://github.com/anjuke/zguide-cn/blob/master/chapter1.md#zmq%E7%AE%80%E4%BB%8B
本文总阅读量 次 本站总访问量 次 本站总访客数
Home Archives Categories Tags Statistics