Client Introduction

An rsocket-py client can communicate with any RSocket server that implements the same protocol version and the same transport as the client.

Available network transports for rsocket-py client include:

  • TCP - available by default
  • WebSocket (aiohttp)

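The RSocketClient class should be passed a transport provider that supplies an instance of one of the available transports. The quick start example wraps a TransportTCP instance with single_transport_provider for this purpose.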

To create an RSocket client, install the rsocket package and at least one transport implementation (TCP is available by default).

Client Quick Start Example

import asyncio
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP
from rsocket.helpers import single_transport_provider


async def main():
    connection = await asyncio.open_connection('localhost', 6565)

    async with RSocketClient(single_transport_provider(TransportTCP(*connection))) as client:
        ...  # Execute requests

if __name__ == '__main__':
    asyncio.run(main())

Client API

The rsocket-py package exposes the following types:

RSocketClient (class)

RSocketClient is used to create an instance of a client. The client's connection is not initialized until the connect method is invoked or the client is used as a context manager. It is a subclass of RSocket.

Constructor properties

transport (property)

This will typically be an instance conforming to the API of the Transport class.

connect() (method)

This method opens the connection to the peer. Internally this calls connect() on the transport client. See below for the RSocket interface.

RSocket (class)

This interface represents an RSocket peer-to-peer connection and provides the five interactions described below (fire-and-forget, request/response, request/stream, request/channel, and metadata push):

fire_and_forget() (method)

This method sends data/metadata to the server without waiting for a response. The data is sent immediately.

from rsocket.payload import Payload

def fire_and_forget(self, payload: Payload):
    ...

request_response() (method)

This method sends data/metadata to the server, which returns a single response. The data is sent lazily when the returned Future is resolved.

from rsocket.payload import Payload
from asyncio import Future

def request_response(self, payload: Payload) -> Future:
    ...
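
Because the return value is an asyncio Future, a caller would typically await it, optionally with a timeout. The following is a minimal sketch that uses only the standard library. fake_request_response is a hypothetical stand-in for client.request_response(...) so that the snippet runs without a server; it is not part of rsocket-py.

```python
import asyncio


def fake_request_response(data: bytes) -> asyncio.Future:
    """Hypothetical stand-in for client.request_response(); not part of rsocket-py."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def answer() -> None:
        # Only complete the future if the caller has not cancelled it meanwhile.
        if not future.done():
            future.set_result(b'echo: ' + data)

    # Simulate the peer answering shortly after the request is made.
    loop.call_later(0.01, answer)
    return future


async def ask(data: bytes, timeout: float = 1.0) -> bytes:
    # Awaiting the Future yields the single response; wait_for bounds the wait.
    return await asyncio.wait_for(fake_request_response(data), timeout)


def run_demo() -> bytes:
    return asyncio.run(ask(b'ping'))
```

With a real client, the stand-in would be replaced by a call on the connected RSocketClient instance. The awaiting and timeout handling stay the same.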

request_stream() (method)

This method sends data/metadata to the server, which returns a stream of responses. The semantics of the stream are application-specific. For example, the stream may represent updates to a single conceptual value over time, items in an incrementally loaded list, events, etc. The data is sent to the peer lazily when the returned Publisher is subscribed to and request(n) is called to signal demand.

from typing import Union

from reactivestreams.publisher import Publisher
from rsocket.payload import Payload
from rsocket.streams.backpressureapi import BackpressureApi

def request_stream(self, payload: Payload) -> Union[BackpressureApi, Publisher]:
    ...
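
The lazy, demand-driven behaviour described above can be illustrated with a small in-memory sketch. All classes here (ListPublisher, ListSubscription, CollectingSubscriber) are hypothetical and are not the rsocket-py or reactivestreams implementations. They only borrow the Reactive Streams method names to show that nothing is emitted on subscribe and that each request(n) releases at most n items. As a simplification, completion is signalled only when a request finds the source exhausted.

```python
from typing import Iterable, List, Optional


class ListSubscription:
    """Hypothetical subscription; emits items only when demand is signalled."""

    def __init__(self, items: Iterable[bytes], subscriber: "CollectingSubscriber"):
        self._items = iter(items)
        self._subscriber = subscriber
        self._done = False

    def request(self, n: int) -> None:
        # Emit at most n items, then stop until more demand arrives.
        for _ in range(n):
            if self._done:
                return
            try:
                item = next(self._items)
            except StopIteration:
                self._done = True
                self._subscriber.on_complete()
                return
            self._subscriber.on_next(item)


class ListPublisher:
    """Hypothetical publisher backed by a list."""

    def __init__(self, items: Iterable[bytes]):
        self._items = list(items)

    def subscribe(self, subscriber: "CollectingSubscriber") -> None:
        # Nothing is emitted here; the subscriber only gets a handle for signalling demand.
        subscriber.on_subscribe(ListSubscription(self._items, subscriber))


class CollectingSubscriber:
    """Hypothetical subscriber that records what it receives."""

    def __init__(self) -> None:
        self.received: List[bytes] = []
        self.completed = False
        self.subscription: Optional[ListSubscription] = None

    def on_subscribe(self, subscription: ListSubscription) -> None:
        self.subscription = subscription

    def on_next(self, value: bytes) -> None:
        self.received.append(value)

    def on_complete(self) -> None:
        self.completed = True


subscriber = CollectingSubscriber()
ListPublisher([b'a', b'b', b'c']).subscribe(subscriber)
after_subscribe = list(subscriber.received)  # no demand yet, so nothing received

subscriber.subscription.request(2)
after_first_request = list(subscriber.received)  # two items released

subscriber.subscription.request(2)  # releases the last item, then completes
```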

request_channel() (method)

This method establishes a bidirectional exchange in which the client and the server each send a stream of data to the other and receive one in return. Each side signals that it is ready to receive data by invoking request(n), where n is the maximum number of payloads it is prepared to handle. Conceptually, request_channel can be thought of as two entities 'polling' each other, each signaling to the other that it is ready to accept n messages. Conversely, request_channel can sustain a continuous stream of payloads between client and server if each side (or either side) invokes request(0x7fffffff), where 0x7fffffff is the maximum value of a signed 32-bit integer.

from typing import Union, Optional

from reactivestreams.publisher import Publisher
from rsocket.payload import Payload
from rsocket.streams.backpressureapi import BackpressureApi

def request_channel(self, payload: Payload, publisher: Optional[Publisher] = None) -> Union[BackpressureApi, Publisher]:
    ...
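
The two usage styles of request(n) mentioned above, small batches versus effectively unbounded demand, can be sketched with a simple counter. DemandCounter and MAX_REQUEST_N are hypothetical names introduced for illustration and are not part of rsocket-py. Saturating the counter at the int32 maximum is an assumption of this sketch, not a statement about how the library tracks demand.

```python
MAX_REQUEST_N = 0x7FFFFFFF  # largest signed 32-bit integer, 2**31 - 1


class DemandCounter:
    """Hypothetical helper tracking how many payloads the peer has allowed us to send."""

    def __init__(self) -> None:
        self.outstanding = 0

    def request(self, n: int) -> None:
        if n <= 0:
            raise ValueError('n must be positive')
        # Accumulate demand, saturating at the int32 maximum (an assumption of this sketch).
        self.outstanding = min(self.outstanding + n, MAX_REQUEST_N)

    def try_send(self) -> bool:
        # A payload may be sent only while demand remains.
        if self.outstanding == 0:
            return False
        self.outstanding -= 1
        return True


# "Polling" style: the peer asks for two payloads, so a third send is refused.
polling = DemandCounter()
polling.request(2)
sent = [polling.try_send() for _ in range(3)]

# Streaming style: the peer signals the maximum demand once.
streaming = DemandCounter()
streaming.request(MAX_REQUEST_N)
```

In the polling style the receiver must call request(n) again before more payloads can flow. In the streaming style a single call leaves enough credit that the sender is practically never blocked.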

metadata_push() (method)

This method sends only metadata to the server, without waiting for a response. The payload is sent immediately. This method is not intended for direct application use; it should be used to exchange service-level information.

def metadata_push(self, metadata: bytes):
    ...