rsocket-py
caution
The python package API is not stable. There may be changes until version 1.0.0
The python rsocket
package implements the 1.0 version of the RSocket protocol
and is designed for use in python >= 3.8 using asyncio.
#
PackagesA pip package is available when installing with 'pip install rsocket' (rsocket)
#
StatusThe following are currently implemented:
- RSocketClient / RSocketServer
- TCP Transport
- QUIC Transport
- Simple load balancing
- WebSocket transport (Websocket integration with Quart (server) and aiohttp (server/client))
- Minimal integration with RxPy >= 3.x
#
Client & Server example#
Client ExampleThe client sends a request/response
message to the server on an interval, with the requested date-time format, and exits after a certain amount of time has elapsed.
# client.pyimport asyncioimport loggingimport sys
from reactivestreams.subscriber import DefaultSubscriberfrom rsocket.helpers import single_transport_providerfrom rsocket.payload import Payloadfrom rsocket.rsocket_client import RSocketClientfrom rsocket.transports.tcp import TransportTCP
class StreamSubscriber(DefaultSubscriber):
def on_next(self, value, is_complete=False): logging.info('RS: {}'.format(value)) self.subscription.request(1)
async def main(server_port): logging.info('Connecting to server at localhost:%s', server_port)
connection = await asyncio.open_connection('localhost', server_port)
async with RSocketClient(single_transport_provider(TransportTCP(*connection))) as client: payload = Payload(b'%Y-%m-%d %H:%M:%S')
async def run_request_response(): try: while True: result = await client.request_response(payload) logging.info('Response: {}'.format(result.data)) await asyncio.sleep(1) except asyncio.CancelledError: pass
task = asyncio.create_task(run_request_response())
await asyncio.sleep(5) task.cancel() await task
if __name__ == '__main__': port = sys.argv[1] if len(sys.argv) > 1 else 6565 logging.basicConfig(level=logging.DEBUG) asyncio.run(main(port))
#
Server ExampleThe server responds to request/response
messages with the current formatted date-time.
# server.pyimport asyncioimport loggingimport sysfrom datetime import datetime
from rsocket.helpers import create_futurefrom rsocket.payload import Payloadfrom rsocket.request_handler import BaseRequestHandlerfrom rsocket.rsocket_server import RSocketServerfrom rsocket.transports.tcp import TransportTCP
class Handler(BaseRequestHandler): async def request_response(self, payload: Payload) -> asyncio.Future: await asyncio.sleep(0.1) # Simulate not immediate process date_time_format = payload.data.decode('utf-8') formatted_date_time = datetime.now().strftime(date_time_format) return create_future(Payload(formatted_date_time.encode('utf-8')))
async def run_server(server_port): logging.info('Starting server at localhost:%s', server_port)
def session(*connection): RSocketServer(TransportTCP(*connection), handler_factory=Handler)
server = await asyncio.start_server(session, 'localhost', server_port)
async with server: await server.serve_forever()
if __name__ == '__main__': port = sys.argv[1] if len(sys.argv) > 1 else 6565 logging.basicConfig(level=logging.DEBUG) asyncio.run(run_server(port))
#
More ExamplesBrowse the following repositories for more rsocket-py
examples: