Skip to content
imshengli blog
Go back

Node.js Stream:四种流类型、pipe 管道与背压机制

Node.js Stream 完整教程:四种类型(Readable/Writable/Duplex/Transform)、pipe 管道与背压机制。

· 5 min

为什么需要 Stream

假设要处理一个 2GB 的日志文件。如果用 fs.readFile 把整个文件读进内存,Node.js 进程会直接崩溃(V8 默认内存上限约 1.5GB)。

Stream 的核心思想:不需要把数据全部加载到内存,而是读一块、处理一块、释放一块

两个关键优势:

// 错误示范:大文件直接读入内存
const data = fs.readFileSync('/path/to/huge-file.log');

// 正确做法:流式处理
const stream = fs.createReadStream('/path/to/huge-file.log');
stream.on('data', chunk => {
  // 每次只处理 64KB(默认 highWaterMark)
  process(chunk);
});

四种 Stream 类型

Node.js 中所有 Stream 都是 EventEmitter 的实例,分为四种:

类型描述例子
Readable可读流,数据的来源fs.createReadStream, http request
Writable可写流,数据的去向fs.createWriteStream, http response
Duplex双工流,既可读又可写net.Socket, TCP 连接
Transform转换流,读入数据经过变换后输出zlib.createGzip

Readable Stream

可读流有两种模式:flowing(数据自动流出)和 paused(需手动调用 read())。

const { Readable } = require('stream');

// 方式一:push 数据
const rs = new Readable();
rs.push('hello ');
rs.push('world\n');
rs.push(null); // null 表示流结束

rs.pipe(process.stdout);

// 方式二:实现 _read 方法(按需产生数据)
class Counter extends Readable {
  constructor(options) {
    super(options);
    this.current = 0;
    this.max = 5;
  }

  _read() {
    if (this.current <= this.max) {
      this.push(String(this.current++) + '\n');
    } else {
      this.push(null);
    }
  }
}

new Counter().pipe(process.stdout);
// 输出:0 1 2 3 4 5

Writable Stream

const { Writable } = require('stream');

class Logger extends Writable {
  _write(chunk, encoding, callback) {
    const line = chunk.toString().toUpperCase();
    console.log(`[LOG] ${line}`);
    callback(); // 必须调用,通知流可以写入下一块
  }
}

const logger = new Logger();
logger.write('first message\n');
logger.write('second message\n');
logger.end(); // 结束写入

实际使用中,fs.createWriteStream 是最常见的可写流:

const fs = require('fs');
const ws = fs.createWriteStream('output.txt');

ws.write('line 1\n');
ws.write('line 2\n');
ws.end('done\n');

ws.on('finish', () => console.log('写入完成'));

Duplex Stream

双工流同时实现了 Readable 和 Writable,读和写互相独立,各自有独立的缓冲区。

const { Duplex } = require('stream');

class Echo extends Duplex {
  constructor(options) {
    super(options);
    this.buffer = [];
  }

  _write(chunk, encoding, callback) {
    this.buffer.push(chunk);
    callback();
  }

  _read() {
    if (this.buffer.length) {
      this.push(this.buffer.shift());
    } else {
      this.push(null);
    }
  }
}

最典型的 Duplex 例子是 TCP Socket:你可以同时向它读写数据。

Transform Stream

Transform 是一种特殊的 Duplex,输入和输出之间有因果关系——写入的数据经过变换后可以被读出。

const { Transform } = require('stream');

class UpperCase extends Transform {
  _transform(chunk, encoding, callback) {
    // 第一个参数是 error,第二个是变换后的数据
    callback(null, chunk.toString().toUpperCase());
  }
}

// 标准输入 → 转大写 → 标准输出
process.stdin
  .pipe(new UpperCase())
  .pipe(process.stdout);

pipe 管道机制

pipe() 是连接流的标准方式,它自动处理了数据流转和背压控制:

readable.pipe(writable);

// 可以链式调用
readable
  .pipe(transformA)
  .pipe(transformB)
  .pipe(writable);

经典场景——文件复制:

const fs = require('fs');

fs.createReadStream('source.txt')
  .pipe(fs.createWriteStream('dest.txt'));

HTTP 服务中返回文件:

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  // 不要 readFile 后 res.end(data)
  // 直接 pipe,内存友好
  fs.createReadStream('./big-file.json').pipe(res);
}).listen(3000);

文件压缩(组合多个 Transform):

const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'));

背压(Backpressure)

当可写流处理速度跟不上可读流的产出速度时,就会产生背压。pipe() 内部自动处理了这个问题,但手动写流时需要注意:

const rs = fs.createReadStream('huge-file.dat');
const ws = fs.createWriteStream('output.dat');

rs.on('data', chunk => {
  // write 返回 false 表示内部缓冲区已满
  const canContinue = ws.write(chunk);
  if (!canContinue) {
    rs.pause(); // 暂停读取
  }
});

ws.on('drain', () => {
  rs.resume(); // 缓冲区排空,恢复读取
});

rs.on('end', () => ws.end());

这就是 pipe() 内部做的事情。所以能用 pipe() 就用 pipe(),手动处理容易遗漏边界情况。

小结


Share this post on:

Previous Post
构建有效的工作上下文,让 AI 参与决策
Next Post
深入理解 JavaScript 闭包:原理、陷阱与实战