为什么需要 Stream
假设要处理一个 2GB 的日志文件。如果用 fs.readFile 把整个文件读进内存,Node.js 进程会直接崩溃(V8 默认内存上限约 1.5GB)。
Stream 的核心思想:不需要把数据全部加载到内存,而是读一块、处理一块、释放一块。
两个关键优势:
- 内存效率:任意时刻只占用一小块 buffer 的内存
- 时间效率:不必等待所有数据就绪,拿到第一块就可以开始处理
// 错误示范:大文件直接读入内存
const data = fs.readFileSync('/path/to/huge-file.log');
// 正确做法:流式处理
const stream = fs.createReadStream('/path/to/huge-file.log');
stream.on('data', chunk => {
// 每次只处理 64KB(默认 highWaterMark)
process(chunk);
});
四种 Stream 类型
Node.js 中所有 Stream 都是 EventEmitter 的实例,分为四种:
| 类型 | 描述 | 例子 |
|---|---|---|
| Readable | 可读流,数据的来源 | fs.createReadStream, http request |
| Writable | 可写流,数据的去向 | fs.createWriteStream, http response |
| Duplex | 双工流,既可读又可写 | net.Socket, TCP 连接 |
| Transform | 转换流,读入数据经过变换后输出 | zlib.createGzip |
Readable Stream
可读流有两种模式:flowing(数据自动流出)和 paused(需手动调用 read())。
const { Readable } = require('stream');
// 方式一:push 数据
const rs = new Readable();
rs.push('hello ');
rs.push('world\n');
rs.push(null); // null 表示流结束
rs.pipe(process.stdout);
// 方式二:实现 _read 方法(按需产生数据)
class Counter extends Readable {
constructor(options) {
super(options);
this.current = 0;
this.max = 5;
}
_read() {
if (this.current <= this.max) {
this.push(String(this.current++) + '\n');
} else {
this.push(null);
}
}
}
new Counter().pipe(process.stdout);
// 输出:0 1 2 3 4 5
Writable Stream
const { Writable } = require('stream');
class Logger extends Writable {
_write(chunk, encoding, callback) {
const line = chunk.toString().toUpperCase();
console.log(`[LOG] ${line}`);
callback(); // 必须调用,通知流可以写入下一块
}
}
const logger = new Logger();
logger.write('first message\n');
logger.write('second message\n');
logger.end(); // 结束写入
实际使用中,fs.createWriteStream 是最常见的可写流:
const fs = require('fs');
const ws = fs.createWriteStream('output.txt');
ws.write('line 1\n');
ws.write('line 2\n');
ws.end('done\n');
ws.on('finish', () => console.log('写入完成'));
Duplex Stream
双工流同时实现了 Readable 和 Writable,读和写互相独立,各自有独立的缓冲区。
const { Duplex } = require('stream');
class Echo extends Duplex {
constructor(options) {
super(options);
this.buffer = [];
}
_write(chunk, encoding, callback) {
this.buffer.push(chunk);
callback();
}
_read() {
if (this.buffer.length) {
this.push(this.buffer.shift());
} else {
this.push(null);
}
}
}
最典型的 Duplex 例子是 TCP Socket:你可以同时向它读写数据。
Transform Stream
Transform 是一种特殊的 Duplex,输入和输出之间有因果关系——写入的数据经过变换后可以被读出。
const { Transform } = require('stream');
class UpperCase extends Transform {
_transform(chunk, encoding, callback) {
// 第一个参数是 error,第二个是变换后的数据
callback(null, chunk.toString().toUpperCase());
}
}
// 标准输入 → 转大写 → 标准输出
process.stdin
.pipe(new UpperCase())
.pipe(process.stdout);
pipe 管道机制
pipe() 是连接流的标准方式,它自动处理了数据流转和背压控制:
readable.pipe(writable);
// 可以链式调用
readable
.pipe(transformA)
.pipe(transformB)
.pipe(writable);
经典场景——文件复制:
const fs = require('fs');
fs.createReadStream('source.txt')
.pipe(fs.createWriteStream('dest.txt'));
HTTP 服务中返回文件:
const http = require('http');
const fs = require('fs');
http.createServer((req, res) => {
// 不要 readFile 后 res.end(data)
// 直接 pipe,内存友好
fs.createReadStream('./big-file.json').pipe(res);
}).listen(3000);
文件压缩(组合多个 Transform):
const fs = require('fs');
const zlib = require('zlib');
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'));
背压(Backpressure)
当可写流处理速度跟不上可读流的产出速度时,就会产生背压。pipe() 内部自动处理了这个问题,但手动写流时需要注意:
const rs = fs.createReadStream('huge-file.dat');
const ws = fs.createWriteStream('output.dat');
rs.on('data', chunk => {
// write 返回 false 表示内部缓冲区已满
const canContinue = ws.write(chunk);
if (!canContinue) {
rs.pause(); // 暂停读取
}
});
ws.on('drain', () => {
rs.resume(); // 缓冲区排空,恢复读取
});
rs.on('end', () => ws.end());
这就是 pipe() 内部做的事情。所以能用 pipe() 就用 pipe(),手动处理容易遗漏边界情况。
小结
- Stream 是 Node.js 处理大量数据的基础设施,核心在于”分块处理”
- 四种类型覆盖了数据流的所有场景:生产、消费、双向、变换
pipe()是连接流的首选方式,自带背压控制- 实际开发中,文件 I/O、网络通信、数据转换都离不开 Stream