加载中...
Node.js TypeScript#4. Readable Stream(可读流)
发表于:2023-04-06 | 分类: 后端

在 Nodejs 中,Stream是非常重要的,我们可以通过Stream来高效的读写数据。比如在处理文件或者处理 http 请求时。在这篇文章中,我们主要讲的是readable stream(可读流)

Readable Streams

Stream是一种用于处理无法被一次性完全获取的数据集合。因为有了这种方式,数据不需要一次性全部存在内存中,这使得处理大量数据的时候变得高效。除此之外,你可以在只有部分数据可用时开始处理数据,而不是等待整个数据可用。

在我们之前的例子中,我们通过下面的方式读取文件:

import * as fs from "fs";

async function readFile() {
  try {
    const content = await fs.promises.readFile("./file.txt");
    console.log(content instanceof Buffer); // true
    console.log(content.toString());
  } catch (err) {
    console.log(err);
  }
}
readFile();

上面的代码并不高效,因为它会等待整个文件加载到内存中然后再执行操作。而 Nodejs 为我们提供了fs.createReadableStreamAPI 来编写可读流的操作。

每一个stream都是EventEmitter的一个实例。通过EventEmitter我们可以监听到数据是否读取到。

import * as fs from "fs";

const stream = fs.createReadStream("./file.txt");
stream.on("data", (chunk) => {
  console.log("New chunk of data:", chunk);
});

在上面的方法中,文件被存储在了内部的buffer中。根据createReadStream的第二个参数有一个highWaterMark属性(这里默认是 64kib)。当触发data事件后,会根据它的限制来读取文件,如果文件大于它的阈值,那么就会被分割为多个chunk

New chunk of data: <Buffer 68 61 61 61 61 61>

如上,每个chunk就是一个buffer实例。你的文件越大,接收到的chunk就越多。

如果想要将buffer转为字符串有以下几种方法:

  1. buffer.toString()
const stream = fs.createReadStream("./file.txt");
stream.on("data", (chunk) => {
  console.log("New chunk of data:", chunk.toString());
});
  1. 使用StringDecoder
import { StringDecoder } from "string_decoder";

const decoder = new StringDecoder("utf-8");
const stream = fs.createReadStream("./file.txt");

stream.on("data", (chunk) => {
  return console.log("New chunk of data:", decoder.write(chunk as Buffer));
});
  1. 还有一种就是通过在createReadStream中明确定义字符编码
const stream = fs.createReadStream("./file.txt", { encoding: "utf-8" });

关于流的两个模式

在上面的示例中,我们通过在data事件上添加监听器,使stream开始发出chunk

那么如果我们在创建stream后一段时间再添加回调函数,结果是什么呢?

我们仍然可以监听到数据。

想要更好的理解它,我们需要去看一下可读流的模式。readable stream有两个模式:

  • paused
  • flowing

我们可以看下面的例子:

const stream = fs.createReadStream("./file.txt");
setTimeout(() => {
  stream.on("data", (chunk) => {
    console.log(chunk);
  });
}, 2000);

我们首先创建了一个可读流,然后在setTimeout中延迟读取流。但是最终还是可以正常的读取数据,这是因为可读流可写流都会将文件存储在了内部的buffer中。且所有stream默认都是使用paused模式。我们需要通过添加一个data事件的监听器来自动切换流的模式到flowing。当切换为flowing模式时,才开始读取数据,所以数据并不会因为延迟调用而丢失。

还有一种手动将readable stream切换到flowing模式的方法是调用 stream.resume 方法。

stream.resume();

setTimeout(() => {
  stream.on("data", (data) => {
    console.log(data);
  });
}, 2000);

上面这个例子中,我们是先手动修改模式,不过我们并没有及时的处理。所以当延迟执行后,stream已经丢失了。最终导致上面这个例子什么都不会输出

Readable stream的原理是什么

在使用 fs.createReadableStream 熟悉可读流之后,让我们创建自己的可读流以更好地说明其工作原理。

import { Readable } from "stream";

const stream = new Readable();

stream.push("Hello");
stream.push("World!");
stream.push(null);

stream.on("data", (chunk) => {
  console.log(chunk.toString());
});

push 方法会将数据添加到内部buffer中,以供用户使用。最后push(null)表示流已经完成数据输入。

上面的例子会输出

Hello
World!

Readable实例的read方法和readable事件

readable.read() 是可读流中用于手动触发读取数据的方法。当可读流处于flowing模式时,数据会自动从底层系统读取到内存中,并被放入buffer中,供用户使用。但是,在某些情况下,你可能需要手动控制数据的读取速度,这时候就可以使用 readable.read() 方法手动触发读取。

import { Readable } from "stream";

const stream = new Readable();

const read = stream.read.bind(stream);
stream.read = function () {
  console.log("read() called");
  return read(2);
};

stream.push("Hello");
stream.push("World!");
stream.push(null);

stream.on("data", (chunk) => {
  console.log(chunk.toString());
});

当我们运行上面代码后,我们可以看到当我们开始读取流时,read函数被多次调用。并且,每次只输出两个字节。

read() called
He
read() called
ll
read() called
oW
read() called
or
read() called
ld
read() called
!
read() called
read() called
read() called
read() called

我们也可以使用readable.on('readable') 来读流是否有数据可供读取

import { Readable } from "stream";

const stream = new Readable();

stream.push("Hello");
stream.push("World!");
stream.push(null);

stream.on("readable", () => {
  let data;
  // 使用一个循环,以确保我们读取所有当前可用的数据
  while (null !== (data = stream.read())) {
    console.log("Received:", data.toString());
  }
});
stream.on("end", () => {
  console.log("Reached end of stream.");
});

在上面的例子中,当可读流被读取时,会先在 read 方法中推送数据到队列中,然后当调用 stream.read() 时,会从buffer中读取数据。readable.on('readable') 用于监听可读流是否有新数据可供读取,当可读流中有新数据时,会触发回调函数,循环读取buffer中的数据,直到数据为空为止。

总结

在本文中,我们介绍了流是什么以及如何使用它们。虽然在本系列文章的这一部分中,我们着重讨论了可读流,但在接下来的部分中,我们将涵盖可写流、管道等更多内容。

上一篇:
Node.js TypeScript#3. Buffer
下一篇:
Node.js TypeScript#1. Modules, process参数, 文件系统基础
本文目录
本文目录