Support RSocket
Skip to main content

RSocketServer - rsocket-js

RSocketServer is the high level abstraction leveraged to create a server running the RSocket protocol.

An RSocketServer server can be used to communicate with any RSocket Client implemented against the same protocol version as the server, and which implements the same transport as the server.

To get started creating an RSocket server, you will first need to install the rsocket-core NPM package, and atleast one transport protocol implementation. See the server portion of WebSocket Client Server Example for an example of an implemented getRequestHandler.

Transports

A transport is the abstraction which handles the underlying network communication portion of the RSocket applicaiton protocol.

Available network transports for RSocketServer include:

getRequestHandler

When creating a RSocketServer instance, the constructor options require a function (getRequestHandler) to be provided that can return an object matching the Responder interface. This object is responsible for mapping callback/handler functions to the various RSocket message types, and returning an appropriate Single or Flowable from rsocket-flowable that will produce/injest data for the request.

const getRequestHandler = (requestingRSocket, setupPayload) => {

function handleFireAndForget() {
...
}

function handleRequestResponse(payload) {
return new Single((subscriber) => {
...
});
}

function handleRequestStream(payload) {
return new Flowable((subscriber) => {
...
});
}

function handleRequestChannel(payload) {
return new Flowable((subscriber) => {
...
});
}

function handleMetadataPush(payload) {
return new Single((subscriber) => {
...
});
}

return {
fireAndForget: handleFireAndForget,
requestResponse: handleRequestResponse,
requestStream: handleRequestStream,
requestChannel: handleRequestChannel,
metadataPush: handleMetadataPush
};
};

const rSocketServer = new RSocketServer({
getRequestHandler,
...
});

Client Cancellation

An important characteristic of RSocket's protocol is the concept of cancellation.

In the context of an RSocket server, once a client connection/request has begun, it is possible that the client which initiated the request may decide it no longer wishes to continue and signal to the server that it wishes to cancel.

In the event that a client signals to an RSocket server that it wishes to cancel a request, the server should avoid calling the onComplete or onNext callbacks for the requests resulting Single or Flowable instances.

Cancellation Example

const statuses = {
PENDING: 'pending',
CANCELLED: 'cancelled',
};

const getRequestHandler = (requestingRSocket, setupPayload) => {
function handleRequestResponse(payload) {
const simulatedDelayMilli = 100;
let status = statuses.PENDING;

return new Single((subscriber) => {
/**
* In the event that the client cancels the request before
* the server can respond, we will change our status to cancelled
* and avoid calling `onComplete` on the `subscriber` instance in the
* `nextTick` callback.
*/
function handleCancellation() {
status = statuses.CANCELLED;
}

subscriber.onSubscribe(() => handleCancellation());

/**
* Leverage `setTimeout` to simulate a delay
* in responding to the client, and thus giving it
* time to decide to "cancel".
*/
setTimeout(() => {
/**
* If the client cancelled the request we can
* return early and avoid doing any of the work below.
*/
if (status === statuses.CANCELLED) {
return;
}

const msg = `${new Date()}`;
try {
subscriber.onComplete({
data: msg,
metadata: null,
});
} catch (e) {
subscriber.onError(e);
}
}, simulatedDelayMilli);
});
}

return {
requestResponse: handleRequestResponse,
};
};