Stream: async iterators
С Node 10+
Readable— этоAsyncIterable. Можно читать поток черезfor awaitвместо подписки наdata/end/error. Контракт проще, ошибки ловятся обычнымtry/catch.
Что это
В Node 9–10 у readable stream появился Symbol.asyncIterator. Под капотом он реализован через те же onReadable и read, но снаружи это выглядит как async iterable. Async iterator — это pull-based стрим: код сам запрашивает следующее значение через next, который возвращает Promise.
const fs = require('node:fs');
async function readByLines(path) {
const stream = fs.createReadStream(path, { encoding: 'utf8' });
for await (const chunk of stream) {
console.log(chunk.length);
}
}
Обработка ошибок
Обычный try/catch вокруг for await ловит ошибки чтения — это сильно проще, чем подписываться на 'error'.
try {
for await (const chunk of stream) {
process(chunk);
}
} catch (err) {
// ошибка из самого стрима или из process(chunk)
}
Свой async iterable
async function* lineReader(stream) {
let buf = '';
for await (const chunk of stream) {
buf += chunk;
const lines = buf.split('\n');
buf = lines.pop(); // последняя — неполная
for (const line of lines) yield line;
}
if (buf) yield buf;
}
for await (const line of lineReader(fs.createReadStream('big.csv'))) {
// обработать строку
}
Async iterator vs Stream
| Async iterator | Stream | |
|---|---|---|
| Модель | pull-based | push + pull |
| API | for await, next |
events: data, end, error |
| Композиция | вложенные генераторы | pipe/pipeline |
| Backpressure | автоматически (await) | через highWaterMark |
| Ошибки | try/catch |
on('error') |
Async iterator проще, но у стримов есть push-based магия (writable, transform, pipe-композиция), которую только iterator-ом не покрыть. Если бы async/await и async iterators существовали в начале Node — стримов могло и не быть.
Подводные камни
for awaitповерх стрима потребляет его: после цикла второй раз не прочитаешьbreak/returnиз цикла автоматически закрывает iterator → стрим закрывается. Это хорошо, но в pipe-цепочках стоит учитывать- Двойная подписка: если поверх стрима повесить и
on('data'), иfor await— поведение неопределённое
🎓 Источник: HTTP Server in Node.js — req, res, sockets, and streams
- 📅 2020-01-16 · YouTube ·
PDR5hcV4a_0 - Тезисы:
- С Node 9–10 readable stream имеет
Symbol.asyncIterator— реализован просто черезonReadable+read for await— синтаксический сахар над while +await nextитератора- Async iterator = pull-based стрим. Часть людей считает, что появись async iterators раньше — стримы не понадобились бы
try/catchвокругfor awaitловит ошибки чтения (намного прощеon('error'))
- С Node 9–10 readable stream имеет
- Цитата:
«Реализовать в них метод
Symbol.asyncIterator, который под капотом просто использовал быonReadableиread, было несложно. Как только в V8 появилась поддержка асинхронных итераторов, это моментально сделали.»
🎓 Источник: Архив 2018 — Часть 8 Типизированные массивы, буферы, итераторы, генераторы
- 📅 2020-01-06 · YouTube ·
bFT7VGFfP7o - Тезисы:
- Async iterator:
nextвозвращает Promise, разрешающийся в{ done, value } - Генератор → итератор:
function*сyield, вызов даёт итерируемый объект, каждыйnextдвигает до следующегоyield yield*делегирует вложенному генератору — эквивалентfor...of + yield- Кооперативная многозадачность:
yield= точка передачи управления - В памяти всегда только одно текущее значение — ленивые вычисления экономят память (но не дёргайте
nextвручную — риск утечки) - Async/await через генераторы: Babel компилирует
async/awaitв генераторы
- Async iterator:
- Цитата:
«У нас ленивые вычисления, поэтому у нас все время в памяти существует только одно текущее значение.»