Stream API: потоки

Stream — абстракция Node.js для работы с данными по частям (chunks) вместо загрузки всего в память, что критично при работе с большими файлами, сетевыми ответами и преобразованиями данных.

Зачем нужно

Чтение 5 GB файла через fs.readFile загрузит весь файл в память. Stream читает файл по кускам (например, по 64 KB), обрабатывает и освобождает память. Это позволяет обрабатывать файлы произвольного размера с постоянным потреблением памяти. Все HTTP-запросы/ответы, файловые операции и TCP-сокеты в Node.js — это потоки.

Где используется

  • Стриминг файлов (скачивание видео, CSV-экспорт)
  • Обработка больших CSV/JSON файлов построчно
  • Сжатие на лету (gzip через zlib.createGzip)
  • HTTP ответы при стриминге (chunked transfer)
  • Pipe между процессами и сокетами

Основной контент

4 типа потоков

Readable  — источник данных (fs.createReadStream, http.IncomingMessage)
Writable  — приёмник данных (fs.createWriteStream, http.ServerResponse)
Duplex    — и чтение и запись (TCP сокет, zlib)
Transform — Duplex с преобразованием данных (zlib.createGzip)

Readable Stream

const fs = require('fs');

// Читать файл частями
const readable = fs.createReadStream('big-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // размер chunk: 64 KB
});

readable.on('data', (chunk) => {
  console.log(`Получено ${chunk.length} символов`);
});

readable.on('end', () => {
  console.log('Файл прочитан');
});

readable.on('error', (err) => {
  console.error('Ошибка:', err);
});

Writable Stream

const writable = fs.createWriteStream('output.txt');

writable.write('Строка 1\n');
writable.write('Строка 2\n');
writable.end('Последняя строка\n'); // закрыть поток

writable.on('finish', () => console.log('Запись завершена'));
writable.on('error', (err) => console.error(err));

pipe — соединение потоков

// Скопировать файл через pipe
const readable = fs.createReadStream('source.mp4');
const writable = fs.createWriteStream('copy.mp4');

readable.pipe(writable);

writable.on('finish', () => console.log('Копирование завершено'));

// Pipe с gzip сжатием
const zlib = require('zlib');
fs.createReadStream('file.txt')
  .pipe(zlib.createGzip)
  .pipe(fs.createWriteStream('file.txt.gz'));

stream.pipeline — рекомендуемый способ (с обработкой ошибок)

const { pipeline } = require('stream/promises');
const zlib = require('zlib');

// Автоматически закрывает все потоки при ошибке
await pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip,
  fs.createWriteStream('output.txt.gz')
);
console.log('Сжатие завершено');

Transform Stream — преобразование данных

const { Transform } = require('stream');

// Преобразовать каждый chunk (uppercase)
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback;
  }
});

fs.createReadStream('input.txt')
  .pipe(upperCase)
  .pipe(fs.createWriteStream('output.txt'));

Стриминг HTTP-ответа

// Стримить файл клиенту без загрузки в память
app.get('/download/:id', async (req, res) => {
  const file = await FileService.getById(req.params.id);
  const fileStream = fs.createReadStream(file.path);

  res.setHeader('Content-Type', 'application/octet-stream');
  res.setHeader('Content-Disposition', `attachment; filename="${file.name}"`);
  res.setHeader('Content-Length', file.size);

  fileStream.on('error', (err) => {
    res.status(500).end();
  });

  fileStream.pipe(res);
});

Readline — чтение по строкам

const readline = require('readline');

// Читать большой CSV построчно
const rl = readline.createInterface({
  input: fs.createReadStream('large.csv'),
  crlfDelay: Infinity
});

rl.on('line', (line) => {
  const [id, name, email] = line.split(',');
  // Обработать строку
});

rl.on('close', () => console.log('CSV обработан'));

Частые ошибки

  • pipe без обработки ошибок — если Readable бросает ошибку, pipe не закроет Writable автоматически; использовать stream.pipeline
  • Backpressure игнорируется — если Writable не успевает, write возвращает false; нужно остановить чтение через readable.pause() / resume
  • Читать весь поток в памятьlet data = ''; stream.on('data', d => data += d) нивелирует преимущества стриминга
  • Не вызывать callback в Transform — поток зависнет навсегда

Связанные темы

Ресурсы


