Node.js中的Stream

更新日期: 2021-01-25阅读: 1.5k标签: node

Stream主要用于序列化地数据处理(read or write input into output sequentially),比如文件读写,网络数据传输, 或任何端到端的数据交换。Stream在处理数据的时候,与传统方式有所不同,传统方式是把数据作为一个整体进行处理,而stream则是把数据分割成一块一块的进行处理,它不是整个数据一起处理,而是一块数据一块数据地处理。以文件读写为例,文件读写的时候,stream并不是一次性地把一个文件中的所有内容都读取到内存中再进行处理(就是再写入到另外一个文件中),而是一块数据一块数据的进行读取,读取完一块数据就处理一块数据(把这块数据写入到另外一个文件中),而不会让它一直在内存中。相比于传统方式,使用stream来处理数据,可以高效的使用内存,更有可能来处理大文件。再以网络数据传输(网上看视频)为例。我们并不是把整个电影都从服务器上下载下来才开始播放,而是一块一块地下载,下载一块,播放一块。服务器一块一块地写数据,浏览器一块一块的读数据。用流处理数据,时间上也比较高效。

在Node.js中,有以下4种流

输入流(或可读流 readable stream),就是流中有数据,我们从里面读取数据。输入流负责读取数据,我们只需要从输入流中读取数据。
输出流:它负责向目的地写入数据,而我们只需要向输出流中写入数据。
双向流: 即可以从它里面读取数据,也可以向它里面写入数据
转换流,给它一个流,它把流里面的内容转换一下,然后再把流输出,流的性质是不变的,流的内容发生了变化,通常转化的是输入流
输入流和输出流是相对于计算机内存而言的,输入流,就是把数据读取到内存中,输出流,则是把内存中的数据写入到目的地。
我们既可以使用Node.js提供的流,比如,fs.createReadStream 就创建了读取文件的输入流,也可以创建自己的流。通常来说,都是先使用Node.js提供的流,那就使用fs模块提供的输入,输出流。fs.createReadStream() 创建输入流,输入流负责读取数据,所以它接受一个必须的参数,要读取的数据。fs.createReadStream('./data.txt') 就是从当前文件下的data.txt中读取数据。我们程序需要做的就是从输入流中读取想要的数据,在Node.js中从输入流中读取数据,也有多种方式。这要从输入流的两种模式说起。输入流有两种模式,pause模式,flow模式,pause模式是默认模式,就是创建输入流后,它处于暂停状态,程序不会从输入流中读取数据,我们需要手动地从输入流中读取数据。flow模式,则是程序自动地从输入流中读取数据,我们只需要决定读取到数据后再怎么处理数据就可以了。这时,flow模式,可能是我们想要的。怎么从pause模式切换到flow模式呢?给输入流注册data事件,流就自动转化成flow模式,那再data事件注册一个事件处理函数,处理数据,就可以实现从输入流中读取数据了。
创建一个文件夹,新建一个data.txt文件和read.js 文件,data.txt文件写一些文字。read.js文件如下
const fs = require('fs');
const readable = fs.createReadStream('./data.txt');
readable.on('data', data => {
    console.log(data);
})

node read.js 执行程序,控制台输出了buffer, 就是一些二进制数据,默认情况下,从输入流中读取到的内容都是buffer,我们需要手动转化成字符串,调用toString()方法就可以了。console.log(data.toString()) 。假设想用pause模式来读取数据,那就要手动调用输入流的read() 方法了,但还要注意,只有在输入流中有数据的时候,才能调用read() 方法,所以要在readable 事件处理函数中调用read()事件, read()方法,如果读取不到数据,就会返回null

const fs = require('fs');
const readableStream = fs.createReadStream('./data.txt');
readableStream.on('readable', () => {
    let chunk;
    while (null !== (chunk = readableStream.read())) {
        console.log(chunk.toString());
    }
})

除了使用事件的方式来从输入流中读取数据,还可以使用异步迭代器(for await ... of )来消费输入流(从输入流中读取数据),因为输入流是异步可迭代对象

const fs = require('fs');
const readableStream = fs.createReadStream('./data.txt');
async function logChunks(readable) {
    for await (const chunk of readable) {
        console.log(chunk.toString());
    }
}
logChunks(readableStream);

