RxJS и потоки событий
RxJS — реактивный синтаксис для асинхронности. Observable + операторы pipe (map/filter/throttle/take) образуют поток. Каждый оператор создаёт новый stream. Это не быстрее async/await, просто другой способ описывать асинхронность.
Когда Rx уместен
У вас есть игра, и там есть куча персонажей, и каждый персонаж присылает нам ивенты — события взаимодействуют между собой.
Применения: игры со взаимодействующими сущностями, IoT (датчики + правила), системы автоматизации производства, UI с потоками пользовательских событий.
Rx ≠ серебряная пуля
Вся асинхронность на Rx будет точно такой же, как если вы её напишете на async-await, на промисах, но будет другой синтаксис.
Rx позволяет нам писать асинхронность в реактивном стиле, а async-await — в императивном.
| async/await | Rx |
|---|---|
| Последовательность действий как алгоритм | Мышление потоками и стримами |
| Императивный взгляд | Реактивный взгляд |
Observable: open constructor
const { Observable } = require('rxjs');
const source = new Observable((subscriber) => {
setInterval(() => {
subscriber.next(randomLetter);
}, 100);
});
source.subscribe((x) => console.log(x));
Используется паттерн открытый конструктор. Мы прямо в конструктор Observable передаём функцию, которая что-то делает. Observable отдаёт ей объект subscriber.
pipe порождает новые стримы
const destination = source.pipe(
filter((x) => isConsonant(x)),
map((x) => x.toUpperCase()),
);
Каждый раз, когда мы в pipe отправляем новый оператор, создаётся новый поток событий. После каждого оператора — новый stream.
Операторы — паттерн Command
Это экземпляр map-оператора. Это практически паттерн команда, внутри которого лежит наша функция, через которую нужно мапить.
Оператор хранит переданную функцию. Объект-команда. Один оператор может ветвить (1→N) или схлопывать (N→1) события.
from: коллекция → stream
import { from } from 'rxjs';
import { filter, map, max } from 'rxjs/operators';
from([1, 2, 3, 4, 5, 6]).pipe(
filter((x) => x % 2 === 0),
map((x) => x * 2),
max,
).subscribe(console.log);
stdin как Observable
process.stdin.setRawMode(true);
const keyboard = new Observable((subscriber) => {
process.stdin.on('data', (data) => subscriber.next(data));
});
const cursors = keyboard.pipe(
filter((buf) => buf[0] === 27 && buf[1] === 91),
map((buf) => buf[2]),
map((code) => arrows[code]),
throttleTime(1000),
);
Escape-последовательность (27, 91, код) — это нажатие стрелки. throttleTime ограничивает частоту в играх против быстрого нажатия.
take(5) + reduce
keyboard.pipe(
take(5),
reduce((acc, buf) => acc + buf.toString(), ''),
).subscribe(console.log); // соберёт 5 нажатий в строку
Версионная привязка
Я ставлю rxjs из npm, версия 6.5.1. Контракт между версиями может отличаться. В репозиторий кладётся package.lock.
Между мажорными версиями (5→6→7) API ломался — pipe, переход на pipeable operators. Закрепляй версию.
Связанные темы
- Observer Observable паттерн
- Реактивное программирование основы
- Reactor Pattern
- Композиция функций (pipe, compose)
- Iterator Pattern
Ресурсы
- Лекция: 0kcpMAl-wfE
- RxJS Docs: https://rxjs.dev/