📕
余烬的小册
数据结构与算法GitHub
  • 总述
  • 经验记录
    • 经验总结
      • web component
      • 前端性能优化总结与分析
      • 我的长列表优化方案
      • 双向通讯解决方案
      • 🔧基于istanbul实现代码测试覆盖率工具
      • 表单系统(低代码表单)
      • 跨端小程序
      • 设计一个即时聊天功能
      • 跨页面通讯 3658699fe4cb4d0bbe22b0881390bacd
    • 踩坑记录
      • HTML踩坑记录
      • Flutter踩坑记录
      • CSS踩坑记录
  • 源码解析
    • Vue源码解析
      • Vue2源码解析系列-响应式原理
      • Vue2源码解析系列-模板编译
      • Vue2源码解析系列-渲染系统(待更新)
        • Patch
      • Vue2源码解析系列-调度系统(todo)
      • Vue2组件更新流程(todo)
      • 如何学习Vue源码
      • Vue3源码解析系列-响应系统
      • Vue3源码解析系列-渲染系统
      • Vue3源码解析系列-组件化和渲染优化(todo)
      • Vue router源码解析(todo)
    • React源码解析(todo)
    • 微前端
      • qiankun源码解析(todo)
    • Vite源码解析
      • Vite Client源码
      • Vite Server源码(todo)
  • 前端技术
    • javaScript
      • ES6
        • 变量声明
        • 模块化
        • 箭头函数
        • 你不知道的for...of
        • 新的数据结构Set和Map
        • JavaScript异步编程终极解决方案
        • ES6 Class 3a0c0a225a534984aabe9a943c5df975
      • JavaScript Error
      • JavaScript浅拷贝和深拷贝
      • JavaScript闭包
      • JavaScript最佳实践
      • JavaScript设计模式
      • async函数的polyfill
    • 深入理解JavaScript系列
      • JavaScript中的继承
      • JavaScript原始类型和引用类型
      • JavaScript浅拷贝和深拷贝
      • JavaScript手写系列
      • JavaScript之this
      • 词法环境和环境记录
      • JavaScript内存泄漏
      • 执行上下文
      • 从ECMAScript规范中学习this
    • TypeScript
      • TypeScript基础教程
      • Typescript高级操作
      • TypeScript工具类型
      • Typescript手写实现工具类型
      • Typescript总结(思维导图)
    • 浏览器原理
      • 页面渲染原理
      • 浏览器存储
      • JavaScript事件循环
      • 事件循环
      • 跨域
      • DOM事件流
      • 从输入url到页面渲染
      • 判断节点之间的关系及根据节点关系查找节点
      • history API
    • 跨端技术
      • Flutter
        • Flutter布局组件
    • 前端工程化
      • Babel插件开发指南
      • 循环依赖
      • pm2
    • React
      • React 状态管理
      • React组件通讯
      • Redux入门
      • Flux
      • React Hook(todo)
      • Effect
  • 服务器端
    • 计算机网络
      • 应用层
      • 运输层
      • 物理层
      • 数据链路层
      • HTTP缓存
      • HTTPS
      • 网络层
    • NodeJs
      • Node.js
      • nodejs最佳实践
      • 《深入浅出Nodejs》小结
      • mongoose填充(populate)
      • node事件循环
      • Node子进程
      • nestjs从零开始
      • nodejs流
      • Nodejs调试
      • Koa源码解析
    • 服务器
      • 操作系统
      • Linux
      • nginx常用指令
      • nginx常用配置
    • 数据库
      • Mysql常见语法
      • MongoDB Indexes索引
  • 前端安全与性能优化
    • 前端安全
      • 跨站脚本攻击(XSS)
      • 跨站点请求伪造(CSRF)
      • 点击劫持
      • 中间人攻击
      • 越权攻击与JWT
    • 前端性能优化
      • 前端监控系统
      • 前端性能优化总结与分析 7348bba0918645b1899006dc842a64c1
      • 衡量性能的核心指标 0dc15ef127cf4f4a9f1137c377420292
      • 图片懒加载
  • 杂项
    • 其他
      • Git
      • web component框架
      • 实现滚动框的懒加载
      • Stencil指南
    • CSS
      • 定位和层叠上下文
      • BFC
      • 盒模型
      • css选择器
      • css变量