当然它的底层还是监听readable 事件。除了在异步迭代器,直接处理数据,也可以把流中的数据暂时存储起来,以便日后消费

const fs = require('fs');
const readableStream = fs.createReadStream('./data.txt');

readableStream.setEncoding('utf-8');

async function readableToString(readable) {
    let result = '';
    for await (const chunk of readable) {
        result += chunk;
    }
    return result;
}
readableToString(readableStream).then(console.log);

如果要处理异常,可以用try/catch 把for await 的处理包起来。说完了输入流,再说输出流。fs.createWriteStream 创建一个输出流,输出流,就是把数据输出到什么地方,因此,它也接受一个参数,就是输出的目的地。我们要做的就是向输出流中写放数据,要调用write() 方法

const fs = require('fs');
const readableStream = fs.createReadStream('./data.txt');
const writeStream = fs.createWriteStream('./result.txt');

readableStream.on('data', data => {
    writeStream.write(data);
})
write() 返回true or false, true表示写入成功,你可以继续写入数据。false则是,写入出错了,你不能继续写入了。至于什么时候能再写,输出流触发drain事件。
只要输出流触发了drain事件,就证明,可以继续向输出流中写入数据了。所以真正安全的做法,向输出流写入数据的时候,还要判断true or false, 并监听 drain 事件。
const fs = require('fs');
const util = require('util');
const stream = require('stream');
const {once} = require('events');

const readableStream = fs.createReadStream('./data.txt');
const writeStream = fs.createWriteStream('./result.txt');

const finished = util.promisify(stream.finished);

async function writeIterableToFile(readable, writable) {
    for await (const chunk of readable) {
        if (!writable.write(chunk)) {
            await once(writable, 'drain');
        }
    }
    writable.end();
    // Wait until done. Throws if there are errors.
    await finished(writable);
}

writeIterableToFile(readableStream, writeStream)

end() 方法,就表示,向输出流中写完数据了,不会再写了。finished 事件,则中输出流,把所有的数据都写入到的目的地中。当我们手动去读取和写入文件时候,处理有点麻烦,这就用到pipe()方法。上面的可以写成

const fs = require('fs');

const readableStream = fs.createReadStream('./data.txt');
const writeStream = fs.createWriteStream('./result.txt');

readableStream.pipe(writeStream);

pipe() 操作,前一个的输出变成后面一个的输入。readableStream输入流的输出,就是读取到的数据,我们就是要把这些数据写入到输出流中,所以它正好是输出流的输入,因此,就可以用pipe把这两个链接起来。pipe()的操作就相当于

readableStream.on("data", chunk => {
    // 自动处理了drain事件。
    writeStream.write(chunk);
});

readableStream.on("end", () => {
    writeStream.end();
});

pipe() 也是把输入流的模式转换成了flow模式。

现在可以创建自已的输入,输出流了。还是有多种方法。先看输入流的创建,输入流中永远都是存储数据,要不然,我们没有办法从其里面读取数据。

1,直接创建一个Stream.Reable()对象,然后向里面push 数据。push(null) 表示不再向输入流中push数据

const Stream = require('stream');
const readableStream = new Stream.Readable();
readableStream.push('ping');
readableStream.push('pong!');
readableStream.push(null);

当向输入流中push 数据的时候,它都是放到内存的buffer中,如果没有消费掉这些数据,内存就会占满,push不进去了。

2, Readable.from() 从一个可迭代对象中创建一个输入流。

async function* gen() {
    yield 'hello';
    yield 'stream';
}
const readableFromIterator = Stream.Readable.from(gen());

3,实现 _read()方法。_read() 方法,就是表示,把要读取的数据放到输入流中,不要和前面的read()搞混了,read() 是从输入流读取数据到程序中处理。_read() 则是把要读取的数据放入到输入流中,这样我们才能调用read()方法来从里面消费数据. _read() 是node.js 自己调用的,也可以看到输入流,其实是要我们要读取的数据的一个抽象。我们要读取哪一个文件的数据,就创建哪一个文件的输入流,可以把输入流想像读取的源文件。怎么把数据放入到输入流中,还是调用push方法。在_read()方法中调用push()方法。

const stream = require('stream');
const data = require('./result'); // json 数据

