Node.js 中的流

时间: 2019-11-04阅读: 512标签: node

Node.js 中的流(Stream)是出了名的难用甚至是难以理解。

用 Dominic Tarr 的话来说:“流是 Node 中最好的,也是最容易被误解的想法。”即使是 Redux 的创建者和 react.js 的核心团队成员 Dan Abramov 也害怕 Node 流。

本文将帮助你了解流以及如何使用。不要害怕,你完全可以把它搞清楚!


什么是流?

流是为 Node.js 应用提供动力的基本概念之一。它们是数据处理方法,用于将输入的数据顺序读取或把数据写入输出。

流是一种以有效方式处理读写文件、网络通信或任何类型的端到端信息交换的方式。

流的处理方式非常独特,流不是像传统方式那样将文件一次全部读取到存储器中,而是逐段读取数据块并处理数据的内容,不将其全部保留在内存中。

这种方式使流在处理大量数据时非常强大,例如,文件的大小可能大于可用的内存空间,从而无法将整个文件读入内存进行处理。那是流的用武之地!

既能用流来处理较小的数据块,也可以读取较大的文件。

以 YouTube 或 Netflix 之类的“流媒体”服务为例:这些服务不会让你你立即下载视频和音频文件。取而代之的是,你的浏览器以连续的块流形式接收视频,从而使接收者几乎可以立即开始观看和收听。

但是,流不仅涉及处理媒体和大数据。它们还在代码中赋予了我们“可组合性”的力量。考虑可组合性的设计意味着能够以某种方式组合多个组件以产生相同类型的结果。在 Node.js 中,可以通过流在其他较小的代码段中传递数据,从而组成功能强大的代码段。


为什么会用到流

与其他数据处理方法相比,流基本上具有两个主要优点:

  1. 内存效率:你无需事先把大量数据加载到内存中即可进行处理
  2. 时间效率:得到数据后立即开始处所需的时间大大减少,不必等到整个有效数据全部发送完毕才开始处理


Node.js 中有 4 种类型的流:

  1. 可写:可以向其中写入数据的流。例如,fs.createWriteStream() 使我们可以使用流将数据写入文件。
  2. 可读:可从中读取数据的流。例如:fs.createReadStream() 让我们读取文件的内容。
  3. 双工:可读和可写的流。例如,net.Socket
  4. Transform:可在写入和读取时修改或转换数据。例如在文件压缩的​​情况下,你可以在文件中写入压缩数据,也可以从文件中读取解压缩的数据。

如果你已经使用过 Node.js,则可能遇到过流。例如在基于 Node.js 的 HTTP 服务器中,request 是可读流,而 response 是可写流。你可能用过 fs 模块,该模块可让你用可读和可写文件流。每当使用 Express 时,你都在使用流与客户端进行交互,而且由于 TCP 套接字、TLS栈和其他连接都基于 Node.js,所以在每个可以使用的数据库连接驱动的程序中使用流。


一个实际的例子

如何创建可读流

首先需要可读性流,然后将其初始化。

const Stream = require('stream')
const readableStream = new Stream.Readable()

现在,流已初始化,可以向其发送数据了:

readableStream.push('ping!')
readableStream.push('pong!')

异步迭代器

强烈建议在使用流时配合异步迭代器(async iterator)。根据 Axel Rauschmayer 博士的说法,异步迭代是一种用于异步检索数据容器内容的协议(这意味着当前“任务”可以在检索项目之前被暂停)。另外必须提及的是,流异步迭代器实现使用内部的 readable 事件。

从可读流中读取时,可以使用异步迭代器:

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'

也可以用字符串收集可读流的内容:

import {Readable} from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');

注意,在这种情况下必须使用异步函数,因为我们想返回 Promise。

请切记不要将异步功能与 EventEmitter 混合使用,因为当前在事件处理程序中发出拒绝时,无法捕获拒绝,从而导致难以跟踪错误和内存泄漏。目前的最佳实践是始终将异步函数的内容包装在 try/catch 块中并处理错误,但这很容易出错。 这个 pull request 旨在解决一旦其落在 Node 核心上产生的问题。

要了解有关异步迭代的 Node.js 流的更多信息,请查看这篇很棒的文章

Readable.from():从可迭代对象创建可读流

stream.Readable.from(iterable, [options]) 这是一种实用方法,用于从迭代器中创建可读流,该迭代器保存可迭代对象中包含的数据。可迭代对象可以是同步可迭代对象或异步可迭代对象。参数选项是可选的,除其他作用外,还可以用于指定文本编码。

const { Readable } = require('stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
});

两种读取模式

根据 Streams API,可读流有效地以两种模式之一运行:flowingpaused。可读流可以处于对象模式,无论处于 flowing 模式还是 paused 模式。

  • 流模式下,将自动从底层系统读取数据,并通过 EventEmitter 接口使用事件将其尽快提供给程序。
  • 在 paused 模式下,必须显式调用 stream.read() 方法以从流中读取数据块。

在 flowing 模式中,要从流中读取数据,可以监听数据事件并附加回调。当有大量数据可用时,可读流将发出一个数据事件,并执行你的回调。看下面的代码片段:

