Node.js streams и pipe

Streams Node.js — Readable, Writable, Duplex, Transform — это потоковая обработка данных кусками. Метод .pipe склеивает stream-ы в pipeline. HTTP 206 Partial content + Range header — практический пример. Это не ФП-pipe, но идея та же — композиция через единый контракт.

Категории streams

Тип Семантика Пример
Readable источник данных fs.createReadStream, http.IncomingMessage
Writable приёмник fs.createWriteStream, http.ServerResponse
Duplex оба интерфейса независимо net.Socket
Transform Duplex с преобразованием zlib.createGzip, crypto-stream

.pipe — композиция streams

const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('input.txt')
  .pipe(zlib.createGzip)
  .pipe(fs.createWriteStream('output.txt.gz'));

Это не pipe из FP, но идея похожа — последовательная обработка через единый контракт.

HTTP 206 Partial Content

Заголовок Range — клиент просит часть файла:

Range: bytes=1000-2000      → байты 1000..2000
Range: bytes=1000-          → от 1000 до конца
Range: bytes=-1000          → последние 1000 байт (tail)

Невалидные варианты (без bytes=, без =, без минуса) — игнорируем.

parseRange компактная реализация

const parseRange = (range) => {
  if (!range || !range.includes('=')) return null;
  const value = range.split('=').pop();
  if (!value || !value.includes('-')) return null;
  const [start, end] = value.split('-').map(s => s === '' ? undefined : parseInt(s));
  // start, end ∈ {undefined, NaN, number}
  if (Number.isNaN(start) || Number.isNaN(end)) return null;
  if (start === undefined && end === undefined) return null;
  if (start === undefined) return { suffix: end };   // tail
  if (end === undefined) return { start, end: Infinity };
  return { start, end };
};

Я делаю ранние return-guard. split по '=', pop второго элемента. parseInt вернёт number или NaN.

ChatGPT vs ручная версия

Я попробовал написать в паре с ChatGPT. Он написал сносно, оно работало, но это были две страницы кода. Моя версия — 10 строк.

LLM избыточен в boilerplate, человек видит инварианты и срезает.

Range Reader: HTTP-сервер

const server = http.createServer((req, res) => {
  const range = parseRange(req.headers.range);
  if (range) {
    const { start, end } = range;
    const stream = fs.createReadStream(filePath, { start, end });
    res.writeHead(206, {
      'Content-Range': `bytes ${start}-${end}/${fileSize}`,
      'Content-Length': end - start + 1,
    });
    stream.pipe(res);
  } else {
    fs.createReadStream(filePath).pipe(res);
  }
});

Async iterator над streams

С Node 10+ Readable stream — async iterable:

const stream = fs.createReadStream('big.log');
for await (const chunk of stream) {
  process(chunk);
}

Это объединяет идею streams с генераторной парадигмой.

Backpressure

.pipe автоматически приостанавливает Readable, когда Writable не успевает обрабатывать. Это критично для больших файлов и медленных приёмников (HDD, сеть).

Без pipe нужно вручную: writable.write(chunk) === false ⇒ readable.pause(), потом writable.on('drain', () => readable.resume).

stream.pipeline

Современная замена цепочки .pipe с правильной обработкой ошибок:

const { pipeline } = require('node:stream/promises');

await pipeline(
  fs.createReadStream('input'),
  zlib.createGzip,
  fs.createWriteStream('output.gz')
);

Streams ≠ FP pipe/compose

FP pipe Node streams pipe
Композиция функций Композиция объектов с интерфейсом
Одно значение → одно Стрим чанков
Синхронно Асинхронно
Чистые функции Side effects (I/O)

Но обе — про последовательное преобразование данных через единый контракт.

Связанные темы

Ресурсы