🎓 Источник: Архив 2018 — Часть 17 Потоки (Streams) в Node.js

  • 📅 2020-01-15 · YouTube · 3ZRkNvs_SaE
  • Тезисы:
    • Стримы — это ленивая обработка данных: читаются только по требованию, можно обрабатывать ещё до полного появления
    • В Node одновременно существует 3 версии API стримов, одни и те же классы переключаются между ними по используемым событиям (onData vs onReadable)
    • 4 вида: Readable, Writable, Duplex (TCP socket), Transform (zlib) — Transform возвращает преобразованные те же данные
    • Под капотом был переход push-base → pull-base → снова push-base; сейчас рекомендуется onData
    • Для строк лучше setEncoding('utf8') вместо .toString(): внутри StringDecoder буферизует неполный UTF-8 codepoint
    • Ручная склейка байтов data += chunk.toString() ломает многобайтовые UTF-8 символы на границе чанков
    • JS строки в UTF-16: 4-байтовый codepoint = 2 surrogate; .length считает codepoint, а не графемы
    • В Unicode вообще нет понятия «символ» — есть графемы, графемные кластеры, codepoints
    • object-mode — стрим оперирует не байтами, а произвольными объектами (создаётся наследованием)
  • Цитата:

    «За время развития ноды было три разных версии стримов со всеми разными интерфейсами. И они есть одновременно в одном и том же модуле, в одних и тех же классах.»


🎓 Источник: HTTP Server in Node.js — req, res, sockets, and streams

  • 📅 2020-01-16 · YouTube · PDR5hcV4a_0
  • Тезисы:
    • req и res — это стримы (наследники сокета), один и тот же duplex
    • Композиция через pipe тянет данные «с конца»: писатель просит у предыдущего, тот у источника
    • Backpressure: у каждого стрима high watermark и low watermark в конструкторе. Буфер > high → пауза, < low → упреждающее чтение
    • Для index.html стрим не нужен — fs.readFile + один res.end() быстрее. Стрим оправдан при преобразовании на лету (gzip)
    • pipe не закрывает события: всё равно вешать on('end')/on('close') руками
    • С Node 9–10 readable stream — это async iterable (Symbol.asyncIterator). Под капотом просто read+onReadable
    • Async iterator — это pull-based стрим; стримы остались из-за push-based функциональности
    • fs.promises оказался по бенчмаркам не медленнее, иногда быстрее callbacks (бенчмарки James Snell)
  • Цитата:

    «Только тогда, когда самый конечный стрим будет готов записать данные, а у него этих данных не будет, он пытается прочитать их из предыдущего стрима.»


🎓 Источник: Serving Static in Node.js

  • 📅 2019-09-23 · YouTube · n_AdKIzbpBc
  • Тезисы:
    • createReadStream открывается синхронно, но не нуждается в try/catch — ошибки только через onerror. Лучше fs.stat заранее
    • Для маленькой статики res.end(buffer) из памяти Map-кэша быстрее всего; большие файлы (mp3, mp4) — только pipe, иначе блокируется
    • Готовые буферы можно хранить уже сжатыми (gzip заранее) — отдавать прямо в сокет без сжатия на лету
    • Атака path traversal: после path.join проверять что итоговый путь действительно внутри static/
    • Своя генерация индекса каталога — new Readable({ read {...} }), паттерн revealing constructor, push(null) закрывает поток
  • Цитата:

    «У вас createReadStream в fs.promises нет, это нормально. Не все методы из fs имеют аналоги в promises, потому что некоторые реализуют асинхронность не через промисы, а через стримы.»


🎓 Источник: Node.js разбор кода Readable, Writable, pipe, HTTP 206 Partial content

  • 📅 2023-06-14 · YouTube · _LcWUPAOCCQ
  • Тезисы:
    • Транспорт принимает data: Buffer | Readable — полиморфно: instanceof Readablepipe, иначе res.end()
    • Большой Buffer (10 МБ+) полезно отдавать чанками с Content-Length — браузер отличит конец от обрыва и покажет процент загрузки
    • HTTP 206 Partial Content: формат Range: bytes=100-200, варианты bytes=1000- (с offset до конца) и bytes=-1000 (хвост, как tail)
    • Невалидные Range стоит игнорировать молча (либо отвечать HTTP-ошибкой) — толерантность к входу
    • Заголовок ответа: Content-Range: bytes <start>-<end>/<size>, size=* если неизвестен
  • Код (полезный):
    // Минимальный parse-range
    const parseRange = (range) => {
      if (!range || !range.includes('=')) return null;
      const [start, end] = range.split('=').pop().split('-').map(s => parseInt(s));
      if (Number.isNaN(start) && Number.isNaN(end)) return null;
      if (Number.isNaN(start)) return { tail: end }; // bytes=-1000
      return { start, end: Number.isNaN(end) ? null : end };
    };