rsocket-js

rsocket-js implements the 1.0 version of the RSocket protocol and is designed for use in Node.js and browsers.

Packages

The following packages are published to npm:

Status

The following are currently implemented:

  • RSocketClient / RSocketServer
  • Node.js TCP/WebSocket server/client transport
  • Browser WebSocket client (binary)
  • TCK client for spec compliance testing
  • UTF-8 and Binary encoding at the transport layer
  • Optional JSON (de)serialization at the rsocket layer (send and receive objects instead of strings)
  • ReactiveStream data types
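
To make the optional JSON (de)serialization item concrete, here is a minimal, self-contained sketch of the idea: objects are turned into text before they reach the transport and turned back into objects on receipt. The `jsonSerializer` object and the payload contents are hypothetical and written only for illustration; they are not taken from rsocket-core, whose actual configuration option should be checked in its own documentation.

```javascript
// Hypothetical serializer shape for illustration; not copied from rsocket-core.
const jsonSerializer = {
  serialize: (value) => (value == null ? null : JSON.stringify(value)),
  deserialize: (text) => (text == null ? null : JSON.parse(text)),
};

// What the application hands to the rsocket layer...
const outgoing = { data: { question: 'time?' }, metadata: { route: 'clock' } };

// ...and what would travel over the transport as UTF-8 text.
const onTheWire = {
  data: jsonSerializer.serialize(outgoing.data),
  metadata: jsonSerializer.serialize(outgoing.metadata),
};

// The receiving side reverses the step and gets objects back.
const incoming = {
  data: jsonSerializer.deserialize(onTheWire.data),
  metadata: jsonSerializer.deserialize(onTheWire.metadata),
};

console.log(onTheWire.data); // {"question":"time?"}
console.log(incoming.data.question); // time?
```

Without such a step, the application would have to stringify and parse every payload itself.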

Reactive Streams

rsocket-js includes an implementation of the Reactive Streams API in JavaScript. Note that unlike standard Rx Observables, Reactive Streams are lazy, pull-based, and support back-pressure. Two types are implemented:

  • Flowable: An implementation of the Reactive Streams Publisher type, providing a demand-driven stream of values over time.
  • Single: Like Flowable, but resolves to a single value.

rsocket-js public API methods typically return values of these types.
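
The lazy, pull-based behaviour described above can be sketched without any library. The following is an illustrative stand-in for a publisher, not the rsocket-flowable implementation: `rangePublisher` is a hypothetical helper, and the subscriber method names (`onSubscribe`, `onNext`, `onComplete`, `onError`) follow the Reactive Streams convention. Nothing is emitted until the subscriber calls `request(n)`, and never more than it asked for.

```javascript
// Illustrative stand-in for a Reactive Streams publisher; NOT the
// rsocket-flowable implementation. `rangePublisher` is a hypothetical helper.
function rangePublisher(start, count) {
  return {
    subscribe(subscriber) {
      let next = start;
      const end = start + count;
      let cancelled = false;
      let done = false;
      subscriber.onSubscribe({
        // Emit at most `n` more values: the subscriber controls the pace.
        request(n) {
          while (!cancelled && !done && n > 0 && next < end) {
            subscriber.onNext(next++);
            n--;
          }
          if (!cancelled && !done && next === end) {
            done = true;
            subscriber.onComplete();
          }
        },
        cancel() {
          cancelled = true;
        },
      });
    },
  };
}

const received = [];
let completed = false;
let subscription = null;

const publisher = rangePublisher(1, 5); // Lazy: nothing is emitted yet.

publisher.subscribe({
  onSubscribe(sub) {
    subscription = sub;
    sub.request(2); // Pull the first two values only.
  },
  onNext(value) {
    received.push(value);
  },
  onComplete() {
    completed = true;
  },
  onError(error) {
    console.error(error);
  },
});

console.log(received); // [ 1, 2 ]
subscription.request(3); // Signal demand for the remaining values.
console.log(received, completed); // [ 1, 2, 3, 4, 5 ] true
```

A push-based Observable would have delivered all five values as soon as it was subscribed to; here the remaining three arrive only after the second `request` call.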

WebSocket Client & Server example

Client Example

The client sends a request/response message to the server every 750 ms, and exits once 5 seconds have elapsed.

// rsocket-client.js
const { RSocketClient } = require('rsocket-core');
const RSocketWebsocketClient = require('rsocket-websocket-client').default;
const WebSocket = require('ws');

function now() {
  return new Date().getTime();
}

async function connect(options) {
  const transportOptions = {
    url: 'ws://127.0.0.1:9898',
    // Node.js has no built-in WebSocket here, so supply one from `ws`.
    wsCreator: (url) => {
      return new WebSocket(url);
    },
  };
  const setup = {
    keepAlive: 1000000, // ms between keepalive frames sent to the server
    lifetime: 100000, // ms without a keepalive response before giving up
    dataMimeType: 'text/plain',
    metadataMimeType: 'text/plain',
  };
  const transport = new RSocketWebsocketClient(transportOptions);
  const client = new RSocketClient({ setup, transport });
  return await client.connect();
}

async function run() {
  // Connect outside the Promise executor so a failed connection rejects run().
  const rsocket = await connect();
  const start = now();
  return new Promise((resolve) => {
    const interval = setInterval(() => {
      rsocket.requestResponse({ data: 'What is the current time?' }).subscribe({
        onComplete: (response) => {
          console.log(response);
        },
        onError: (error) => {
          console.error(error);
        },
        onSubscribe: (cancel) => {
          /* call cancel() to stop onComplete/onError */
        },
      });

      // Stop polling once five seconds have passed.
      if (now() - start >= 5000) {
        clearInterval(interval);
        resolve();
      }
    }, 750);
  });
}

Promise.resolve(run()).then(
  () => process.exit(0),
  (error) => {
    console.error(error.stack);
    process.exit(1);
  }
);
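
The callback-style `subscribe` call in the client can also be wrapped in a Promise, which makes individual requests usable with `await` and `try`/`catch`. The sketch below is an assumption-laden illustration: `toPromise` is a hypothetical helper, and `fakeRequestResponse` is a stand-in for `rsocket.requestResponse(...)` so that the snippet runs without a server. It relies only on the `subscribe({ onComplete, onError, onSubscribe })` shape used in the client example.

```javascript
// Hypothetical helper: adapt a Single-like object (anything exposing
// subscribe({ onComplete, onError, onSubscribe })) to a Promise.
function toPromise(single) {
  return new Promise((resolve, reject) => {
    single.subscribe({
      onComplete: resolve,
      onError: reject,
      onSubscribe: () => {},
    });
  });
}

// Stand-in for rsocket.requestResponse(...), so the sketch runs without a server.
function fakeRequestResponse(payload) {
  return {
    subscribe(subscriber) {
      subscriber.onSubscribe(() => {});
      subscriber.onComplete({ data: `echo: ${payload.data}`, metadata: null });
    },
  };
}

const responsePromise = toPromise(
  fakeRequestResponse({ data: 'What is the current time?' })
);

responsePromise.then((response) => console.log(response.data));
```

The trade-off is that the Promise discards the `cancel` function passed to `onSubscribe`, so a request wrapped this way can no longer be cancelled by the caller.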

Server Example

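The server responds to each request/response message with the current time, after a simulated 100 ms delay. If the client cancels before the delay elapses, no response is sent.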

// rsocket-server.js
const { RSocketServer } = require('rsocket-core');
const RSocketWebSocketServer = require('rsocket-websocket-server');
const { Single } = require('rsocket-flowable');

const WebSocketTransport = RSocketWebSocketServer.default;
const host = '127.0.0.1';
const port = 9898;

const transportOpts = {
  host: host,
  port: port,
};

const transport = new WebSocketTransport(transportOpts);

const statuses = {
  PENDING: 'pending',
  CANCELLED: 'cancelled',
};

const getRequestHandler = (requestingRSocket, setupPayload) => {
  function handleRequestResponse(payload) {
    let status = statuses.PENDING;

    console.log(`requestResponse request`, payload);

    return new Single((subscriber) => {
      function handleCancellation() {
        status = statuses.CANCELLED;
      }

      subscriber.onSubscribe(() => handleCancellation());

      /**
       * Leverage `setTimeout` to simulate a delay
       * in responding to the client.
       */
      setTimeout(() => {
        if (status === statuses.CANCELLED) {
          return;
        }

        const msg = `${new Date()}`;
        console.log(`requestResponse response`, msg);
        try {
          subscriber.onComplete({
            data: msg,
            metadata: null, // or new Buffer(...)
          });
        } catch (e) {
          subscriber.onError(e);
        }
      }, 100);
    });
  }

  return {
    requestResponse: handleRequestResponse,
  };
};

const rSocketServer = new RSocketServer({
  transport,
  getRequestHandler,
});

console.log(`Server starting on port ${port}...`);

rSocketServer.start();
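
The cancellation check in the server handler can be exercised in isolation. The following sketch mirrors that pattern with plain objects rather than the real `Single` type, so it runs without rsocket-js installed. `delayedReply`, `schedule`, `flush`, and `collect` are all hypothetical names, and the manual scheduler replaces `setTimeout` only to keep the demonstration deterministic.

```javascript
// Hypothetical stand-in for a Single: `subscribe` hands the subscriber a
// cancel function, then completes later unless cancel was called first.
function delayedReply(schedule, makeReply) {
  return {
    subscribe(subscriber) {
      let cancelled = false;
      subscriber.onSubscribe(() => {
        cancelled = true;
      });
      schedule(() => {
        if (cancelled) {
          return; // The requester gave up; send nothing.
        }
        subscriber.onComplete({ data: makeReply(), metadata: null });
      });
    },
  };
}

// Manual scheduler so the demo is deterministic; real code would use setTimeout.
const pending = [];
const schedule = (task) => pending.push(task);
const flush = () => {
  while (pending.length > 0) {
    pending.shift()();
  }
};

const delivered = [];
const collect = (label) => ({
  onSubscribe(cancel) {
    if (label === 'cancelled') {
      cancel(); // Cancel immediately, before the reply is produced.
    }
  },
  onComplete(payload) {
    delivered.push(`${label}: ${payload.data}`);
  },
  onError(error) {
    console.error(error);
  },
});

delayedReply(schedule, () => 'tick').subscribe(collect('kept'));
delayedReply(schedule, () => 'tick').subscribe(collect('cancelled'));
flush();

console.log(delivered); // [ 'kept: tick' ]
```

Only the subscriber that did not cancel receives a payload, which is the same outcome the `status === statuses.CANCELLED` guard produces in the server example.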

More Examples

Browse the following repositories for more rsocket-js examples: