Skip to main content

Reactive Programming

Reactive programming is a declarative programming paradigm that handles asynchronous workflows in data streams. If the idea is fresh to you, @andrestaltz has a great article introducing the concept using Rx.

The reactive paradigm naturally suits a sociable app which listens to events from chat platforms reactively. It's our recommended way to control the business logic flow.

Machinat Stream Package

@machinat/stream implements the reactive programming utilities that are optimized for sociable apps. The most important specialty is: All the events on a chat are processed in a strict order in the stream.

Install

Install the @machinat/stream package with:

npm install @machinat/stream

Usage

Root Stream From App

Use fromApp helper to create a stream from a Machinat app:

import Machinat from '@machinat/core';
import { fromApp } from '@machinat/stream';

const app = Machinat.createApp({...});

const event$ = fromApp(app);

The event$ stream will include all events you would receive at app.onEvent().

tip

The variable name with a trailing $ is a naming convention for a stream in reactive programming.

Listen to a Stream

stream.subscribe(listener, errorHandler?) method listens to events on a stream. The listener function would receive the event context object. Like:

event$.subscribe(async ({ event, reply }) => {
await reply(`Hello ${event.text}`);
});

It can also take a service container as the listener. Like:

import Machinat, { BasicProfiler } from '@machinat/core';

event$.subscribe(
makeContainer({ deps: [BasicProfiler] })(
(profiler) =>
async ({ event, reply }) => {
const profile = await profiler.getUserProfile(event.user);
await reply(<p>Hello {profile?.name || 'there'}!</p>);
}
)
);

Pipe to a New Stream

stream.pipe(...operators) method pipes the stream through a series of operators that output a new stream. Through it we can split the business logic into working flows, which helps us to build the app in a declarative and modular way.

We'll introduce some commonly used operators below.

Filter a Stream

filter(predicator) operator filters the stream like Array.filter.

It takes a predicator function with (eventContext) => boolean type. The event is passed to the next stream only when the predicator returnstrue`.

We can use it to extract a fraction of events so they can be handled separately. For example:

import { filter } from '@machinat/stream/operators';

event$
.pipe(filter((ctx) => ctx.platform === 'webview'))
.subscribe(handleWebview);

event$
.pipe(filter((ctx) => ctx.event.category === 'message'))
.subscribe(handleMessage);

Map a Stream

map(transformer) operator shapes the events in the stream like Array.map.

It takes a transformer function with (eventContext) => newContext type. The new event context will be passed to the next stream.

We can use it to execute a job and attach the result onto the context. For example:

import { makeContainer, IntentRecognizer } from '@machinat/core';
import { map } from '@machinat/stream/operators';

const eventWithIntent$ = event$.pipe(
map(
makeContainer({ deps: [IntentRecognizer] })(
(recognizer) =>
async (context) => {
const { event } = context;
let intent = null;

if (event.type === 'text') {
intent = await recognizer.detectText(event.channel, event.text);
}

return { ...context, intent };
}
)
)
);

Execute a Side Effect

tap(effectFn) operator executes a job and then passes the original context down when it's finished.

The difference from stream.subscribe(operator) is that tap guarantees the execution order is one-by-one under a chat.

import Machinat, { makeContainer, StateController } from '@machinat/core';
import { tap } from '@machinat/stream/operators';

event$.pipe(
tap(
makeContainer({
deps: [StateController],
})(
(stateController) =>
async ({ event, reply }) => {
const count = stateController
.channelState(event.channel)
.update((curCount = 0) => curCount + 1);

await reply(<p>Hello #{count}!</p>);
}
)
)
);

Use Asynchronized Container

The operators can take an asyncronized function as the callback. As stated, all the async jobs under a chat are executed in a strict order.

They also accept a service container of the callback function. As the examples above, you can require services to handle business logic.

Merge Streams

merge(...streams) merge multiple streams into one.

import { merge } from '@machinat/stream';

const greeting$ = merge(webviewGreeting$, chatGreeting$);

Constraints and Road Map

Currently the execution order is only guaranteed on a single server. If you are running the app on a cluster, the events under a chat may go to different servers and break the order.

To fix this, it requires an external broker to distribute the jobs and guarantee the order. But there are some major challenges to overcome:

Persistence

Many of the Rx stream operators like count are stateful. The stream state have be stored in the database, so it can be recovered when the server restarts.

// the status of count have to be persistent
const msgCount$ = message$.pipe(count());

Concurrency

On every process and machine, the streams and operators should work identically. This means the stateful operations should be safe from race conditions.

// every process should see the identical count
const msgCount$ = message$.pipe(count());

Ordering

The events should be processed in the same order as they are received, even when executing asynchronous jobs. This is difficult because events could be sent to different processes.

// the mirrored messages should always be in the same order
message$.pipe(
map(someAsyncWork),
map(async ({ reply, event }) =>
reply(event.text + '!!!')
)
);

Exactly Once

An event should be processed exactly once in the stream, not being omitted or duplicated. If a server is down unexpectedly, the unfinished events should be able to resume.

Road Map

Kafka Stream is a previous art that implements the reactive stream for server-based programs. It's built upon Apache Kafka to provide the guarantees mentioned above.

In the future, we'll make a similar solution in JavaScript which is based on Kafka or other brokers. It'll be optimized for conversational apps, and could possibly support all the server-based programs.