RPC Server

An RPC server exposes a number of endpoints, each of which contain a set of methods which may be invoked remotely by clients over a given transport.

To create an RPC server, you supply a transport, target and a list of endpoints.

A transport can be obtained simply by calling the get_rpc_transport() method:

transport = messaging.get_rpc_transport(conf)

which will load the appropriate transport driver according to the user’s messaging configuration. See get_rpc_transport() for more details.

The target supplied when creating an RPC server expresses the topic, server name and - optionally - the exchange to listen on. See Target for more details on these attributes.

Multiple RPC Servers may listen to the same topic (and exchange) simultaneously. See RPCClient for details regarding how RPC requests are distributed to the Servers in this case.

Each endpoint object may have a target attribute which may have namespace and version fields set. By default, we use the ‘null namespace’ and version 1.0. Incoming method calls will be dispatched to the first endpoint with the requested method, a matching namespace and a compatible version number.

The first parameter to method invocations is always the request context supplied by the client. The remaining parameters are the arguments supplied to the method by the client. Endpoint methods may return a value. If so the RPC Server will send the returned value back to the requesting client via the transport.

The executor parameter controls how incoming messages will be received and dispatched. Refer to the Executor documentation for descriptions of the types of executors.

Note: If the “eventlet” executor is used, the threading and time library need to be monkeypatched.

The RPC reply operation is best-effort: the server will consider the message containing the reply successfully sent once it is accepted by the messaging transport. The server does not guarantee that the reply is processed by the RPC client. If the send fails an error will be logged and the server will continue to processing incoming RPC requests.

Parameters to the method invocation and values returned from the method are python primitive types. However the actual encoding of the data in the message may not be in primitive form (e.g. the message payload may be a dictionary encoded as an ASCII string using JSON). A serializer object is used to convert incoming encoded message data to primitive types. The serializer is also used to convert the return value from primitive types to an encoding suitable for the message payload.

RPC servers have start(), stop() and wait() methods to begin handling requests, stop handling requests, and wait for all in-process requests to complete after the Server has been stopped.

A simple example of an RPC server with multiple endpoints might be:

# NOTE(changzhi): We are using eventlet executor and
# time.sleep(1), therefore, the server code needs to be
# monkey-patched.

import eventlet
eventlet.monkey_patch()

from oslo_config import cfg
import oslo_messaging
import time

class ServerControlEndpoint(object):

    target = oslo_messaging.Target(namespace='control',
                                   version='2.0')

    def __init__(self, server):
        self.server = server

    def stop(self, ctx):
        if self.server:
            self.server.stop()

class TestEndpoint(object):

    def test(self, ctx, arg):
        return arg

transport = oslo_messaging.get_rpc_transport(cfg.CONF)
target = oslo_messaging.Target(topic='test', server='server1')
endpoints = [
    ServerControlEndpoint(None),
    TestEndpoint(),
]
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
                                       executor='eventlet')
try:
    server.start()
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping server")

server.stop()
server.wait()
oslo_messaging.get_rpc_server(transport, target, endpoints, executor='blocking', serializer=None, access_policy=None)

Construct an RPC server.

Parameters
  • transport (Transport) – the messaging transport

  • target (Target) – the exchange, topic and server to listen on

  • endpoints (list) – a list of endpoint objects

  • executor (str) – name of message executor - available values are ‘eventlet’ and ‘threading’

  • serializer (Serializer) – an optional entity serializer

  • access_policy (RPCAccessPolicyBase) – an optional access policy. Defaults to DefaultRPCAccessPolicy

class oslo_messaging.RPCAccessPolicyBase

Determines which endpoint methods may be invoked via RPC

class oslo_messaging.LegacyRPCAccessPolicy

The legacy access policy allows RPC access to all callable endpoint methods including private methods (methods prefixed by ‘_’)

class oslo_messaging.DefaultRPCAccessPolicy

The default access policy prevents RPC calls to private methods (methods prefixed by ‘_’)

Note

LegacyRPCAdapterPolicy currently needs to be the default while we have projects that rely on exposing private methods.

class oslo_messaging.ExplicitRPCAccessPolicy

Policy which requires decorated endpoint methods to allow dispatch

class oslo_messaging.RPCDispatcher(endpoints, serializer, access_policy=None)

A message dispatcher which understands RPC messages.

A MessageHandlingServer is constructed by passing a callable dispatcher which is invoked with context and message dictionaries each time a message is received.

RPCDispatcher is one such dispatcher which understands the format of RPC messages. The dispatcher looks at the namespace, version and method values in the message and matches those against a list of available endpoints.

Endpoints may have a target attribute describing the namespace and version of the methods exposed by that object.

The RPCDispatcher may have an access_policy attribute which determines which of the endpoint methods are to be dispatched. The default access_policy dispatches all public methods on an endpoint object.

class oslo_messaging.MessageHandlingServer(transport, dispatcher, executor='blocking')

Server for handling messages.

Connect a transport to a dispatcher that knows how to process the message using an executor that knows how the app wants to create new tasks.

reset()

Reset service.

Called in case service running in daemon mode receives SIGHUP.

start(override_pool_size=None)

Start handling incoming messages.

This method causes the server to begin polling the transport for incoming messages and passing them to the dispatcher. Message processing will continue until the stop() method is called.

The executor controls how the server integrates with the applications I/O handling strategy - it may choose to poll for messages in a new process, thread or co-operatively scheduled coroutine or simply by registering a callback with an event loop. Similarly, the executor may choose to dispatch messages in a new thread, coroutine or simply the current thread.

stop()

Stop handling incoming messages.

Once this method returns, no new incoming messages will be handled by the server. However, the server may still be in the process of handling some messages, and underlying driver resources associated to this server are still in use. See ‘wait’ for more details.

wait()

Wait for message processing to complete.

After calling stop(), there may still be some existing messages which have not been completely processed. The wait() method blocks until all message processing has completed.

Once it’s finished, the underlying driver resources associated to this server are released (like closing useless network connections).

oslo_messaging.expected_exceptions(*exceptions)

Decorator for RPC endpoint methods that raise expected exceptions.

Marking an endpoint method with this decorator allows the declaration of expected exceptions that the RPC server should not consider fatal, and not log as if they were generated in a real error scenario.

Note that this will cause listed exceptions to be wrapped in an ExpectedException, which is used internally by the RPC sever. The RPC client will see the original exception type.

oslo_messaging.expose(func)

Decorator for RPC endpoint methods that are exposed to the RPC client.

If the dispatcher’s access_policy is set to ExplicitRPCAccessPolicy then endpoint methods need to be explicitly exposed.:

# foo() cannot be invoked by an RPC client
def foo(self):
    pass

# bar() can be invoked by an RPC client
@rpc.expose
def bar(self):
    pass
exception oslo_messaging.ExpectedException

Encapsulates an expected exception raised by an RPC endpoint

Merely instantiating this exception records the current exception information, which will be passed back to the RPC client without exceptional logging.