Node.js streams и pipe
Streams Node.js — Readable, Writable, Duplex, Transform — это потоковая обработка данных кусками. Метод
.pipeсклеивает stream-ы в pipeline. HTTP 206 Partial content + Range header — практический пример. Это не ФП-pipe, но идея та же — композиция через единый контракт.
Категории streams
| Тип | Семантика | Пример |
|---|---|---|
| Readable | источник данных | fs.createReadStream, http.IncomingMessage |
| Writable | приёмник | fs.createWriteStream, http.ServerResponse |
| Duplex | оба интерфейса независимо | net.Socket |
| Transform | Duplex с преобразованием | zlib.createGzip, crypto-stream |
.pipe — композиция streams
const fs = require('fs');
const zlib = require('zlib');
fs.createReadStream('input.txt')
.pipe(zlib.createGzip)
.pipe(fs.createWriteStream('output.txt.gz'));
Это не pipe из FP, но идея похожа — последовательная обработка через единый контракт.
HTTP 206 Partial Content
Заголовок Range — клиент просит часть файла:
Range: bytes=1000-2000 → байты 1000..2000
Range: bytes=1000- → от 1000 до конца
Range: bytes=-1000 → последние 1000 байт (tail)
Невалидные варианты (без bytes=, без =, без минуса) — игнорируем.
parseRange компактная реализация
const parseRange = (range) => {
if (!range || !range.includes('=')) return null;
const value = range.split('=').pop();
if (!value || !value.includes('-')) return null;
const [start, end] = value.split('-').map(s => s === '' ? undefined : parseInt(s));
// start, end ∈ {undefined, NaN, number}
if (Number.isNaN(start) || Number.isNaN(end)) return null;
if (start === undefined && end === undefined) return null;
if (start === undefined) return { suffix: end }; // tail
if (end === undefined) return { start, end: Infinity };
return { start, end };
};
Я делаю ранние return-guard. split по '=', pop второго элемента. parseInt вернёт number или NaN.
ChatGPT vs ручная версия
Я попробовал написать в паре с ChatGPT. Он написал сносно, оно работало, но это были две страницы кода. Моя версия — 10 строк.
LLM избыточен в boilerplate, человек видит инварианты и срезает.
Range Reader: HTTP-сервер
const server = http.createServer((req, res) => {
const range = parseRange(req.headers.range);
if (range) {
const { start, end } = range;
const stream = fs.createReadStream(filePath, { start, end });
res.writeHead(206, {
'Content-Range': `bytes ${start}-${end}/${fileSize}`,
'Content-Length': end - start + 1,
});
stream.pipe(res);
} else {
fs.createReadStream(filePath).pipe(res);
}
});
Async iterator над streams
С Node 10+ Readable stream — async iterable:
const stream = fs.createReadStream('big.log');
for await (const chunk of stream) {
process(chunk);
}
Это объединяет идею streams с генераторной парадигмой.
Backpressure
.pipe автоматически приостанавливает Readable, когда Writable не успевает обрабатывать. Это критично для больших файлов и медленных приёмников (HDD, сеть).
Без pipe нужно вручную: writable.write(chunk) === false ⇒ readable.pause(), потом writable.on('drain', () => readable.resume).
stream.pipeline
Современная замена цепочки .pipe с правильной обработкой ошибок:
const { pipeline } = require('node:stream/promises');
await pipeline(
fs.createReadStream('input'),
zlib.createGzip,
fs.createWriteStream('output.gz')
);
Streams ≠ FP pipe/compose
| FP pipe | Node streams pipe |
|---|---|
| Композиция функций | Композиция объектов с интерфейсом |
| Одно значение → одно | Стрим чанков |
| Синхронно | Асинхронно |
| Чистые функции | Side effects (I/O) |
Но обе — про последовательное преобразование данных через единый контракт.
Связанные темы
- Композиция функций (pipe, compose)
- Iterator Pattern
- Итераторы и протокол итерации
- Асинхронная композиция функций
Ресурсы
- Лекция: _LcWUPAOCCQ
- Node Streams: https://nodejs.org/api/stream.html