前端面试每日3+2(第25天)

12/26/2019 每日3+2

当你发现自己的才华撑不起你的野心时,就请安静下来学习吧!

鲁迅说过:答案仅供参考...

# 1、node中,流(stream)是什么,基本类型?流(stream)有什么好处、应用?

解析:

什么是流?

  • stream是基于事件EventEmitter的数据管理模式,是node中处理流式数据的抽象接口。stream模块用于构建实现了流接口的对象。
  • node中访问stream模块 :const stream = require('stream');

基本类型

  • Writable - 可写入数据的流(例如 fs.createWriteStream())
  • Readable - 可读取数据的流(例如 fs.createReadStream())
  • Duplex - 可读又可写的流(例如 net.Socket)
  • Transform - 在读写过程中可以修改或转换数据的 Duplex 流(例如 zlib.createDeflate())

好处

  • 非阻塞式数据处理提升效率
  • 片段处理节省内存
  • 管道处理方便拓展等

应用

文件、网络、数据转换、音频视频等

# 2、可写流是什么?举例子。简单实现一个可写流。

解析:

可写流是对数据要被写入的目的地的一种抽象。
可写流例子:

  • 客户端的HTTP请求
  • 服务端的HTTP相应
  • fs的写入流:fs.createWriteStream
  • zlib流
  • crypto 流
  • TCP socket
  • 子进程 stdin
  • process.stdout 、process.stderr

简单实现一个fs.createWriteStream:

// 只要是用户传递的属性 都把他放在实例上
const fs = require("fs");
const EventEmitter = require("events");
class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || "r";
    this.encoding = options.encoding || null;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.end = options.end || undefined;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    // 可读流默认叫非流动模式
    this.flowing = null; // 非流动模式
    this.close = false; 
    // 需要打开文件
    this.open(); // 默认开启文件 异步


    
    this.on("newListener", type => { // 同步的
      if (type === "data") {// 触发data就开始设置流动并且读取文件
        this.flowing = true; // 已经开始读取文件了
        this.read(); // 开始读取数据
        // this.fd 获取不到!
      }
    });
    this.pos = this.start; // 每次读取的位置
  }
  pause() {// 暂停
    this.flowing = false;
  }
  resume() {
    if (!this.flowing) {
      // 如果是非流动模式 才需要编程流动模式
      this.flowing = true;
      this.read(); // 再次读取
    }
  }
  read() {
    // 读取文件,需要等待触发open事件后在执行
    // 发布订阅
    if (typeof this.fd !== "number") {
      return this.once("open", () => this.read()); // 发布订阅!没有的话,先绑定open事件,等待open后 ,emit open事件
    }
    // 读取的时候要看有没有超过end
    // 算法就是用所有的个数 - 当前的偏移量如果小于了水位线 说明这次读取的个数不是highWaterMark个
    let howMutchToRead = this.end
      ? Math.min(this.end - this.start + 1 - this.pos, this.highWaterMark)
      : this.highWaterMark;
    let buffer = Buffer.alloc(howMutchToRead);
    fs.read(this.fd, buffer, 0, howMutchToRead, this.pos, (err, bytesRead) => {
      // 只有读取到数据后才发射结果
      if (bytesRead > 0) {
        if (this.flowing) {
          // 正在读文件
          this.pos += bytesRead;
          this.emit(
            "data",
            this.encoding ? buffer.slice(0,bytesRead).toString(this.encoding) : buffer.slice(0,bytesRead)
          ); // 截取有效个数
          this.read();
        }
      } else {
        this.emit("end");
        if (this.autoClose) {// 自动关闭
          fs.close(this.fd, () => {
            if(!this.close){
              this.emit("close");
              this.close = true;
            }
          });
        }
      }
    });
  }
  open() {
    // 打开文件
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        return this.emit("error", err);
      }
      this.fd = fd;
      this.emit("open", fd); // 发布订阅,打开文件拿到fd时触发open
    });
  }
  pipe(ws){
    this.on('data',(data)=>{
      let flag = ws.write(data);
      if(!flag){
          this.pause();
      }
  });
  ws.on('drain',()=>{
      this.resume();
  })
  }
}
module.exports = ReadStream;
// on off once emit newListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