var fs = require("fs");
var data = '';

var readerStream = fs.createReadStream('file.txt'); //Create a readable stream

readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. 

// Handle stream events --> data, end, and error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end',function() {
   console.log(data);
});

readerStream.on('error', function(err) {
   console.log(err.stack);
});

console.log("Program Ended");

函数调用 fs.createReadStream() 给你一个可读流。最初流处于静态状态。一旦你侦听数据事件并附加了回调,它就会开始流动。之后将读取大块数据并将其传递给你的回调。流实现者决定发送数据事件的频率。例如,每当有几 KB 的数据被读取时,HTTP 请求就可能发出一个数据事件。当从文件中读取数据时,你可能会决定读取一行后就发出数据事件。

当没有更多数据要读取(结束)时,流将发出结束事件。在以上代码段中,我们监听此事件以在结束时得到通知。

另外,如果有错误,流将发出并通知错误。

在 paused 模式下,你只需在流实例上重复调用 read(),直到读完所有数据块为止,如以下示例所示:

var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';
var chunk;

readableStream.on('readable', function() {
    while ((chunk=readableStream.read()) != null) {
        data += chunk;
    }
});

readableStream.on('end', function() {
    console.log(data)
});

read() 函数从内部缓冲区读取一些数据并将其返回。当没有内容可读取时返回 null。所以在 while 循环中,我们检查是否为 null 并终止循环。请注意,当可以从流中读取大量数据时,将会发出可读事件。

所有 Readable 流均以 paused 模式开始,但可以通过以下方式之一切换为 flowing 模式

  • 添加一个 'data' 事件处理。
  • 调用 stream.resume() 方法。
  • 调用 stream.pipe() 方法将数据发送到可写对象。

Readable 可以使以下方法之一切换回 paused 模式:

  • 如果没有管道目标,则通过调用 stream.pause() 方法。
  • 如果有管道目标,请删除所有管道目标。可以通过调用 stream.unpipe() 方法来删除多个管道目标。

一个需要记住的重要概念是,除非提供了一种用于消耗或忽略该数据的机制,否则 Readable 将不会生成数据。如果使用机制被禁用或取消,则 Readable 将会试图停止生成数据。添加 readable 事件处理会自动使流停止 flowing,并通过 read.read() 得到数据。如果删除了 readable 事件处理,那么如果存在 'data' 事件处理,则流将再次开始 flowing。


如何创建可写流

要将数据写入可写流,你需要在流实例上调用 write()。如以下示例所示:

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
    writableStream.write(chunk);
});

上面的代码很简单。它只是简单地从输入流中读取数据块,并使用 write() 写入目的地。该函数返回一个布尔值,指示操作是否成功。如果为 true,则写入成功,你可以继续写入更多数据。如果返回 false,则表示出了点问题,你目前无法写任何内容。可写流将通过发出 drain 事件来通知你什么时候可以开始写入更多数据。

调用 writable.end() 方法表示没有更多数据将被写入 Writable。如果提供,则可选的回调函数将作为 finish 事件的侦听器附加。

// Write 'hello, ' and then end with 'world!'.
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed!

你可以用可写流从可读流中读取数据:

const Stream = require('stream')

const readableStream = new Stream.Readable()
const writableStream = new Stream.Writable()

writableStream._write = (chunk, encoding, next) => {
    console.log(chunk.toString())
    next()
}

readableStream.pipe(writableStream)

readableStream.push('ping!')
readableStream.push('pong!')

writableStream.end()

还可以用异步迭代器来写入可写流,建议使用

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

stream.finished() 的默认版本是基于回调的,但是可以通过 util.promisify() 转换为基于 Promise 的版本(A行)。

在此例中,使用以下两种模式:

Writing to a writable stream while handling backpressure (line B):
在处理 backpressure 时写入可写流(B行):

if (!writable.write(chunk)) {
  await once(writable, 'drain');
}

关闭可写流,并等待写入完成(C行):

writable.end();
await finished(writable);

pipeline()

管道是一种机制,可以将一个流的输出作为另一流的输入。它通常用于从一个流中获取数据并将该流的输出传递到另一个流。管道操作没有限制。换句话说,管道可用于分多个步骤处理流数据。

在 Node 10.x 中引入了 stream.pipeline()。这是一种模块方法,用于在流转发错误和正确清理之间进行管道传输,并在管道完成后提供回调。

这是使用管道的例子:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// 使用 pipeline API 可以轻松将一系列流
// 通过管道传输在一起,并在管道完全完成后得到通知。
// 一个有效地用 gzip压缩巨大视频文件的管道:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

由于pipe 不安全,应使用 pipeline 代替 pipe。


流模块

Node.js 流模块 提供了构建所有流 API 的基础。

Stream 模块是 Node.js 中默认提供的原生模块。 Stream 是 EventEmitter 类的实例,该类在 Node 中异步处理事件。因此流本质上是基于事件的。

要访问流模块:

const stream = require('stream');