class JsonDataReadable extends stream.Readable {
    readIndex = 0;
    _read(size) { // 读取的默认大小
        let okToSend = true;
        while (okToSend) {
            okToSend = this.push(data.text.substr(this.readIndex, size));
            this.readIndex += size;

            if (this.readIndex >data.text.length) {
                this.push(null);
                okToSend = false;
            }
        }
    }
}
消费这个readable, 可以在这个js文件同级目录下,建立一个result.json 方件。
{
    "text": "The return value of the pipe() method is the destination stream, which is a very convenient thing that lets us chain multiple pipe() calls, like this:"
}

然后在这个js 文件写

const fs = require('fs');
const Reader = new JsonDataReadable();
const fileWriter = fs.createWriteStream('output.txt');
Reader.pipe(fileWriter);

创建输出流: 是向目的地写入数据,当使用目的地创建一个输出流的时候,它就会自动地向目的地写数据,写数据调用的是_write() 方法。只要输出流中有数据,它才会向目的地写数据,要把内存中的数据写入到输出流中,那就调用write()方法。write()方法,是将内存中的数据写入到输出流中。应该是存储到buffer中,流中的数据写入到目的地,就是把buffer中的数据输出到目的地就中_write()方法。_write()方法,是内部的接口,表示怎么把数据写入到目的地。显示_write()方法

const writableStream = new Stream.Writable()
writableStream._write = (chunk, encoding, next) => {
    console.log(chunk.toString()); // 写到控制台上
    next(); // 表示写入成功
}

writableStream.write('hello, ');
writableStream.end('world!');

链接: https://www.fly63.com/article/detial/10141

关于 Node.js 里 ES6 Modules 的一次更新说明

关于 Node.js 里 ES6 Modules 的一次更新说明,总结来说:CommonJS 与 ES6 Modules 之间的关键不同在于代码什么时候知道一个模块的结构和使用它。

用node.js开发一个可交互的命令行应用

在这个教程中,我们会开发一个命令行应用,它可以接收一个 CSV 格式的用户信息文件,教程的内容大纲:“Hello,World”,处理命令行参数,运行时的用户输入,异步网络会话,美化控制台的输出,封装成 shell 命令,JavaScript 之外

Node启动https服务器

首先你需要生成https证书,可以去付费的网站购买或者找一些免费的网站,可能会是key或者crt或者pem结尾的。不同格式之间可以通过OpenSSL转换

nodejs 异步转同步

nodej项目在微信环境开发,nodejs的异步特效,会导致请求没有完成就执行下面的代码,出现错误。经过多方查找,可以使用async模块来异步转同步,只有前一个function执行callback,下一个才会执行。

基于node服务器的大文件(G级)上传

3G的大文件分1500个2M二进度文件,通post方法发送给node服务,服务器全部接收到文件后,进组装生成你上文件。

为什么要把 JavaScript 放到服务器端上运行?

JavaScript比C的开发门槛要低,尽管服务器端JavaScript存在已经很多年了,但是后端部分一直没有市场,JavaScript在浏览器中有广泛的事件驱动方面的应用,考虑到高性能、符合事件驱动、没有历史包袱这3个主要原因,JavaScript成为了Node的实现语言。

了解node.js事件循环

node.js的第一个基本论点是I / O的性能消耗是很昂贵。因此,使用当前编程技术的最大浪费来自于等待I / O完成。有几种方法可以处理性能影响

Node.js 应用:Koa2 使用 JWT 进行鉴权

在前后端分离的开发中,通过 Restful API 进行数据交互时,如果没有对 API 进行保护,那么别人就可以很容易地获取并调用这些 API 进行操作。那么服务器端要如何进行鉴权呢?

Node.js 前端开发指南

我们经常跟Node.js打交道,即使你是一名前端开发人员 -- npm脚本,webpack配置,gulp任务,程序打包 或 运行测试等。即使你真的不需要深入理解这些任务,但有时候你会感到困惑,会因为缺少Node.js的一些核心概念而以非常奇怪的方式来编码。

happypack提升项目构建速度

运行在 Node.js 之上的 Webpack 是单线程模型的,也就是说 Webpack 需要处理的任务需要一件件挨着做,不能多个事情一起做。happypack把任务分解给多个子进程去并发的执行,子进程处理完后再把结果发送给主进程。

点击更多...

内容以共享、参考、研究为目的,不存在任何商业目的。其版权属原作者所有,如有侵权或违规,请与小编联系!情况属实本人将予以删除!