Streaming Response
Streaming Response — техника отправки HTTP-ответа по частям (chunks) по мере готовности данных, не ожидая формирования всего ответа, что позволяет клиенту начать обработку немедленно.
Зачем нужно
При генерации большого ответа (CSV-экспорт, PDF, длинный JSON) клиент ждёт полной готовности, а сервер держит данные в памяти. Streaming позволяет отправлять данные по мере генерации: снижает TTFB (time to first byte), уменьшает потребление памяти и позволяет обрабатывать бесконечные потоки данных. Также используется для Server-Sent Events и AI-стриминга (ChatGPT-подобные ответы).
Где используется
- Экспорт больших CSV/Excel файлов
- Стриминг медиафайлов (видео, аудио)
- Server-Sent Events (SSE) — real-time уведомления
- AI-стриминг (OpenAI streaming API, Claude)
- Длинные JSON-ответы (списки с тысячами записей)
Основной контент
Базовый стриминг файла
const fs = require('fs');
const path = require('path');
app.get('/download/report', async (req, res) => {
const filePath = path.join(__dirname, 'reports', 'big-report.csv');
// Установить заголовки до начала стриминга
res.setHeader('Content-Type', 'text/csv');
res.setHeader('Content-Disposition', 'attachment; filename="report.csv"');
res.setHeader('Transfer-Encoding', 'chunked');
// pipe автоматически завершит res по окончании файла
const fileStream = fs.createReadStream(filePath);
fileStream.on('error', () => res.status(500).end());
fileStream.pipe(res);
});
Генерация CSV на лету
app.get('/export/users', authMiddleware, async (req, res) => {
res.setHeader('Content-Type', 'text/csv; charset=utf-8');
res.setHeader('Content-Disposition', 'attachment; filename="users.csv"');
// Заголовок CSV
res.write('id,name,email,created_at\n');
// Читать из БД курсором (не загружать всё в память)
const cursor = db('users').select('*').stream;
cursor.on('data', (row) => {
res.write(`${row.id},${row.name},${row.email},${row.created_at}\n`);
});
cursor.on('end', () => res.end());
cursor.on('error', (err) => {
console.error(err);
res.end();
});
});
Server-Sent Events (SSE)
SSE — однонаправленный канал server → client поверх HTTP, альтернатива WebSocket для push-уведомлений.
app.get('/events', (req, res) => {
// Заголовки SSE
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders; // отправить заголовки немедленно
// Отправить событие
const sendEvent = (data, event = 'message') => {
res.write(`event: ${event}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
};
// Отправлять обновления каждые 5 секунд
const interval = setInterval(() => {
sendEvent({ time: new Date, status: 'ok' }, 'heartbeat');
}, 5000);
// Подписка на события приложения
const unsubscribe = eventBus.on('order:created', (order) => {
sendEvent(order, 'order');
});
// Очистка при закрытии соединения
req.on('close', () => {
clearInterval(interval);
unsubscribe;
res.end();
});
});
// Клиент (браузер)
const eventSource = new EventSource('/events');
eventSource.addEventListener('order', (e) => {
const order = JSON.parse(e.data);
updateUI(order);
});
eventSource.addEventListener('heartbeat', (e) => console.log('ping'));
AI-стриминг (OpenAI-подобный)
app.post('/chat', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages: req.body.messages,
stream: true
});
for await (const chunk of stream) {
const delta = chunk.choices[0]?.delta?.content;
if (delta) {
res.write(`data: ${JSON.stringify({ text: delta })}\n\n`);
}
}
res.write('data: [DONE]\n\n');
res.end();
});
Видео стриминг с Range запросами
app.get('/video/:id', (req, res) => {
const videoPath = path.join(__dirname, 'videos', `${req.params.id}.mp4`);
const stat = fs.statSync(videoPath);
const fileSize = stat.size;
const range = req.headers.range;
if (range) {
const [start, end] = range.replace(/bytes=/, '').split('-').map(Number);
const chunkEnd = end || Math.min(start + 10 ** 6, fileSize - 1); // 1MB chunk
res.writeHead(206, {
'Content-Range': `bytes ${start}-${chunkEnd}/${fileSize}`,
'Accept-Ranges': 'bytes',
'Content-Length': chunkEnd - start + 1,
'Content-Type': 'video/mp4'
});
fs.createReadStream(videoPath, { start, end: chunkEnd }).pipe(res);
} else {
res.writeHead(200, { 'Content-Length': fileSize, 'Content-Type': 'video/mp4' });
fs.createReadStream(videoPath).pipe(res);
}
});
Частые ошибки
- Установить заголовки после начала стриминга —
res.setHeaderпосле первогоres.writeбросает ошибку - Не очищать ресурсы при
req.on('close')— интервалы и подписки продолжают работать после отключения клиента - Буферизация nginx — по умолчанию nginx буферизует ответ; для SSE добавить
proxy_buffering off - SSE вместо WebSocket — SSE одностороннее; если нужна двусторонняя связь — WebSocket