Async композиция и коллекторы

Способы соединять асинхронные шаги: compose/pipe для последовательной композиции, collector для сбора результатов из множества источников с разными контрактами.

Что это / Зачем

Promise.all хорош, когда все источники — Promise. На практике приходят коллбэки, события, потоки. Нужна абстракция, которая принимает данные ОТ ЛЮБОГО контракта (callback-last, Promise, then-able, EventEmitter), собирает их по ключам/счётчику и выдаёт результат, когда условие выполнено.

Compose/pipe — синхронная и асинхронная композиция: цепочка функций, где выход одной = вход следующей.

API / Синтаксис

// Pipe — слева направо, последовательно, async
const pipe = (...fns) => (x) =>
  fns.reduce((acc, fn) => acc.then(fn), Promise.resolve(x));

const loadUser = pipe(
  parseId,        // (req) => id
  fetchUserById,  // async (id) => user
  enrichWithPosts // async (user) => userWithPosts
);

// Compose — справа налево (математическая нотация)
const compose = (...fns) => pipe(...fns.reverse());

// Параллельная композиция (применить независимые функции к одному входу)
const parallel = (...fns) => async (x) =>
  Promise.all(fns.map((fn) => fn(x)));

// Коллектор по ключам
function collectKeys(keys) {
  const data = {};
  let pending = keys.length;
  return new Promise((resolve, reject) => {
    const collector = {};
    for (const key of keys) {
      collector[key] = (err, value) => {
        if (err) return reject(err);
        data[key] = value;
        if (--pending === 0) resolve(data);
      };
    }
    setTimeout( => reject(new Error('Collect timeout')), 10000);
    return collector;
  });
}

// Использование (callback-last источники):
const c = collectKeys(['user', 'config']);
fs.readFile('config.json', 'utf8', c.config);
fetchUser(1, c.user);
const { user, config } = await c.promise;

// Branch — условный шаг в pipe (вместо if внутри функции)
const branch = (predicate, ifTrue, ifFalse) => async (x) =>
  (await predicate(x)) ? ifTrue(x) : ifFalse(x);

Ключевые моменты

  • pipe через .reduce(.then) — самый простой workable вариант для async.
  • Каждая функция в pipe должна быть унарной — принимать ОДИН аргумент, возвращать ОДНО значение/Promise.
  • Pipe эскалирует ошибку наверх — внутри pipe нельзя обработать ошибку шага, нужны декораторы или branch.
  • await сохраняет stack trace — важное преимущество async-pipe над callback-pipe.
  • Коллекторы покрывают 90% случаев "собрать данные из разных источников" — там, где Promise.all не подходит из-за разных контрактов.
  • distinct (запрет перезаписи) в коллекторе работает как Promise.race для именованных слотов.
  • В Metarhia коллекторы — отдельная библиотека (metasync).

Подводные камни

  • Async + ФП почти не используют вместе — высокая гранулярность и pipe заметно усложняют отладку.
  • SRP применим и к pipe — каждая функция должна делать одно. Иначе цепочка превращается в магию.
  • Деструктуризация массивов создаёт итератор (вызывает Symbol.iterator) — может быть дороже, чем кажется.
  • Pipe не обрабатывает ошибки — нужны wrapAsync, async wrappers и явные branch.

🎓 Источники

  • 🎓 [Асинхронная композиция в JavaScript compose, pipe] · 2025-01-24 · YouTube
    • Тезисы:
      • Pipe удобнее compose: левое-направо чтение.
      • Pipe через reduce — лаконично и работает с async.
      • branch — мейнстрим в ФП для условий внутри pipe (Ramda, lodash/fp).
      • wrapAsync приводит разные контракты к единому асинхронному.
      • SRP подходит и для ФП — функции в pipe должны быть атомарны.
    • Цитата:

      «Async + ФП почти не используют вместе, потому что гранулярность повышает сложность кода до уровня, не оправданного выгодой.»

  • 🎓 [Асинхронные коллекторы данных] · 2018-12-18 · YouTube
    • Тезисы:
      • 90% случаев асинхронности — простой коллект из разных контрактов.
      • Коллектор принимает callback-last error-first, Promise, события — единый API на выходе.
      • collect(count) по числу, collect(['key1','key2']) по именам.
      • distinct: true запрещает перезапись (как race для слотов).
      • take(...args) — сахар над collect для одиночных значений.
      • Коллектор сам thenable — можно await collector.
    • Цитата:

      «Коллектор как абстракция — единственный, у кого нет проблемы разных контрактов: всё, что хочет ответить, идёт через единый интерфейс.»

  • 🎓 [Архив 2018 - Часть 14 Коллекторы данных, композиция async-функций] · 2020-01-12 · YouTube
  • 🎓 [Асинхронная композиция функций на JavaScript] · 2019-05-02 · YouTube

См. также