Support RSocket
Skip to main content

Simple Quickstart

Client Example

The client sends a request/response message to the server on an interval, with the requested date-time format, and exits after a certain amount of time has elapsed.

# client.py
import asyncio
import logging
import sys

from reactivestreams.subscriber import DefaultSubscriber
from rsocket.helpers import single_transport_provider
from rsocket.payload import Payload
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP

class StreamSubscriber(DefaultSubscriber):

def on_next(self, value, is_complete=False):
logging.info('RS: {}'.format(value))
self.subscription.request(1)

async def main(server_port):
logging.info('Connecting to server at localhost:%s', server_port)

connection = await asyncio.open_connection('localhost', server_port)

async with RSocketClient(single_transport_provider(TransportTCP(*connection))) as client:
payload = Payload(b'%Y-%m-%d %H:%M:%S')

async def run_request_response():
try:
while True:
result = await client.request_response(payload)
logging.info('Response: {}'.format(result.data))
await asyncio.sleep(1)
except asyncio.CancelledError:
pass

task = asyncio.create_task(run_request_response())

await asyncio.sleep(5)
task.cancel()
await task

if __name__ == '__main__':
port = sys.argv[1] if len(sys.argv) > 1 else 6565
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main(port))

Server Example

The server responds to request/response messages with the current formatted date-time.

# server.py
import asyncio
import logging
import sys
from datetime import datetime

from rsocket.helpers import create_future
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler
from rsocket.rsocket_server import RSocketServer
from rsocket.transports.tcp import TransportTCP

class Handler(BaseRequestHandler):
async def request_response(self, payload: Payload) -> asyncio.Future:
await asyncio.sleep(0.1) # Simulate not immediate process
date_time_format = payload.data.decode('utf-8')
formatted_date_time = datetime.now().strftime(date_time_format)
return create_future(Payload(formatted_date_time.encode('utf-8')))

async def run_server(server_port):
logging.info('Starting server at localhost:%s', server_port)

def session(*connection):
RSocketServer(TransportTCP(*connection), handler_factory=Handler)

server = await asyncio.start_server(session, 'localhost', server_port)

async with server:
await server.serve_forever()

if __name__ == '__main__':
port = sys.argv[1] if len(sys.argv) > 1 else 6565
logging.basicConfig(level=logging.DEBUG)
asyncio.run(run_server(port))

More Examples

Browse the following repositories for more rsocket-py examples: