RPC(Remote Procedure Call Protocol)
远程过程调用协议,是一种通过网络从远程计算机程序上请求服务。本文介绍 RPC
在 OpenStack
中的使用。
作用
同一组件中各个不同模块之间通过消息代理以低耦合的方式进行通信,建立在发布/订阅(publish/subscribe)模式上(如 nova-compute
与 nova-scheduler
的通信)。OpenStack RPC 使用 MQ(如 RabbitMQ 介绍)作为消息传递中间件。
RPC 分类
- call: 调用方法会阻塞线程并等待返回
- 只允许有一个返回值,不允许对
fanout target
调用 call
方法
- call 保证
RPC
请求最多执行一次
- cast: 调用方法不会因等待返回值而阻塞线程
- 将
fanout target
属性设置为 True
,可以向所有侦听给定 topic
的服务器广播 RPC
请求
- cast 也会阻塞线程调用,直到有消息队列接受
RPC
的方法调用
- cast 不验证对端服务器是否已调用RPC方法
- cast 保证
RPC
在同一个目标上最多执行一次
调用关系
使用 RPC 的子组件通常包含以下文件:
api.py
对 RPC 接口进行封装,类似提供 SDK
rpcapi.py
暴露给其他内部组件的 RPC 接口,RPC 客户端
manager.py
处理 RPC API 调用
call func
-> rpcapi.py:RPCAPI
-> MQ
-> manager.py:Manager
(rpc server)
class AgentController(base.BaseController):
def __init__(self):
super().__init__()
self.agent_rpcapi = agent_rpcapi.AgentRPCAPI()
@expose('json')
def get(self):
context = pecan_request.context.get('nova_context')
return {'agent': self.agent_rpcapi.hello(context)}
class AgentRPCAPI(object):
"""Client side of the agent rpc API.."""
def __init__(self):
self.api_latest_version = agent_api.API.API_LATEST_VERSION
self.target = messaging.Target(
topic=constants.TOPIC_AGENT_V1, namespace=constants.RPC_NAMESPACE_AGENT,
version=self.api_latest_version, fanout=False)
self.client = rpc.get_client(self.target)
def hello(self, ctxt):
kwargs = dict()
return self.client.call(ctxt, 'hello', **kwargs)
class AgentManager():
"""Manages agent Action."""
# API version history:
# 1.0 - Initial version.
target = messaging.Target(
namespace=constants.RPC_NAMESPACE_AGENT,
version=agent_api.API.API_LATEST_VERSION)
def __init__(self, conf):
super().__init__(conf)
LOG.info('Agent Manager is init')
def hello(self, context):
return f'success call hello rpc, request_id is {context.get("request_id")}'
class AgentService(cotyledon.Service):
...
def run(self):
LOG.info('Starting V1 Agent...')
target = messaging.Target(topic=self.topic, server=self.server,
fanout=False)
self.endpoints = [agent_manager.AgentManager(self.conf)]
self.message_listener = rpc.get_server(
target, self.endpoints,
executor='threading',
access_policy=self.access_policy
)
self.message_listener.start()
import datetime
import sys
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
CONF = cfg.CONF
_options = []
log.register_options(CONF)
cfg.CONF.register_cli_opts(_options)
cfg.CONF(sys.argv[1:])
url = "rabbit://test:test@127.0.0.1:5672/test"
def call():
oslo_messaging.set_transport_defaults(control_exchange='console')
target = oslo_messaging.Target(
topic='console-agent',
server='127.0.0.1',
namespace='namespace',
version='1.0',
fanout=False
)
transport = oslo_messaging.get_transport(CONF, url=url)
client = oslo_messaging.RPCClient(transport, target, call_monitor_timeout=5)
cctxt = client.prepare()
kwargs = dict()
result = cctxt.call({}, 'hello', **kwargs)
return result
if __name__ == '__main__':
print('start {}'.format(datetime.datetime.now()))
print(call())
print('end {}'.format(datetime.datetime.now()))