rsocket-js
rsocket-js
implements the 1.0 version of the RSocket protocol
and is designed for use in Node.js and browsers.
#
PackagesThe following packages are published to npm:
- rsocket-core
- rsocket-flowable
- rsocket-tcp-client
- rsocket-tcp-server
- rsocket-websocket-client
- rsocket-websocket-server
#
StatusThe following are currently implemented:
- RSocketClient / RSocketServer
- Node.js TCP/WebSocket server/client transport
- Browser WebSocket client (binary)
- TCK client for spec compliance testing
- UTF-8 and Binary encoding at the transport layer
- Optional JSON (de)serialization at the rsocket layer (send and receive objects instead of strings)
- ReactiveStream data types
#
Reactive Streamsrsocket-js includes an implementation of the Reactive Streams API in JavaScript. Note that unlike standard Rx Observables, Reactive Streams are lazy, pull-based, and support back-pressure. Two types are implemented:
Flowable
: An implementation of the Reactive StreamsPublisher
type, providing a demand-driven stream of values over time.Single
: LikeFlowable
, but resolves to a single value.
rsocket-js public API methods typically return values of these types.
#
WebSocket Client & Server example#
Client ExampleThe client sends a request/response
message to the server on an interval, and exits after a certain amount of time has elapsed.
// rsocket-client.jsconst { RSocketClient } = require('rsocket-core');const RSocketWebsocketClient = require('rsocket-websocket-client').default;const WebSocket = require('ws');
function now() { return new Date().getTime();}
async function connect(options) { const transportOptions = { url: 'ws://127.0.0.1:9898', wsCreator: (url) => { return new WebSocket(url); }, }; const setup = { keepAlive: 1000000, lifetime: 100000, dataMimeType: 'text/plain', metadataMimeType: 'text/plain', }; const transport = new RSocketWebsocketClient(transportOptions); const client = new RSocketClient({ setup, transport }); return await client.connect();}
async function run() { return new Promise(async (resolve, reject) => { const rsocket = await connect(); const start = now(); const interval = setInterval(() => { rsocket.requestResponse({ data: 'What is the current time?' }).subscribe({ onComplete: (response) => { console.log(response); }, onError: (error) => { console.error(error); }, onSubscribe: (cancel) => { /* call cancel() to stop onComplete/onError */ }, });
if (now() - start >= 5000) { clearInterval(interval); resolve(); } }, 750); });}
Promise.resolve(run()).then( () => process.exit(0), (error) => { console.error(error.stack); process.exit(1); });
#
Server ExampleThe server responds to request/response
messages with the current time.
// rsocket-server.jsconst { RSocketServer } = require('rsocket-core');const RSocketWebSocketServer = require('rsocket-websocket-server');const { Single } = require('rsocket-flowable');
const WebSocketTransport = RSocketWebSocketServer.default;const host = '127.0.0.1';const port = 9898;
const transportOpts = { host: host, port: port,};
const transport = new WebSocketTransport(transportOpts);
const statuses = { PENDING: 'pending', CANCELLED: 'cancelled',};
const getRequestHandler = (requestingRSocket, setupPayload) => { function handleRequestResponse(payload) { let status = statuses.PENDING;
console.log(`requestResponse request`, payload);
return new Single((subscriber) => { function handleCancellation() { status = statuses.CANCELLED; }
subscriber.onSubscribe(() => handleCancellation());
/** * Leverage `setTimeout` to simulate a delay * in responding to the client. */ setTimeout(() => { if (status === statuses.CANCELLED) { return; }
const msg = `${new Date()}`; console.log(`requestResponse response`, msg); try { subscriber.onComplete({ data: msg, metadata: null, // or new Buffer(...) }); } catch (e) { subscriber.onError(e); } }, 100); }); }
return { requestResponse: handleRequestResponse, };};
const rSocketServer = new RSocketServer({ transport, getRequestHandler,});
console.log(`Server starting on port ${port}...`);
rSocketServer.start();
#
More ExamplesBrowse the following repositories for more rsocket-js
examples: