Stream API: потоки
Stream — абстракция Node.js для работы с данными по частям (chunks) вместо загрузки всего в память, что критично при работе с большими файлами, сетевыми ответами и преобразованиями данных.
Зачем нужно
Чтение 5 GB файла через fs.readFile загрузит весь файл в память. Stream читает файл по кускам (например, по 64 KB), обрабатывает и освобождает память. Это позволяет обрабатывать файлы произвольного размера с постоянным потреблением памяти. Все HTTP-запросы/ответы, файловые операции и TCP-сокеты в Node.js — это потоки.
Где используется
- Стриминг файлов (скачивание видео, CSV-экспорт)
- Обработка больших CSV/JSON файлов построчно
- Сжатие на лету (gzip через
zlib.createGzip) - HTTP ответы при стриминге (chunked transfer)
- Pipe между процессами и сокетами
Основной контент
4 типа потоков
Readable — источник данных (fs.createReadStream, http.IncomingMessage)
Writable — приёмник данных (fs.createWriteStream, http.ServerResponse)
Duplex — и чтение и запись (TCP сокет, zlib)
Transform — Duplex с преобразованием данных (zlib.createGzip)
Readable Stream
const fs = require('fs');
// Читать файл частями
const readable = fs.createReadStream('big-file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // размер chunk: 64 KB
});
readable.on('data', (chunk) => {
console.log(`Получено ${chunk.length} символов`);
});
readable.on('end', () => {
console.log('Файл прочитан');
});
readable.on('error', (err) => {
console.error('Ошибка:', err);
});
Writable Stream
const writable = fs.createWriteStream('output.txt');
writable.write('Строка 1\n');
writable.write('Строка 2\n');
writable.end('Последняя строка\n'); // закрыть поток
writable.on('finish', () => console.log('Запись завершена'));
writable.on('error', (err) => console.error(err));
pipe — соединение потоков
// Скопировать файл через pipe
const readable = fs.createReadStream('source.mp4');
const writable = fs.createWriteStream('copy.mp4');
readable.pipe(writable);
writable.on('finish', () => console.log('Копирование завершено'));
// Pipe с gzip сжатием
const zlib = require('zlib');
fs.createReadStream('file.txt')
.pipe(zlib.createGzip)
.pipe(fs.createWriteStream('file.txt.gz'));
stream.pipeline — рекомендуемый способ (с обработкой ошибок)
const { pipeline } = require('stream/promises');
const zlib = require('zlib');
// Автоматически закрывает все потоки при ошибке
await pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip,
fs.createWriteStream('output.txt.gz')
);
console.log('Сжатие завершено');
Transform Stream — преобразование данных
const { Transform } = require('stream');
// Преобразовать каждый chunk (uppercase)
const upperCase = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback;
}
});
fs.createReadStream('input.txt')
.pipe(upperCase)
.pipe(fs.createWriteStream('output.txt'));
Стриминг HTTP-ответа
// Стримить файл клиенту без загрузки в память
app.get('/download/:id', async (req, res) => {
const file = await FileService.getById(req.params.id);
const fileStream = fs.createReadStream(file.path);
res.setHeader('Content-Type', 'application/octet-stream');
res.setHeader('Content-Disposition', `attachment; filename="${file.name}"`);
res.setHeader('Content-Length', file.size);
fileStream.on('error', (err) => {
res.status(500).end();
});
fileStream.pipe(res);
});
Readline — чтение по строкам
const readline = require('readline');
// Читать большой CSV построчно
const rl = readline.createInterface({
input: fs.createReadStream('large.csv'),
crlfDelay: Infinity
});
rl.on('line', (line) => {
const [id, name, email] = line.split(',');
// Обработать строку
});
rl.on('close', () => console.log('CSV обработан'));
Частые ошибки
- pipe без обработки ошибок — если Readable бросает ошибку,
pipeне закроет Writable автоматически; использоватьstream.pipeline - Backpressure игнорируется — если Writable не успевает,
writeвозвращаетfalse; нужно остановить чтение черезreadable.pause()/resume - Читать весь поток в память —
let data = ''; stream.on('data', d => data += d)нивелирует преимущества стриминга - Не вызывать
callbackв Transform — поток зависнет навсегда
Связанные темы
- _MOC Node.js
- Stream API -- потоки — обзорная вводная по streams
- Stream -- backpressure
- Stream -- async iterators
- Buffer -- бинарные данные
- Streaming Response
- http
Ресурсы
🎓 Источник: Архив 2018 — Часть 17 Потоки (Streams) в Node.js
- 📅 2020-01-15 · YouTube ·
3ZRkNvs_SaE - Тезисы:
- Стримы — это ленивая обработка данных: читаются только по требованию, можно обрабатывать ещё до полного появления
- В Node одновременно существует 3 версии API стримов, одни и те же классы переключаются между ними по используемым событиям (
onDatavsonReadable) - 4 вида: Readable, Writable, Duplex (TCP socket), Transform (zlib) — Transform возвращает преобразованные те же данные
- Под капотом был переход push-base → pull-base → снова push-base; сейчас рекомендуется
onData - Для строк лучше
setEncoding('utf8')вместо.toString(): внутри StringDecoder буферизует неполный UTF-8 codepoint - Ручная склейка байтов
data += chunk.toString()ломает многобайтовые UTF-8 символы на границе чанков - JS строки в UTF-16: 4-байтовый codepoint = 2 surrogate;
.lengthсчитает codepoint, а не графемы - В Unicode вообще нет понятия «символ» — есть графемы, графемные кластеры, codepoints
object-mode— стрим оперирует не байтами, а произвольными объектами (создаётся наследованием)
- Цитата:
«За время развития ноды было три разных версии стримов со всеми разными интерфейсами. И они есть одновременно в одном и том же модуле, в одних и тех же классах.»
🎓 Источник: HTTP Server in Node.js — req, res, sockets, and streams
- 📅 2020-01-16 · YouTube ·
PDR5hcV4a_0 - Тезисы:
reqиres— это стримы (наследники сокета), один и тот же duplex- Композиция через pipe тянет данные «с конца»: писатель просит у предыдущего, тот у источника
- Backpressure: у каждого стрима
high watermarkиlow watermarkв конструкторе. Буфер > high → пауза, < low → упреждающее чтение - Для
index.htmlстрим не нужен —fs.readFile+ одинres.end()быстрее. Стрим оправдан при преобразовании на лету (gzip) pipeне закрывает события: всё равно вешатьon('end')/on('close')руками- С Node 9–10 readable stream — это async iterable (Symbol.asyncIterator). Под капотом просто
read+onReadable - Async iterator — это pull-based стрим; стримы остались из-за push-based функциональности
fs.promisesоказался по бенчмаркам не медленнее, иногда быстрее callbacks (бенчмарки James Snell)
- Цитата:
«Только тогда, когда самый конечный стрим будет готов записать данные, а у него этих данных не будет, он пытается прочитать их из предыдущего стрима.»
🎓 Источник: Serving Static in Node.js
- 📅 2019-09-23 · YouTube ·
n_AdKIzbpBc - Тезисы:
createReadStreamоткрывается синхронно, но не нуждается в try/catch — ошибки только черезonerror. Лучшеfs.statзаранее- Для маленькой статики
res.end(buffer)из памяти Map-кэша быстрее всего; большие файлы (mp3, mp4) — толькоpipe, иначе блокируется - Готовые буферы можно хранить уже сжатыми (gzip заранее) — отдавать прямо в сокет без сжатия на лету
- Атака path traversal: после
path.joinпроверять что итоговый путь действительно внутриstatic/ - Своя генерация индекса каталога —
new Readable({ read {...} }), паттерн revealing constructor,push(null)закрывает поток
- Цитата:
«У вас createReadStream в fs.promises нет, это нормально. Не все методы из fs имеют аналоги в promises, потому что некоторые реализуют асинхронность не через промисы, а через стримы.»
🎓 Источник: Node.js разбор кода Readable, Writable, pipe, HTTP 206 Partial content
- 📅 2023-06-14 · YouTube ·
_LcWUPAOCCQ - Тезисы:
- Транспорт принимает
data: Buffer | Readable— полиморфно:instanceof Readable→pipe, иначеres.end() - Большой Buffer (10 МБ+) полезно отдавать чанками с
Content-Length— браузер отличит конец от обрыва и покажет процент загрузки - HTTP 206 Partial Content: формат
Range: bytes=100-200, вариантыbytes=1000-(с offset до конца) иbytes=-1000(хвост, какtail) - Невалидные Range стоит игнорировать молча (либо отвечать HTTP-ошибкой) — толерантность к входу
- Заголовок ответа:
Content-Range: bytes <start>-<end>/<size>,size=*если неизвестен
- Транспорт принимает
- Код (полезный):
// Минимальный parse-range const parseRange = (range) => { if (!range || !range.includes('=')) return null; const [start, end] = range.split('=').pop().split('-').map(s => parseInt(s)); if (Number.isNaN(start) && Number.isNaN(end)) return null; if (Number.isNaN(start)) return { tail: end }; // bytes=-1000 return { start, end: Number.isNaN(end) ? null : end }; };