# 3、可读流是什么?举例子。简单实现一个可读流。

解析:

可读流是对提供数据的来源的一种抽象。
可读流的例子:

  • 客户端的 HTTP 响应
  • 服务器的 HTTP 请求
  • fs 的读取流:fs.createReadStream()
  • zlib 流
  • crypto 流
  • TCP socket
  • 子进程 stdout 与 stderr
  • process.stdin

简单实现一个fs.createReadStream():

let EventEmitter = require('events');
let fs = require('fs');
class WriteStream extends EventEmitter{
    constructor(path,options = {}){
        super();
        this.path = path;
        this.flags = options.flags || 'w';
        this.mode = options.mode || 0o666;
        this.encoding = options.encoding || 'utf8';
        this.start = options.start || 0;
        this.highWaterMark = options.highWaterMark||16*1024;
        this.autoClose =  options.autoClose || true;

        this.cache = []; // 缓存区
        this.writing = false; // 默认不是正在写入
        this.needDrain = false;// 只有写入的内容大于等于highWaterMark needDrain = true
        this.pos = this.start; // 写入的偏移量
        this.len = 0; // 写入的总个数 每次调用wirte 我就讲内容的个数累加到len属性上,每次写入后在将len的值减少即可

        this.open();
    }
    /**
     * ws.write
     * @param {string | buffer} chunk 字符串或者buffer
     * @param {utf-8} encoding 编码格式
     * @param {function} callback 回调函数
     * @returns {boolen} ret 返回的是是否达到预期 
     */
    write(chunk,encoding=this.encoding,callback){
        chunk = Buffer.isBuffer(chunk)?chunk : Buffer.from(chunk); // 判断 chunk就是buffer类型
        this.len += chunk.length;
        // 判断写入的内容是否超过水位线
        let ret = this.highWaterMark > this.len;
        this.needDrain = !ret;
        if(this.writing){
            // 正在写入的话,把将要写入的先放到缓存
            this.cache.push({
                chunk,
                encoding,
                callback
            });
        }else{
            this.writing = true; // 正在写入
            this._write(chunk,encoding,callback);// 真正的写入逻辑
        }
        return ret;
    }
    clearBuffer(){
        let obj = this.cache.shift(); // 从后面删除
        if(!obj){
            if(this.needDrain){
                this.writing = false;
                this.needDrain = false;
                this.emit('drain');
            }
        }else{
            this._write(obj.chunk,obj.encoding,obj.callback);
        }
    }
    // 核心
    _write(chunk,encoding,callback){
        // console.log(this.fd)
         if(typeof this.fd !== 'number'){
             return this.once('open',()=>this._write(chunk,encoding,callback))
         }
         fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,written)=>{
            this.pos += written
            this.len -= written;// 较少缓存区的数量
            callback && callback(); // 调用写入成功的回调
            // 清空缓存区
            this.clearBuffer();// 清空缓存区
         });
    }
    open(){
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
            if(err){
                return this.emit('error');
            }
            this.fd = fd;
            this.emit('open',fd)
        });
    }
}

module.exports = WriteStream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

# 4、双工流是什么?举例子

解析:

双工流(Duplex)是同时实现了 Readable 和 Writable 接口的流。
双工流的例子:

  • TCP socket
  • zlib 流
  • crypto 流

# 5、转换流是什么?举例子

解析:

转换流(Transform)是一种 Duplex 流,但它的输出与输入是相关联的。 与 Duplex 流一样, Transform 流也同时实现了 Readable 和 Writable 接口。 Transform 流的例子包括:

  • zlib 流
  • crypto 流

# 参考资料

node流 (opens new window)

Last Updated: 1/14/2020, 7:56:38 AM
    asphyxia
    逆时针向