RxJS и потоки событий

RxJS — реактивный синтаксис для асинхронности. Observable + операторы pipe (map/filter/throttle/take) образуют поток. Каждый оператор создаёт новый stream. Это не быстрее async/await, просто другой способ описывать асинхронность.

Когда Rx уместен

У вас есть игра, и там есть куча персонажей, и каждый персонаж присылает нам ивенты — события взаимодействуют между собой.

Применения: игры со взаимодействующими сущностями, IoT (датчики + правила), системы автоматизации производства, UI с потоками пользовательских событий.

Rx ≠ серебряная пуля

Вся асинхронность на Rx будет точно такой же, как если вы её напишете на async-await, на промисах, но будет другой синтаксис.

Rx позволяет нам писать асинхронность в реактивном стиле, а async-await — в императивном.

async/await Rx
Последовательность действий как алгоритм Мышление потоками и стримами
Императивный взгляд Реактивный взгляд

Observable: open constructor

const { Observable } = require('rxjs');

const source = new Observable((subscriber) => {
  setInterval(() => {
    subscriber.next(randomLetter);
  }, 100);
});

source.subscribe((x) => console.log(x));

Используется паттерн открытый конструктор. Мы прямо в конструктор Observable передаём функцию, которая что-то делает. Observable отдаёт ей объект subscriber.

pipe порождает новые стримы

const destination = source.pipe(
  filter((x) => isConsonant(x)),
  map((x) => x.toUpperCase()),
);

Каждый раз, когда мы в pipe отправляем новый оператор, создаётся новый поток событий. После каждого оператора — новый stream.

Операторы — паттерн Command

Это экземпляр map-оператора. Это практически паттерн команда, внутри которого лежит наша функция, через которую нужно мапить.

Оператор хранит переданную функцию. Объект-команда. Один оператор может ветвить (1→N) или схлопывать (N→1) события.

from: коллекция → stream

import { from } from 'rxjs';
import { filter, map, max } from 'rxjs/operators';

from([1, 2, 3, 4, 5, 6]).pipe(
  filter((x) => x % 2 === 0),
  map((x) => x * 2),
  max,
).subscribe(console.log);

stdin как Observable

process.stdin.setRawMode(true);
const keyboard = new Observable((subscriber) => {
  process.stdin.on('data', (data) => subscriber.next(data));
});

const cursors = keyboard.pipe(
  filter((buf) => buf[0] === 27 && buf[1] === 91),
  map((buf) => buf[2]),
  map((code) => arrows[code]),
  throttleTime(1000),
);

Escape-последовательность (27, 91, код) — это нажатие стрелки. throttleTime ограничивает частоту в играх против быстрого нажатия.

take(5) + reduce

keyboard.pipe(
  take(5),
  reduce((acc, buf) => acc + buf.toString(), ''),
).subscribe(console.log); // соберёт 5 нажатий в строку

Версионная привязка

Я ставлю rxjs из npm, версия 6.5.1. Контракт между версиями может отличаться. В репозиторий кладётся package.lock.

Между мажорными версиями (5→6→7) API ломался — pipe, переход на pipeable operators. Закрепляй версию.

Связанные темы

Ресурсы