由 GitBook 提供支持
在本页
  • 什么是流
  • 流的类型
  • 可读流
  • 读取大文件
  • 两种模式
  • 常用事件
  • 可写流
  • 写入大文件
  • 事件
  • 双工流和转换流
  • 流的API
  • pipe()
  • unpipe()
  • destroy()
  • _construct()
  • push()
  • 实现自定义流
  • 实现流的方式
  • 实现可读流
  • 实现可写流
  • 实现双工流
  • 实现转换流
  • 参考
在GitHub上编辑
  1. 服务器端
  2. NodeJs

nodejs流

tags: Nodejs Created time: June 25, 2022 11:49 AM emoji: https://nodejs.org/static/images/favicons/favicon.png

什么是流

流是nodejs中很重要的一个概念,是用于在 Node.js 中处理流数据的抽象接口,它能以一种以高效的方式处理读/写文件、网络通信、或任何类型的端到端的信息交换。

在传统的方式中,处理文件时会将文件的所有内容读取到内存中,然后再处理,假如遇到大文件,这种处理方式不仅低效还可能影响服务器性能,而流能解决这类问题,流每次都读取一部分内容到内存中处理,而不是一次性读取所有内容,这在处理大文件时很有用。

流的类型

Node.js 中有四种基本的流类型:

  • [Writable](http://nodejs.cn/api/stream.html#class-streamwritable): 可以写入数据的流(例如,[fs.createWriteStream()](http://nodejs.cn/api/fs.html#fscreatewritestreampath-options))。

  • [Readable](http://nodejs.cn/api/stream.html#class-streamreadable): 可以从中读取数据的流(例如,[fs.createReadStream()](http://nodejs.cn/api/fs.html#fscreatereadstreampath-options))。

  • [Duplex](http://nodejs.cn/api/stream.html#class-streamduplex): Readable 和 Writable 的流(例如,[net.Socket](http://nodejs.cn/api/net.html#class-netsocket))。

  • [Transform](http://nodejs.cn/api/stream.html#class-streamtransform): 可以在写入和读取数据时修改或转换数据的 Duplex 流(例如,[zlib.createDeflate()](http://nodejs.cn/api/zlib.html#zlibcreatedeflateoptions))。

许多 Node.js 核心模块提供了原生的流处理功能,最值得注意的有:

  • process.stdin 返回连接到 stdin 的流。

  • process.stdout 返回连接到 stdout 的流。

  • process.stderr 返回连接到 stderr 的流。

  • fs.createReadStream() 创建文件的可读流。

  • fs.createWriteStream() 创建到文件的可写流。

  • net.connect() 启动基于流的连接。

  • http.request() 返回 http.ClientRequest 类的实例,该实例是可写流。

  • zlib.createGzip() 使用 gzip(压缩算法)将数据压缩到流中。

  • zlib.createGunzip() 解压缩 gzip 流。

  • zlib.createDeflate() 使用 deflate(压缩算法)将数据压缩到流中。

  • zlib.createInflate() 解压缩 deflate 流。

可读流

读取大文件

以读取文件为例

const reader = fs.createReadStream("./data.txt");

这段代码通过fs.createReadStream()创建了一个可读流,然后将data.txt的内容片段化地读取出来。

两种模式

可读流有两种模式:流动和暂停。

当创建reader时,可读流处于暂停pause模式,此时不会进行读取操作,但可以通过以下方式之一切换到流动模式:

  • 添加 ['data'](http://nodejs.cn/api/stream.html#event-data) 事件句柄。

  • 调用 [stream.resume()](http://nodejs.cn/api/stream.html#readableresume) 方法。

  • 调用 [stream.pipe()](http://nodejs.cn/api/stream.html#readablepipedestination-options) 方法将数据发送到 [Writable](http://nodejs.cn/api/stream.html#class-streamwritable)。

当可读流处于流动模式时,可以通过reader.pause()或者reader.unpipe()来转换成暂停模式。

// 自动转成流动模式,此时会将内容打印到屏幕上
reader.pipe(process.stdout);

// 如果执行下面命令,则会使reader流转成暂停模式
reader.pause();
// 或者 reader.unpipe(process.stdout);

常用事件

可读流提供一些事件,其中最值得注意的有

  • error

  • data

reader.on("data", (chunk) => {
    console.log(chunk.toString());
}
// 类似于
// reader.pipe(process.stdout);

可写流

写入大文件

下面我们实现一个大文件的写入操作

const reader = fs.createReadStream("./data.txt");
const writer = fs.createWriteStream("./data2.txt");
reader.pipe(writer);

这段代码首先创造可读流读取了data.txt(假设是一个超大的文件),然后将可读流通过管道流入可写流中。

这种是将可读流的内容流入到可写流中,当然你也可以直接写入可写流。

const writer = fs.createWriteStream("./data2.txt");
writer.write("hello world\n");
writer.write("hello node\n");
writer.end("end");

事件

可写流最值得关心的事件有

  • error 在写入或管道数据时发生错误时触发

  • drain 'drain' 事件将在适合继续将数据写入流时触发。

  • finish 在调用 [stream.end()](http://nodejs.cn/api/stream.html#writableendchunk-encoding-callback) 方法之后,并且所有数据都已刷新到底层系统时触发

const reader = fs.createReadStream(data1);
const writer = fs.createWriteStream("./data2.txt");
writer.on("drain", () => {
    // 每次写入chunk时触发
    // 如果内容体积太小,可以一次性写入,则不会触发该事件
    console.log("drain");
});
writer.on("finish", () => {
    // 最后一次写入chunk时触发
    console.log("finish");
});
reader.pipe(writer);

双工流和转换流

双工流是实现Readable和Writable的流,实际上就是实现_read和_write的类,既能当做可读流又能当做可写流。

转换流则是在双工流的基础上增加了transform方法,转换流会将输入经过transform后转换成输出。

Nodejs中的的压缩流和加密流都载内部实现了转换流。下面将以压缩流来演示转换流的工作方式。

const zlib = require("zlib");
const fs = require("fs");
fs.createReadStream("./data.txt")
    .pipe(zlib.createGzip())
    .pipe(fs.createWriteStream("./test.txt.gz"));

通过fs来创建了可读流和可写流,目的是将data.txt内容读取出来然后保存到test.txt.gz文件中,而在流的流动过程中会经历转换流zlib.createGzip(),它的作用是将可读流的内容进行转换,并提供给后面的可写流。

流的API

pipe()

readable.pipe() 方法将 [Writable](http://nodejs.cn/api/stream.html#class-streamwritable) 流绑定到 readable,使其自动切换到流动模式并将其所有数据推送到绑定的 [Writable](http://nodejs.cn/api/stream.html#class-streamwritable)。 数据流将被自动管理,以便目标 Writable 流不会被更快的 Readable 流漫过。

注意事项

如果 Readable 流在处理过程中触发错误,则 Writable 目标不会自动关闭。 如果发生错误,则需要手动关闭每个流以防止内存泄漏(通过this.destroy())。

unpipe()

当在 [Readable](http://nodejs.cn/api/stream.html#class-streamreadable) 流上调用 [stream.unpipe()](http://nodejs.cn/api/stream.html#readableunpipedestination) 方法时,则会触发 'unpipe' 事件,从其目标集合中删除此 [Writable](http://nodejs.cn/api/stream.html#class-streamwritable)。

reader.pipe(writer);
reader.unpipe(writer);

destroy()

在流中触发的异常,并不会传播到error事件中,也不会自动关闭释放资源,但出现异常时,需要手动执行this.destory(err)来销毁流。

销毁流 可选地触发 'error' 事件,并且触发 'close' 事件(除非 emitClose 设置为 false)。 在此调用之后,可读流将释放任何内部资源,随后对 push() 的调用将被忽略。

所有流中出现的异常都必须调用this.destroy进行捕获,否则可能会导致内存泄露

const myReadable = new Readable({
	read(size) {
		fs.open("./xxx", (err) => {
			if (err) {
				this.destroy(err);
			}
		});
	},
});
myReadable.on("error", (err) => {
	console.log("err", err);
});
myReadable.on("close", () => {
	console.log("close");
});
myReadable.pipe(process.stdout);

_construct()

_construct是流的一个内部方法,是可选的。

_construct方法接收一个参数callback,当实现了_construct方法时,_write()、_final()、_read() 和 _destroy() 等操作的调用都会被延迟,直到执行了callback。

callback可以传递一个Error类型的参数。

const myWritable = new Writable({
	construct(callback) {
		try {
			this.fd = fs.openSync("./input.txt", "a+");
			callback();
		} catch (err) {
			callback(err);
		}
	},
	write(chunk, encoding, callback) {
		fs.write(this.fd, chunk, (err) => {
			if (err) this.destroy(err);
			callback();
		});
	},
});
fs.createReadStream("./data.txt").pipe(myWritable);

push()

Readable上的方法,添加chunk到可读流中,可选指定encoding。

const myReadable = new Readable({
	read(size) {},
});
myReadable.push("123\n");
myReadable.push("123\n");
myReadable.push(null);
myReadable.push("123"); // 报错,不能在EOF后push新的数据
myReadable.pipe(process.stdout);

当 chunk 作为 null 传递时,表示流结束 (EOF),之后不能再写入数据。

实现自定义流

除了使用nodejs内置的流外,我们还可以自己实现流。

实现流的方式

实现流有三种方式,分别是通过类、构造函数或对象模式来实现流。

const {Readable} = require('stream')
// 类
class MyReadable extends Readable{
    constructor(options){
        super(options)
    }
	_read(){}
}
// 构造函数
const myReadable = new Readable()
myReadable._read = function(){}
// 对象模式
const myReadable = new Readable({
    read(){}
})

下面将统一采用对象模式或类的形式实现流。

实现可读流

实现可读流必须要实现_read()方法,以从底层资源中获取数据

const myReadable = new Readable({
    read(size){}
})

调用 [readable.read()](http://nodejs.cn/api/stream.html#readable_readsize) 时,如果资源中的数据可用,则实现应开始使用 [this.push(dataChunk)](http://nodejs.cn/api/stream.html#readablepushchunk-encoding) 方法将该数据推送到读取队列中。 一旦流准备好接受更多数据,则 _read() 将在每次调用 [this.push(dataChunk)](http://nodejs.cn/api/stream.html#readablepushchunk-encoding) 后再次调用。 _read() 可能会继续从资源中读取并推送数据,直到 readable.push() 返回 false。 只有当 _read() 停止后再次被调用时,它才能继续将额外的数据推入队列。

size参数是可选的,它的作用是确定每次push的数据量。

实现可写流

和可读流类似,可写流必须实现_write()方法 或/和 writev()。

const myWritable = new Writable({
	write(chunk, encoding, callback) {
		console.log(chunk.toString());
	},
});
process.stdin.pipe(myWritable);

writable会持续传入片段数据(chunk)并执行writable.write()。 如果流实现能够同时处理多个数据块,则应实现 writable._writev() 方法。

encoding表示chunk的编码,如果块是字符串,则 encoding 是该字符串的字符编码。 如果块是 Buffer,或者如果流在对象模式下运行,则可以忽略 encoding。

如果需要监听 ['drain'](http://nodejs.cn/api/stream.html#event-drain) 事件,则必须执行callback函数,按照nodejs风格,第一个参数传Error,如果没有异常则传null。

实现类似fs.createWriteStream的效果。

const myWritable = new Writable({
	write(chunk, encoding, callback) {
		fs.writeFile("./input.txt", chunk, (err) => {
			callback(err, chunk);
		});
	},
});
process.stdin.pipe(myWritable);

实现双工流

双工流实际上就是实现readabale和writable的流,因此我们需要实现duplex._read和duplex._write。

const { Duplex } = require("stream");
class MyDuplex extends Duplex {
	_read(chunk) {}
	_write(chunk) {
		this.push(chunk);
	}
}
const myDuplex = new MyDuplex();
process.stdin.pipe(myDuplex).pipe(process.stdout);

process.stdin获取输入流,然后通过pipe流到myDuplex中,此时myDuplex是作为可写流,在这个过程中MyDuplex._write会被多次调用来片段化地写入数据,我们在这里通过this.push()将片段数据chunk添加,然后myDuplex就可以作为可读流来讲内容流入到可写流process.stdout,即屏幕上。

实现转换流

转换流在双工流的基础上实现了transform,它能够转换内容,例如前面讲过的压缩流。

const zlib = require("zlib");
const fs = require("fs");
fs.createReadStream("./data.txt")
	.pipe(zlib.createGzip()) // 转换
	.pipe(fs.createWriteStream("./test.txt.gz"));

我们可以实现一个简单的转换流,将文本每一行内容的前面都加上行号

let count = 0;
const myTransform = new Transform({
	transform(chunk) {
		this.push(`[${count}] ` + chunk);
	},
});
fs.createReadStream("./data.txt").pipe(myTransform).pipe(process.stdout);

参考

上一页nestjs从零开始下一页Nodejs调试

stream 流 | Node.js API 文档 (nodejs.cn)
Node.js 流 (nodejs.cn)
Node's Streams (jscomplete.com)