Streaming Response

Streaming Response — техника отправки HTTP-ответа по частям (chunks) по мере готовности данных, не ожидая формирования всего ответа, что позволяет клиенту начать обработку немедленно.

Зачем нужно

При генерации большого ответа (CSV-экспорт, PDF, длинный JSON) клиент ждёт полной готовности, а сервер держит данные в памяти. Streaming позволяет отправлять данные по мере генерации: снижает TTFB (time to first byte), уменьшает потребление памяти и позволяет обрабатывать бесконечные потоки данных. Также используется для Server-Sent Events и AI-стриминга (ChatGPT-подобные ответы).

Где используется

  • Экспорт больших CSV/Excel файлов
  • Стриминг медиафайлов (видео, аудио)
  • Server-Sent Events (SSE) — real-time уведомления
  • AI-стриминг (OpenAI streaming API, Claude)
  • Длинные JSON-ответы (списки с тысячами записей)

Основной контент

Базовый стриминг файла

const fs = require('fs');
const path = require('path');

app.get('/download/report', async (req, res) => {
  const filePath = path.join(__dirname, 'reports', 'big-report.csv');

  // Установить заголовки до начала стриминга
  res.setHeader('Content-Type', 'text/csv');
  res.setHeader('Content-Disposition', 'attachment; filename="report.csv"');
  res.setHeader('Transfer-Encoding', 'chunked');

  // pipe автоматически завершит res по окончании файла
  const fileStream = fs.createReadStream(filePath);
  fileStream.on('error', () => res.status(500).end());
  fileStream.pipe(res);
});

Генерация CSV на лету

app.get('/export/users', authMiddleware, async (req, res) => {
  res.setHeader('Content-Type', 'text/csv; charset=utf-8');
  res.setHeader('Content-Disposition', 'attachment; filename="users.csv"');

  // Заголовок CSV
  res.write('id,name,email,created_at\n');

  // Читать из БД курсором (не загружать всё в память)
  const cursor = db('users').select('*').stream;

  cursor.on('data', (row) => {
    res.write(`${row.id},${row.name},${row.email},${row.created_at}\n`);
  });

  cursor.on('end', () => res.end());
  cursor.on('error', (err) => {
    console.error(err);
    res.end();
  });
});

Server-Sent Events (SSE)

SSE — однонаправленный канал server → client поверх HTTP, альтернатива WebSocket для push-уведомлений.

app.get('/events', (req, res) => {
  // Заголовки SSE
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders; // отправить заголовки немедленно

  // Отправить событие
  const sendEvent = (data, event = 'message') => {
    res.write(`event: ${event}\n`);
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  };

  // Отправлять обновления каждые 5 секунд
  const interval = setInterval(() => {
    sendEvent({ time: new Date, status: 'ok' }, 'heartbeat');
  }, 5000);

  // Подписка на события приложения
  const unsubscribe = eventBus.on('order:created', (order) => {
    sendEvent(order, 'order');
  });

  // Очистка при закрытии соединения
  req.on('close', () => {
    clearInterval(interval);
    unsubscribe;
    res.end();
  });
});
// Клиент (браузер)
const eventSource = new EventSource('/events');
eventSource.addEventListener('order', (e) => {
  const order = JSON.parse(e.data);
  updateUI(order);
});
eventSource.addEventListener('heartbeat', (e) => console.log('ping'));

AI-стриминг (OpenAI-подобный)

app.post('/chat', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');

  const stream = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: req.body.messages,
    stream: true
  });

  for await (const chunk of stream) {
    const delta = chunk.choices[0]?.delta?.content;
    if (delta) {
      res.write(`data: ${JSON.stringify({ text: delta })}\n\n`);
    }
  }

  res.write('data: [DONE]\n\n');
  res.end();
});

Видео стриминг с Range запросами

app.get('/video/:id', (req, res) => {
  const videoPath = path.join(__dirname, 'videos', `${req.params.id}.mp4`);
  const stat = fs.statSync(videoPath);
  const fileSize = stat.size;
  const range = req.headers.range;

  if (range) {
    const [start, end] = range.replace(/bytes=/, '').split('-').map(Number);
    const chunkEnd = end || Math.min(start + 10 ** 6, fileSize - 1); // 1MB chunk

    res.writeHead(206, {
      'Content-Range': `bytes ${start}-${chunkEnd}/${fileSize}`,
      'Accept-Ranges': 'bytes',
      'Content-Length': chunkEnd - start + 1,
      'Content-Type': 'video/mp4'
    });

    fs.createReadStream(videoPath, { start, end: chunkEnd }).pipe(res);
  } else {
    res.writeHead(200, { 'Content-Length': fileSize, 'Content-Type': 'video/mp4' });
    fs.createReadStream(videoPath).pipe(res);
  }
});

Частые ошибки

  • Установить заголовки после начала стримингаres.setHeader после первого res.write бросает ошибку
  • Не очищать ресурсы при req.on('close') — интервалы и подписки продолжают работать после отключения клиента
  • Буферизация nginx — по умолчанию nginx буферизует ответ; для SSE добавить proxy_buffering off
  • SSE вместо WebSocket — SSE одностороннее; если нужна двусторонняя связь — WebSocket

Связанные темы

Ресурсы