前言
本文是对于
Node.js
核心模块fs
可读流createReadeStream
的应用,实现 “行读取器”,功能为读取一个文档的内容,每读完一行触发一次监听的事件,并对这一行数据进行处理。
推荐阅读文章:
LineReader 类的创建
实现 “行读取器” 的整体思路是创建一个类的实例,然后在这个实例上监听一个事件,并开始读取文件,每次读完一行触发,我们这里将这个类命名为 LineReader
,因为类需要监听事件,所以需要继承 EventEmitter
。
/* 行读取器 LineReader 类 */
// 引入依赖
const EventEmitter = require('events');
const fs = require('fs');
// 行读取器的类,参数为读取文件的路径
class LineReader extends EventEmitter {
contructor(path) {
super();
this.path = path; // 文件路径
this._rs = fs.createReadStream(this.path); // 创建可读流
this.current = null; // 存储每次读到的单个字节
this.buffers = []; // 存放文件每一行单个字节 Buffer 的数组
this.system = null; // 默认的系统(windows 或 mac)
this.RETURN = 13; // \r 的十六进制数
this.LINE = 10; // \n 的十六进制数
// 监听 newListener
this.on('newListener', readLineCallback.bind(this));
}
}
在 LineReader
实例上定义了 system
(当前系统)、current
(每次读取的单个字节)、RETURN
(\r
十六进制编码)和 LINE
(\n
十六进制编码)等属性方便后面使用。
我们希望在监听的事件触发之前,就执行读取文件一行内容的逻辑,就说明我们需要一个在监听事件时就能执行的函数,那就需要在创建实例之前先监听 newListener
事件,把 newListener
的回调来作为这个函数执行,并能顺带在参数中获取事件类型。
我们把读取文件的核心逻辑放在了 newListener
事件的回调函数中,将这个回调函数命名为 readLineCallback
,为了保证执行时 readLineCallback
内部使用的 this
是 LineReader
的实例,使用 bind
进行修正。
行读取器核心逻辑 readLineCall 函数
如果需要默认就开始读取,并且每次读取一个字节后还可以进行下一次循环读取,这种场景最符合可读流的暂停模式 readable
事件默认触发一次,“容器” 内读走了一个字节,就会自动 “续杯” 的特点。
/* 行读取器的核心逻辑 */
function readLineCallback(type) {
// 使用暂停模式进行读取
this.on('readable', () => {
if (type === 'newLine') {
// 为了与 \r 和 \n 对比,每次只读一个字节
while ((this.current = this._rs.read(1))) {
// 结果为 Buffer,所以使用索引取出对比
switch (this.current[0]) {
case RETURN: // 针对 Windows
this.system = 'windows';
this.disposeLine(); // 处理换行逻辑
break;
case LINE: // 针对 Mac
this.system = 'mac';
this.disposeLine(); // 处理换行逻辑
break;
default:
// 每读到换行的字符存入数组中
this.buffers.push(current);
}
}
}
});
// 防止最后一行丢失
this.on('end', this.disposeLine.bind(this));
}
在上面代码中监听了 readable
事件并验证了事件类型是否为 newLine
,然后循环读取文件内容,为了与换行的十六进制码进行对比,每次只读取一个字节,当遇到换行符时,明确当前系统并调用换行符处理函数 disposeLine
进行处理。
注意:在最后一次的时候文件最后一行可能没有换行,所以不满足
switch
内语句的条件,即没使用disposeLine
进行处理,所以监听可读流的end
事件,并在end
触发时让disposeLine
作为回调函数执行,注意使用bind
修正this
为当前实例。
兼容操作系统的换行符处理函数
在换行符处理函数中,Windows
与其他系统(Mac
、Linux
)系统唯一的区别就是 Window 系统的换行符为 \r\n
,比 Mac 和 Linux 的 \n
多了一个字节,而在读取下一行时,这个字节是无用的,需要忽略。
/* 换行符处理函数 */
LineReader.prototype.disposeLine = function () {
// 将这一行的内容发射出来并清空数组
this.emit('newLine', Buffer.concat(this.buffers).toString());
this.buffers = [];
// 如果是 window 系统,下一个是 \n,就往下多读一个字节不存入组即可
if (this.system === 'windows') {
this._rs.read(1);
}
};
验证 LineReader 行读取器
创建一个 “行读取器” 需要创建 LineReader
类的实例,并传入被读取文件的路径,由于在源码中执行的是 newListener
的回调函数,所以只需添加 newLine
事件监听就可以了,然后会在 readable
默认触发时在内部循环读取,并把每行读到的内容重新整合后发送,实现 newLine
事件的连续触发,直到文件读完。
/* 使用行读取器 */
// 创建文件 1.txt 每次内容为 1~9 9个数字,每 3 个字符为一行
const lineReader = new LineReader('1.txt');
lineReader.on('newLine', data => {
console.log('------ ' + data + '------');
});
// ------ 123 ------
// ------ 456 ------
// ------ 789 ------
“行读取器” lineReader
对读取到每一行的数据进行处理的逻辑主要在 newLine
事件的回调函数中,比如上面例子,在每一行的前、后添加了 ------
并打印。
总结
在
Node.js
中,流的应用非常广泛,“行读取器” 只是其中的一种应用,可以根据流的不同模式的不同特性实现更复杂的功能,所以流在Node.js
中还是非常重要的。