stream 模块对于创建新型流实例非常有用。通常不需要使用 stream 模块来消耗流。


流驱动的 Node API

由于它们的优点,许多 Node.js 核心模块提供了原生流处理功能,最值得注意的是:

  • net.Socket 是流所基于的主 API 节点,它是以下大多数 API 的基础
  • process.stdin 返回连接到 stdin 的流
  • process.stdout 返回连接到 stdout 的流
  • process.stderr 返回连接到 stderr 的流
  • fs.createReadStream() 创建一个可读的文件流
  • fs.createWriteStream() 创建可写的文件流
  • net.connect() 启动基于流的连接
  • http.request() 返回 http.ClientRequest 类的实例,它是可写流
  • zlib.createGzip() 使用gzip(一种压缩算法)将数据压缩到流中
  • zlib.createGunzip() 解压缩 gzip 流。
  • zlib.createDeflate() deflate(压缩算法)将数据压缩到流中
  • zlib.createInflate() 解压缩一个deflate流

以下是与可写流相关的一些重要事件:

  • error –表示在写或配置管道时发生了错误。
  • pipeline – 当把可读流传递到可写流中时,该事件由可写流发出。
  • unpipe – 当你在可读流上调用 unpipe 并停止将其输送到目标流中时发出。


结论

这就是所有关于流的基础知识。流、管道和链是 Node.js 的核心和最强大的功能。流确实可以帮你编写简洁而高效的代码来执行 I/O。

另外,还有一个值得期待的 Node.js 战略计划,称为 BOB,旨在改善 Node.js 的内部数据流以及希望作为未来 Node.js 流数据接口的公共 API 的。


站长推荐

1.云服务推荐: 国内主流云服务商,各类云产品的最新活动,优惠券领取。地址:阿里云腾讯云华为云

2.广告联盟: 整理了目前主流的广告联盟平台,如果你有流量,可以作为参考选择适合你的平台点击进入

链接: http://www.fly63.com/article/detial/6710

关闭

我怎样用Node.js自动完成工作的

我们在工作中经常会进行很多繁琐的任务:更新配置文件,复制和粘贴文件,更新 Jira 标签等。慢慢的花在这些任务上的时间会越来越多。reskin 的定义是使用相同的游戏机制,屏幕和元素的定位

Node 最古老的 npm 包 request 将被废弃

相信 Node.js 开发者对 Request 都不会陌生,这是一个 Node.js 模块,以 npm 包的形式提供,是一个简单的 HTTP 客户端,通过它可方便地实现 HTTP 请求。可以看到,request 的最新版本是 2.88.0

nodejs适合做什么?

面对一个新技术,多问几个为什么总是好的。既然 PHP、Python、Java 都可以用来进行后端开发,那么为什么还要去学习 Node.js?Node.js适合做什么?Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境,一个让 JavaScript 运行在服务端的开发平台。

node简单的上传图片

首先:要做到服务器获取到你上传的文件,配置好koabody。(其实还有很多关于上传的中间件,因为本来就使用了koabody的中间件就直接用这个就好了)

深入解析Node.js Event Loop各阶段

今天小编就为大家分享一篇关于Node.js Event Loop各阶段讲解,写的十分的全面细致,具有一定的参考价值,对此有需要的朋友可以参考学习下。timer阶段处理setTimeout于setInterval回调,开始处理的时机与poll阶段有关联。

Node.js特点和适用场景

Node是由Ryan Dahl创造出来的,Ryan Dahl是一名资深的C/C++程序员,在创造出Node之前,他的主要工作都是围绕高性能web服务器来展开的,他找到了设计高性能web服务器的几个要点

Node.js FS模块方法速查

所有文件操作提供同步和异步的两种方式,本笔记只记录异步的API,异步方式其最后一个参数是回调函数。回调函数的第一个参数往往是错误对象,如果没有发生参数,那么第一个参数可能是null或者undefinded。

在 Node.js 中看 JavaScript 的引用

早期学习 Node.js 的时候 (2011-2012),有挺多是从 PHP 转过来的,当时有部分人对于 Node.js 编辑完代码需要重启一下表示麻烦(PHP不需要这个过程),于是社区里的朋友就开始提倡使用 node-supervisor 这个模块来启动项目

YodaOS:一个属于 Node.js 社区的操作系统

YodaOS 的架构跟 Android 类似,YodaOS 是 Rokid 研发的首个全栈开源 AI 操作系统,由 Linux 内核、硬件抽象层、AI 抽象层和 JavaScript 框架组成,专为下一代人机交互设备开发,可用于智能音箱、智能家居、智能穿戴和车载等多种设备和场景。

Node.js创建服务器和模拟客户端请求

服务器是某种长期运行,等待请求资源的应用程序,Web服务器是使用HTTP协议,等待客户端连接后请求资源的驻守应用程序;HTTP协议是应用层的协议,在传输层依然是使用TCP或者UDP协议

点击更多...

内容以共享、参考、研究为目的,不存在任何商业目的。其版权属原作者所有,如有侵权或违规,请与小编联系!情况属实本人将予以删除!