Support RSocket
Skip to main content

Flowable

The Flowable class implements the ReactiveStream Publisher interface with Rx-style operators.

Example

This example creates a Flowable that publishes the numbers 0, 1, 2, 3 on demand and then completes.

const flowable = new Flowable((subscriber) => {
// lambda is not executed until `subscribe()` is called
const values = [0, 1, 2, 3];
subscriber.onSubscribe({
cancel: () => {
/* no-op */
},
request: (n) => {
while (n--) {
if (values.length) {
const next = values.shift();
// Can't publish values until request() is called
subscriber.onNext(next);
} else {
subscriber.onComplete();
break;
}
}
},
});
});

flowable.subscribe({
onComplete: () => console.log('done'),
onError: (error) => console.error(error),
onNext: (value) => console.log(value),
// Nothing happens until `request(n)` is called
onSubscribe: (sub) => sub.request(4),
});
// logs '0', '1', '2', '3', 'done'

API

constructor (function)

class Flowable<T> {
constructor(source: Source<T>)
}

type Source<T> = (subscriber: Subscriber<T>) => void;

type Subscriber<T> = {
onComplete: () => void,
onError: (error: Error) => void,
onNext: (data: T) => void,
onSubscribe: (subscription: Subscription) => void,
};

type Subscription = {
cancel(): void,
request(n: number): void,
};

subscribe() (method)

This method connects the Flowable (publisher) to a subscriber of values. Subscribing alone does not indicate demand: rather, it connects publisher & subscriber and allows the subscriber to begin expressing demand for values via a Subscription object. Note that PartialSubscriber differs from the above Subscriber only in that methods are optional.

subscribe(subscriber: PartialSubscriber<T>): void

type PartialSubscriber<T> = {
onComplete?: () => void,
onError?: (error: Error) => void,
onNext?: (data: T) => void,
onSubscribe?: (subscription: Subscription) => void,
};

map() (method)

This method applies a transform function to values produced by this Flowable. This is similar to Array.prototype.map, Observable.prototype.map, etc.

map<U>(fn: (data: T) => U): Flowable<U>