
rsocket-py

caution

The Python package API is not yet stable. Breaking changes may occur until version 1.0.0.

The Python rsocket package implements version 1.0 of the RSocket protocol and is designed for Python >= 3.8 with asyncio.

Packages

The package is published on PyPI as rsocket and can be installed with 'pip install rsocket'.

Status

The following are currently implemented:

  • RSocketClient / RSocketServer
  • TCP Transport
  • QUIC Transport
  • Simple load balancing
  • WebSocket transport (integration with Quart (server) and aiohttp (server/client))
  • Minimal integration with RxPy >= 3.x

Client & Server example

Client Example

The client sends a request/response message to the server once per second, carrying the desired date-time format as its payload, and exits after five seconds.

# client.py
import asyncio
import logging
import sys

from reactivestreams.subscriber import DefaultSubscriber
from rsocket.helpers import single_transport_provider
from rsocket.payload import Payload
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP


class StreamSubscriber(DefaultSubscriber):

    def on_next(self, value, is_complete=False):
        logging.info('RS: {}'.format(value))
        self.subscription.request(1)


async def main(server_port):
    logging.info('Connecting to server at localhost:%s', server_port)

    connection = await asyncio.open_connection('localhost', server_port)

    async with RSocketClient(single_transport_provider(TransportTCP(*connection))) as client:
        # The payload carries the strftime pattern the server should apply.
        payload = Payload(b'%Y-%m-%d %H:%M:%S')

        async def run_request_response():
            try:
                while True:
                    result = await client.request_response(payload)
                    logging.info('Response: {}'.format(result.data))
                    await asyncio.sleep(1)
            except asyncio.CancelledError:
                pass

        task = asyncio.create_task(run_request_response())

        # Let the request loop run for five seconds, then stop it.
        await asyncio.sleep(5)
        task.cancel()
        await task


if __name__ == '__main__':
    # Convert the command-line port to int so both code paths yield the same type.
    port = int(sys.argv[1]) if len(sys.argv) > 1 else 6565
    logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main(port))
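The client's "poll on an interval, then cancel" structure does not depend on RSocket, so it can be tried in isolation. The following is a minimal sketch using only asyncio. The names fake_request, poll, and run_for are hypothetical and are not part of the rsocket package; fake_request merely stands in for client.request_response.

```python
import asyncio


async def fake_request(payload: bytes) -> bytes:
    # Hypothetical stand-in for client.request_response(); it just echoes.
    await asyncio.sleep(0)
    return payload


async def poll(results: list, interval: float) -> None:
    # Issue one request per interval until the task is cancelled.
    try:
        while True:
            results.append(await fake_request(b'%Y-%m-%d'))
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        pass  # Swallow cancellation so awaiting the task returns cleanly.


async def run_for(duration: float, interval: float) -> list:
    results = []
    task = asyncio.create_task(poll(results, interval))
    await asyncio.sleep(duration)
    task.cancel()
    await task  # Does not raise, because poll() handled CancelledError.
    return results


if __name__ == '__main__':
    collected = asyncio.run(run_for(0.05, 0.01))
    print(len(collected), 'responses collected')
```

Because poll() catches asyncio.CancelledError itself, the final await on the task returns normally. If the exception were not caught, the awaiting code would have to handle it instead.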

Server Example

The server responds to each request/response message with the current date-time, formatted according to the pattern in the request payload.

# server.py
import asyncio
import logging
import sys
from datetime import datetime

from rsocket.helpers import create_future
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler
from rsocket.rsocket_server import RSocketServer
from rsocket.transports.tcp import TransportTCP


class Handler(BaseRequestHandler):
    async def request_response(self, payload: Payload) -> asyncio.Future:
        await asyncio.sleep(0.1)  # Simulate not immediate process
        # The request payload holds the strftime pattern chosen by the client.
        date_time_format = payload.data.decode('utf-8')
        formatted_date_time = datetime.now().strftime(date_time_format)
        return create_future(Payload(formatted_date_time.encode('utf-8')))


async def run_server(server_port):
    logging.info('Starting server at localhost:%s', server_port)

    def session(*connection):
        # Called once per accepted TCP connection with (reader, writer).
        RSocketServer(TransportTCP(*connection), handler_factory=Handler)

    server = await asyncio.start_server(session, 'localhost', server_port)

    async with server:
        await server.serve_forever()


if __name__ == '__main__':
    # Convert the command-line port to int so both code paths yield the same type.
    port = int(sys.argv[1]) if len(sys.argv) > 1 else 6565
    logging.basicConfig(level=logging.DEBUG)
    asyncio.run(run_server(port))
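The handler's payload transformation (decode the pattern, format the time, encode the result) can be checked without starting a server. The sketch below extracts that step into a plain function. The name format_date_time and its explicit `now` parameter are assumptions made for illustration and are not part of rsocket-py; passing the time in makes the result deterministic.

```python
from datetime import datetime


def format_date_time(format_bytes: bytes, now: datetime) -> bytes:
    # Decode the client's strftime pattern, apply it, and re-encode the result.
    date_time_format = format_bytes.decode('utf-8')
    return now.strftime(date_time_format).encode('utf-8')


fixed = datetime(2024, 1, 2, 3, 4, 5)
response = format_date_time(b'%Y-%m-%d %H:%M:%S', fixed)
print(response)  # b'2024-01-02 03:04:05'
```

The bytes passed in are what the client example sends as its payload, and the bytes returned correspond to what the client would see in result.data.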

More Examples

Browse the following repositories for more rsocket-py examples: