Server Introduction

RSocketServer is the high-level abstraction used to create a server running the RSocket protocol. It is a subclass of RSocket.

An RSocketServer can communicate with any RSocket client that implements the same protocol version and the same transport as the server.

To get started creating an RSocket server, install the rsocket package and at least one transport implementation (TCP is available by default). See the server portion of the Client Server Example for an example of an implemented getRequestHandler.

Transports

Transport is the abstraction that handles the underlying network communication for the RSocket application protocol.

Available network transports for rsocket-py server include:

  • TCP - available by default
  • Websocket (aiohttp, Quart)

RequestHandler

When creating an RSocketServer instance, the constructor requires a factory (method or class) that returns an object matching the RequestHandler abstract class. This object is responsible for mapping callback/handler functions to the various RSocket message types, and for returning an appropriate Publisher/Future that will produce data for the request.

import asyncio
from typing import Tuple, Optional
from datetime import timedelta

from reactivestreams.publisher import Publisher
from reactivestreams.subscriber import Subscriber
from rsocket.payload import Payload
from rsocket.request_handler import RequestHandler
from rsocket.error_codes import ErrorCode


class CustomRequestHandler(RequestHandler):

    async def on_setup(self,
                       data_encoding: bytes,
                       metadata_encoding: bytes,
                       payload: Payload):
        ...

    async def request_channel(self, payload: Payload) -> Tuple[Optional[Publisher], Optional[Subscriber]]:
        ...

    async def request_fire_and_forget(self, payload: Payload):
        ...

    async def on_metadata_push(self, payload: Payload):
        ...

    async def request_response(self, payload: Payload) -> asyncio.Future:
        ...

    async def request_stream(self, payload: Payload) -> Publisher:
        ...

    async def on_error(self, error_code: ErrorCode, payload: Payload):
        ...

    async def on_keepalive_timeout(self,
                                   time_since_last_keepalive: timedelta,
                                   rsocket):
        ...

    async def on_connection_lost(self, rsocket, exception):
        ...
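Since request_response is declared to return an asyncio.Future, it can help to see two ordinary ways of producing one. The following is a minimal sketch using only the standard library: plain bytes stand in for a Payload, and `resolved_future`, `slow_lookup` and `demo` are hypothetical names chosen for illustration, not part of rsocket-py.

```python
import asyncio


def resolved_future(value) -> asyncio.Future:
    # Build a future that already holds its result.
    # Must be called while an event loop is running.
    future = asyncio.get_running_loop().create_future()
    future.set_result(value)
    return future


async def slow_lookup(key: bytes) -> bytes:
    # Stand-in for work that takes time (database call, computation, ...).
    await asyncio.sleep(0.01)
    return b'value-for-' + key


async def demo():
    # Case 1: the answer is known immediately.
    immediate = resolved_future(b'pong')
    # Case 2: the answer is produced later by a coroutine.
    deferred = asyncio.ensure_future(slow_lookup(b'a'))
    return (await immediate, await deferred)


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The second form is the one that matters for cancellation, because the returned future is backed by a running task that can be interrupted.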

Client Cancellation

An important characteristic of RSocket's protocol is the concept of cancellation.

In the context of an RSocket server, once a client connection/request has begun, it is possible that the client which initiated the request may decide it no longer wishes to continue and signal to the server that it wishes to cancel.

If a client signals to an RSocket server that it wishes to cancel a request, the server should stop producing data for that request: it should avoid calling the on_complete or on_next callbacks for the request's resulting Future or Publisher.

Cancellation Request-Response Example

import asyncio
import logging
from asyncio import Future

from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler


async def calculate():
    try:
        await asyncio.sleep(4)

        return 'Response'
    except asyncio.CancelledError:
        logging.info('Canceled by client')
        raise


class CustomRequestHandler(BaseRequestHandler):

    async def request_response(self, payload: Payload) -> Future:
        return asyncio.ensure_future(calculate())
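The cancellation path of calculate() can be exercised without a client. The following is a minimal sketch using only asyncio. It assumes that a client's cancel ends up cancelling the future returned by request_response, which is simulated here by calling `cancel()` by hand; `simulate_client_cancel` is a hypothetical helper name, not part of rsocket-py.

```python
import asyncio
import logging


async def calculate():
    try:
        await asyncio.sleep(4)
        return 'Response'
    except asyncio.CancelledError:
        logging.info('Canceled by client')
        raise


async def simulate_client_cancel() -> bool:
    # Same wrapping as in request_response: the coroutine becomes a task.
    future = asyncio.ensure_future(calculate())
    # Let the task start and reach its sleep.
    await asyncio.sleep(0.01)
    # Assumption: this stands in for the client's cancel signal.
    future.cancel()
    try:
        await future
    except asyncio.CancelledError:
        pass
    # True because calculate() re-raised CancelledError instead of swallowing it.
    return future.cancelled()


if __name__ == '__main__':
    print(asyncio.run(simulate_client_cancel()))
```

Re-raising in the `except` block matters: if calculate() swallowed the CancelledError and returned normally, the future would finish with a result instead of being marked as cancelled.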

Cancellation Request-Stream Example


from reactivestreams.publisher import Publisher
from reactivestreams.subscriber import Subscriber
from reactivestreams.subscription import Subscription
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler


class CustomRequestHandler(BaseRequestHandler):

    class ResponseStream(Publisher, Subscription):
        def subscribe(self, subscriber: Subscriber):
            # Store the subscriber first, so it is available if
            # on_subscribe triggers a request right away.
            self.subscriber = subscriber
            subscriber.on_subscribe(self)

        async def request(self, n: int):
            # Emit up to n items to self.subscriber.
            ...

        def cancel(self):
            # Stop emitting; the client is no longer interested.
            ...

    async def request_stream(self, payload: Payload) -> Publisher:
        return self.ResponseStream()
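One way the elided request and cancel bodies could cooperate is sketched below, using only the standard library. `RecordingSubscriber` and `CountingStream` are hypothetical duck-typed stand-ins rather than the reactivestreams classes, and the flag-based approach is an assumption chosen for illustration, not the library's prescribed method.

```python
import asyncio
from typing import List


class RecordingSubscriber:
    """Hypothetical stand-in for a Subscriber; it only records callbacks."""

    def __init__(self):
        self.items: List[int] = []
        self.completed = False
        self.subscription = None

    def on_subscribe(self, subscription):
        self.subscription = subscription

    def on_next(self, value):
        self.items.append(value)

    def on_complete(self):
        self.completed = True


class CountingStream:
    """Emits 0..limit-1 on demand and stays silent once cancelled."""

    def __init__(self, limit: int):
        self._limit = limit
        self._next = 0
        self._cancelled = False
        self._done = False
        self._subscriber = None

    def subscribe(self, subscriber):
        self._subscriber = subscriber
        subscriber.on_subscribe(self)

    async def request(self, n: int):
        for _ in range(n):
            if self._cancelled or self._next >= self._limit:
                break
            self._subscriber.on_next(self._next)
            self._next += 1
            # Yield to the event loop so a cancel can interleave.
            await asyncio.sleep(0)
        if not self._cancelled and not self._done and self._next >= self._limit:
            self._done = True
            self._subscriber.on_complete()

    def cancel(self):
        # After this, request() emits nothing and never completes.
        self._cancelled = True


async def demo_cancel() -> RecordingSubscriber:
    subscriber = RecordingSubscriber()
    CountingStream(limit=5).subscribe(subscriber)
    await subscriber.subscription.request(2)  # emits 0 and 1
    subscriber.subscription.cancel()
    await subscriber.subscription.request(3)  # ignored after cancel
    return subscriber
```

Checking the flag before every emission, rather than once per request call, keeps the stream from sending further items when a cancel arrives in the middle of a large request.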