Stream: async iterators

С Node 10+ Readable — это AsyncIterable. Можно читать поток через for await вместо подписки на data/end/error. Контракт проще, ошибки ловятся обычным try/catch.

Что это

В Node 9–10 у readable stream появился Symbol.asyncIterator. Под капотом он реализован через те же onReadable и read, но снаружи это выглядит как async iterable. Async iterator — это pull-based стрим: код сам запрашивает следующее значение через next, который возвращает Promise.

const fs = require('node:fs');

async function readByLines(path) {
  const stream = fs.createReadStream(path, { encoding: 'utf8' });
  for await (const chunk of stream) {
    console.log(chunk.length);
  }
}

Обработка ошибок

Обычный try/catch вокруг for await ловит ошибки чтения — это сильно проще, чем подписываться на 'error'.

try {
  for await (const chunk of stream) {
    process(chunk);
  }
} catch (err) {
  // ошибка из самого стрима или из process(chunk)
}

Свой async iterable

async function* lineReader(stream) {
  let buf = '';
  for await (const chunk of stream) {
    buf += chunk;
    const lines = buf.split('\n');
    buf = lines.pop(); // последняя — неполная
    for (const line of lines) yield line;
  }
  if (buf) yield buf;
}

for await (const line of lineReader(fs.createReadStream('big.csv'))) {
  // обработать строку
}

Async iterator vs Stream

Async iterator Stream
Модель pull-based push + pull
API for await, next events: data, end, error
Композиция вложенные генераторы pipe/pipeline
Backpressure автоматически (await) через highWaterMark
Ошибки try/catch on('error')

Async iterator проще, но у стримов есть push-based магия (writable, transform, pipe-композиция), которую только iterator-ом не покрыть. Если бы async/await и async iterators существовали в начале Node — стримов могло и не быть.

Подводные камни

  • for await поверх стрима потребляет его: после цикла второй раз не прочитаешь
  • break/return из цикла автоматически закрывает iterator → стрим закрывается. Это хорошо, но в pipe-цепочках стоит учитывать
  • Двойная подписка: если поверх стрима повесить и on('data'), и for await — поведение неопределённое

🎓 Источник: HTTP Server in Node.js — req, res, sockets, and streams

  • 📅 2020-01-16 · YouTube · PDR5hcV4a_0
  • Тезисы:
    • С Node 9–10 readable stream имеет Symbol.asyncIterator — реализован просто через onReadable + read
    • for await — синтаксический сахар над while + await next итератора
    • Async iterator = pull-based стрим. Часть людей считает, что появись async iterators раньше — стримы не понадобились бы
    • try/catch вокруг for await ловит ошибки чтения (намного проще on('error'))
  • Цитата:

    «Реализовать в них метод Symbol.asyncIterator, который под капотом просто использовал бы onReadable и read, было несложно. Как только в V8 появилась поддержка асинхронных итераторов, это моментально сделали.»

🎓 Источник: Архив 2018 — Часть 8 Типизированные массивы, буферы, итераторы, генераторы

  • 📅 2020-01-06 · YouTube · bFT7VGFfP7o
  • Тезисы:
    • Async iterator: next возвращает Promise, разрешающийся в { done, value }
    • Генератор → итератор: function* с yield, вызов даёт итерируемый объект, каждый next двигает до следующего yield
    • yield* делегирует вложенному генератору — эквивалент for...of + yield
    • Кооперативная многозадачность: yield = точка передачи управления
    • В памяти всегда только одно текущее значение — ленивые вычисления экономят память (но не дёргайте next вручную — риск утечки)
    • Async/await через генераторы: Babel компилирует async/await в генераторы
  • Цитата:

    «У нас ленивые вычисления, поэтому у нас все время в памяти существует только одно текущее значение.»

См. также