OpenStack RPC 远程调用

发布时间: 更新时间: 总字数:576 阅读时间:2m 作者: IP上海 分享 网址

RPC(Remote Procedure Call Protocol) 远程过程调用协议,是一种通过网络从远程计算机程序上请求服务。本文介绍 RPCOpenStack 中的使用。

作用

同一组件中各个不同模块之间通过消息代理以低耦合的方式进行通信,建立在发布/订阅(publish/subscribe)模式上(如 nova-computenova-scheduler 的通信)。OpenStack RPC 使用 MQ(如 RabbitMQ 介绍)作为消息传递中间件。

RPC 分类

  • call: 调用方法会阻塞线程并等待返回
    • 只允许有一个返回值,不允许对 fanout target 调用 call 方法
    • call 保证 RPC 请求最多执行一次
  • cast: 调用方法不会因等待返回值而阻塞线程
    • fanout target 属性设置为 True,可以向所有侦听给定 topic 的服务器广播 RPC 请求
    • cast 也会阻塞线程调用,直到有消息队列接受 RPC 的方法调用
    • cast 不验证对端服务器是否已调用RPC方法
    • cast 保证 RPC 在同一个目标上最多执行一次

调用关系

使用 RPC 的子组件通常包含以下文件:

  • api.py 对 RPC 接口进行封装,类似提供 SDK
  • rpcapi.py 暴露给其他内部组件的 RPC 接口,RPC 客户端
  • manager.py 处理 RPC API 调用

call func -> rpcapi.py:RPCAPI -> MQ -> manager.py:Manager(rpc server)

  • call func
class AgentController(base.BaseController):

    def __init__(self):
        super().__init__()
        self.agent_rpcapi = agent_rpcapi.AgentRPCAPI()

    @expose('json')
    def get(self):
        context = pecan_request.context.get('nova_context')
        return {'agent': self.agent_rpcapi.hello(context)}
  • rpcapi.py
class AgentRPCAPI(object):
    """Client side of the agent rpc API.."""

    def __init__(self):
        self.api_latest_version = agent_api.API.API_LATEST_VERSION
        self.target = messaging.Target(
            topic=constants.TOPIC_AGENT_V1, namespace=constants.RPC_NAMESPACE_AGENT,
            version=self.api_latest_version, fanout=False)
        self.client = rpc.get_client(self.target)

    def hello(self, ctxt):
        kwargs = dict()
        return self.client.call(ctxt, 'hello', **kwargs)
  • manager.py
class AgentManager():
    """Manages agent Action."""

    # API version history:
    #   1.0 - Initial version.
    target = messaging.Target(
        namespace=constants.RPC_NAMESPACE_AGENT,
        version=agent_api.API.API_LATEST_VERSION)

    def __init__(self, conf):
        super().__init__(conf)
        LOG.info('Agent Manager is init')

    def hello(self, context):
        return f'success call hello rpc, request_id is {context.get("request_id")}'
  • service.py 中监听 Manager
class AgentService(cotyledon.Service):
    ...
    def run(self):
        LOG.info('Starting V1 Agent...')
        target = messaging.Target(topic=self.topic, server=self.server,
                                  fanout=False)
        self.endpoints = [agent_manager.AgentManager(self.conf)]
        self.message_listener = rpc.get_server(
            target, self.endpoints,
            executor='threading',
            access_policy=self.access_policy
        )
        self.message_listener.start()
  • test_rpc.py
import datetime
import sys

from oslo_config import cfg
from oslo_log import log
import oslo_messaging

CONF = cfg.CONF

_options = []

log.register_options(CONF)
cfg.CONF.register_cli_opts(_options)
cfg.CONF(sys.argv[1:])
url = "rabbit://test:test@127.0.0.1:5672/test"


def call():
    oslo_messaging.set_transport_defaults(control_exchange='console')
    target = oslo_messaging.Target(
        topic='console-agent',
        server='127.0.0.1',
        namespace='namespace',
        version='1.0',
        fanout=False
    )
    transport = oslo_messaging.get_transport(CONF, url=url)

    client = oslo_messaging.RPCClient(transport, target, call_monitor_timeout=5)

    cctxt = client.prepare()
    kwargs = dict()
    result = cctxt.call({}, 'hello', **kwargs)
    return result


if __name__ == '__main__':
    print('start {}'.format(datetime.datetime.now()))
    print(call())
    print('end {}'.format(datetime.datetime.now()))

参考

  1. https://docs.openstack.org/oslo.messaging/latest/reference/rpcclient.html
Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数