80 行代码实现简易 RxJS

更新日期: 2022-07-20阅读: 841标签: rxjs

RxJS 是一个响应式的库,它接收从事件源发出的一个个事件,经过处理管道的层层处理之后,传入最终的接收者,这个处理管道是由操作符组成的,开发者只需要选择和组合操作符就能完成各种异步逻辑,极大简化了异步编程。除此以外,RxJS 的设计还遵循了函数式、流的理念。

直接讲概念比较难理解,不如我们实现一个简易的 RxJS 再来看这些。

RxJS 的使用

RxJS 会对事件源做一层封装,叫做 Observable,由它发出一个个事件。

比如这样:

const source = new Observable((observer) => {  
    let i = 0;  
    setInterval(() => {  
        observer.next(++i);  
    }, 1000);  
});  

在回调函数里面设置一个定时器,不断通过 next 传入事件。

这些事件会被接受者监听,叫做 Observer。

const subscription = source.subscribe({  
    next: (v) => console.log(v),  
    error: (err) => console.error(err),  
    complete: () => console.log('complete'),  
});  

observer 可以接收 next 传过来的事件,传输过程中可能有 error,也可以在这里处理 error,还可以处理传输完成的事件。

这样的一个监听或者说订阅,叫做 Subscription。

可以订阅当然也可以取消订阅:

subscription.unsubscribe();  

取消订阅时的回调函数是在 Observable 里返回的:

const source = new Observable((observer) => {  
    let i = 0;  
    const timer = setInterval(() => {  
        observer.next(++i);  
    }, 1000);  
    return function unsubscribe() {  
        clearInterval(timer);  
    };  
});  

发送事件、监听事件只是基础,处理事件的过程才是 RxJS 的精髓,它设计了管道的概念,可以用操作符 operator 来组装这个管道:

source.pipe(  
    map((i) => ++i),  
    map((i) => i * 10)  
).subscribe(() => {  
    //...  
})  

事件经过管道之后才会传到 Observer,在传输过程中会经过一个个操作符的处理。

比如这里的处理逻辑是,对传过来的数据加 1,然后再乘以 10。

综上,使用 RxJS 的代码就是这样的:

const source = new Observable((observer) => {  
    let i = 0;  
    const timer = setInterval(() => {  
        observer.next(++i);  
    }, 1000);  
    return function unsubscribe() {  
        clearInterval(timer);  
    };  
});  
const subscription = source.pipe(  
    map((i) => ++i),  
    map((i) => i * 10)  
).subscribe({  
    next: (v) => console.log(v),  
    error: (err) => console.error(err),  
    complete: () => console.log('complete'),  
});  
  
setTimeout(() => {  
    subscription.unsubscribe();  
}, 4500);  

我们通过 Observable 创建了一个事件源,每秒发出一个事件,这些事件会经过管道的处理再传递给 Observer,管道的组成是两个 map 操作符,对数据做了 + 1 和 * 10 的处理。

Observer 接收到传递过来的数据,做了打印,还对错误和结束时的事件做了处理。此外,Observable 提供了取消订阅时的处理逻辑,当我们在 4.5s 取消订阅时,就可以清除定时器。

使用 RxJS 基本就是这个流程,那它是怎么实现的呢?

80 行代码实现 RxJS

先从事件源开始,实现 Observable:

观察下它的特点:

  1. 它接收一个回调函数,里面可以调用 next 来传输数据。
  2. 它有 subscribe 方法可以用来添加 Observer 的订阅,返回 subscription
  3. 它可以在回调函数里返回 unsbscribe 时的处理逻辑
  4. 它有 pipe 方法可以传入操作符

我们按照这些特点来实现下:

首先,Observable 的构造函数要接收回调函数 _subscribe,但是不是立刻调用,而是在 subscribe 的时候才调用:

class Observable {  
    constructor(_subscribe) {  
        this._subscribe = _subscribe;  
    }  
    subscribe() {  
        this._subscribe();  
    }  
}  

回调函数的参数是有 next、error、complete 方法的对象,用于传递事件:

class Observable {  
    constructor(_subscribe) {  
        this._subscribe = _subscribe;  
    }  
    subscribe(observer) {  
        const subscriber = new Subscriber(observer);  
        this._subscribe(subscriber);  
    }  
}  
  
class Subscriber{  
    constructor(observer) {  
        super();  
        this.observer = observer;  
        this.isStopped = false;  
    }  
    next(value) {  
        if (this.observer.next && !this.isStopped) {  
            this.observer.next(value);  
        }  
    }  
    error(value) {  
        this.isStopped = true;  
        if (this.observer.error) {  
            this.observer.error(value);  
        }  
    }  
    complete() {  
        this.isStopped = true;  
        if (this.observer.complete) {  
            this.observer.complete();  
        }  
        if (this.unsubscribe) {  
            this.unsubscribe();  
        }  
    }  
}  

这样,在回调函数里面就可以调用 next、error、complete 方法了:


此外,回调函数的返回值是 unsbscribe 时的处理逻辑,要收集起来,在取消订阅时调用:

class Subscription {  
    constructor() {  
        this._teardowns = [];  
    }  
    unsubscribe() {  
        this._teardowns.forEach((teardown) => {  
            typeof teardown === 'function' ? teardown() : teardown.unsubscribe()  
        });  
    }  
    add(teardown) {  
        if (teardown) {  
            this._teardowns.push(teardown);  
        }  
    }  
}  

提供 unsubscribe 方法用于取消订阅,_teardowns 用于收集所有的取消订阅时的回调,在 unsubscribe 时调用所有 teardown 回调。

这段逻辑比较通用,可以作为 Subscriber 的父类。

然后,在 Observable 里调用 add 来添加 teardown,并且返回 subscription(它有 unsubscribe 方法):

class Observable {  
    constructor(_subscribe) {  
        this._subscribe = _subscribe;  
    }  
    subscribe(observer) {  
        const subscriber = new Subscriber(observer);  
        subscriber.add(this._subscribe(subscriber));  
        return subscriber;  
    }  
}  
class Subscriber extends Subscription {  
    constructor(observer) {  
        super();  
        this.observer = observer;  
        this.isStopped = false;  
    }  
    next(value) {  
        if (this.observer.next && !this.isStopped) {  
            this.observer.next(value);  
        }  
    }  
    error(value) {  
        this.isStopped = true;  
        if (this.observer.error) {  
            this.observer.error(value);  
        }  
    }  
    complete() {  
        this.isStopped = true;  
        if (this.observer.complete) {  
            this.observer.complete();  
        }  
        if (this.unsubscribe) {  
            this.unsubscribe();  
        }  
    }  
}  
  
class Subscription {  
    constructor() {  
        this._teardowns = [];  
    }  
    unsubscribe() {  
        this._teardowns.forEach((teardown) => {  
            typeof teardown === 'function' ? teardown() : teardown.unsubscribe()  
        });  
    }  
    add(teardown) {  
        if (teardown) {  
            this._teardowns.push(teardown);  
        }  
    }  
}  

这样,我们就实现了 Observable 和 Observer,只写了 50 行代码。先来测试下:

const source = new Observable((observer) => {  
    let i = 0;  
    const timer = setInterval(() => {  
        observer.next(++i);  
    }, 1000);  
    return function unsubscribe() {  
        clearInterval(timer);  
    };  
});  
const subscription = source.subscribe({  
    next: (v) => console.log(v),  
    error: (err) => console.error(err),  
    complete: () => console.log('complete'),  
});  
  
setTimeout(() => {  
    subscription.unsubscribe();  
}, 4500);  

Observer 监听到了 Observable 传递过来的 1、2、3、4 的数据,因为在 4.5s 时取消了订阅,所以后面就不再有数据了。

我们用 50 行实现了基础的 RxJS!

当然,最精髓的 operator 还没有实现,接下来继续完善。

我们给 Observable 添加 pipe 方法,它会调用传入的 operator,并且上个的结果是下个的输入,这样就串起来了,也就是管道的概念:

class Observable {  
    constructor(_subscribe) {  
        //...  
    }  
    subscribe(observer) {  
       //...  
    }  
    pipe(...operations) {  
        return pipeFromArray(operations)(this);  
    }  
}  
  
function pipeFromArray(fns) {  
    if (fns.length === 0) {  
        return (x) => x;  
    }  
    if (fns.length === 1) {  
        return fns[0];  
    }  
    return (input) => {  
        return fns.reduce((prev, fn) => fn(prev), input);  
    };  
}  

当传入的参数是 0 个的时候,就直接返回之前的 Observable,1 个的时候直接返回,否则就通过 reduce 的方式串联起来,组成管道。

operator 的实现就是监听上一个 Observable,返回一个新的。

比如 map 的实现,就是传入 project 对 value 做处理,把结果用 next 传下去:

function map(project) {  
    return (observable) => new Observable((subscriber) => {  
        const subcription = observable.subscribe({  
            next(value) {  
                return subscriber.next(project(value));  
            },  
            error(err) {  
                subscriber.error(err);  
            },  
            complete() {  
                subscriber.complete();  
            },  
        });  
        return subcription;  
    });  
}  

这样我们就实现了 operator,来测试下:


我们调用了 pipe 方法,使用两个 map 操作符来组织处理流程,对数据做了 +1 和 *10 的处理。

所以,Observable 传递过来的 1、2、3、4 传递给 Observer 的时候就变成了 20、30、40、50。

至此,我们实现了 RxJS 的 Observable、Observer、Subscription、operator 等概念,是一个简易版  RxJS 了。只用了 80 行代码。

再来看最开始的那些理念:

为什么叫做响应式呢?

因为是对事件源做监听和一系列处理的,这种编程模式就叫做响应式。

为什么叫函数式呢?

因为每一步 operator 都是纯函数,返回一个新的 Observable,这符合函数式的不可变,修改后返回一个新的的理念。

为什么叫流呢?

因为一个个事件是动态产生和传递的,这种数据的动态产生和传递就可以叫做流。

完整代码如下:

function pipeFromArray(fns) {  
    if (fns.length === 0) {  
        return (x) => x;  
    }  
    if (fns.length === 1) {  
        return fns[0];  
    }  
    return (input) => {  
        return fns.reduce((prev, fn) => fn(prev), input);  
    };  
}  
class Subscription {  
    constructor() {  
        this._teardowns = [];  
    }  
    unsubscribe() {  
        this._teardowns.forEach((teardown) => {  
            typeof teardown === 'function' ? teardown() : teardown.unsubscribe()  
        });  
    }  
    add(teardown) {  
        if (teardown) {  
            this._teardowns.push(teardown);  
        }  
    }  
}  
class Subscriber extends Subscription {  
    constructor(observer) {  
        super();  
        this.observer = observer;  
        this.isStopped = false;  
    }  
    next(value) {  
        if (this.observer.next && !this.isStopped) {  
            this.observer.next(value);  
        }  
    }  
    error(value) {  
        this.isStopped = true;  
        if (this.observer.error) {  
            this.observer.error(value);  
        }  
    }  
    complete() {  
        this.isStopped = true;  
        if (this.observer.complete) {  
            this.observer.complete();  
        }  
        if (this.unsubscribe) {  
            this.unsubscribe();  
        }  
    }  
}  
class Observable {  
    constructor(_subscribe) {  
        this._subscribe = _subscribe;  
    }  
    subscribe(observer) {  
        const subscriber = new Subscriber(observer);  
        subscriber.add(this._subscribe(subscriber));  
        return subscriber;  
    }  
    pipe(...operations) {  
        return pipeFromArray(operations)(this);  
    }  
}  
function map(project) {  
    return (observable) => new Observable((subscriber) => {  
        const subcription = observable.subscribe({  
            next(value) {  
                return subscriber.next(project(value));  
            },  
            error(err) {  
                subscriber.error(err);  
            },  
            complete() {  
                subscriber.complete();  
            },  
        });  
        return subcription;  
    });  
}  
  
  
const source = new Observable((observer) => {  
    let i = 0;  
    const timer = setInterval(() => {  
        observer.next(++i);  
    }, 1000);  
    return function unsubscribe() {  
        clearInterval(timer);  
    };  
});  
const subscription = source.pipe(  
    map((i) => ++i),  
    map((i) => i * 10)  
).subscribe({  
    next: (v) => console.log(v),  
    error: (err) => console.error(err),  
    complete: () => console.log('complete'),  
});  
  
setTimeout(() => {  
    subscription.unsubscribe();  
}, 4500);  

总结

为了理解 RxJS 的响应式、函数式、流等理念,我们实现了简易版的 RxJS。

我们实现了 Observable、Observer、Subscription 等概念,完成了事件的产生和订阅以及取消订阅。

接着又实现了 operator 和 pipe,每个 operator 返回一个新的 Observable,对数据做层层处理。

写完以后,我们能更清晰的理解响应式、函数式、流等理念在 RxJS 里是怎么体现的。

实现简易版 RxJS,只需要 80 行代码。

以上文章来源于神光的编程秘籍 ,作者神说要有光

链接: https://www.fly63.com/article/detial/11916

rxjs入门指南

在复杂的,频繁的异步请求场景,使用rxjs。在依赖的多个异步数据,决定渲染的情景,使用rxjs。总之:在前台频繁的、大量的、和后台数据交互的复杂项目里面,使用rxjs(web端,iOS,android端等,客户端都可考虑使用)

Rxjs常用operators

本文使用的是angular6内置的rxjs,concat:通过顺序地发出多个 Observables 的值将它们连接起来,一个接一个的。count计算源的发送数量,并当源完成时发出该数值。

RxJS主题(Subject)

什么是主题?RxJS 主题就是一个特性类型的 Observable 对象,它允许值多路广播给观察者(Observers)。当一个简单的 Observable 是单播的(每个订阅的观察者它们自己都依赖 Observable 的执行)时候,主题(Subjects)就是多播的。

RxJS中Operators

RxJS 的操作符(operators)是最有用的,尽管 Observable 是最基本的。操作符最基本的部分(pieces)就是以申明的方式允许复杂的异步代码组合简化。操作符是函数。这里有两种操作符:

RxJS调度器(Scheduler)

调度是数据结构。它知道怎样在优先级或其他标准去存储和排队运行的任务,调度器是一个执行上下文。它表示任务在何时何地执行(例如,立即或是在回调机制中如 setTimeout 或 process.nextTick,又或是动画框架)

我每天都在使用的 10 个 RxJS 运算符

作为一名 Angular 开发人员,您可能会发现以下 RxJS 运算符在您的日常开发中很有用:map():此运算符用于转换可观察对象发出的值。它以一个函数作为参数,它接收发出的值作为输入并返回转换后的输出

内容以共享、参考、研究为目的,不存在任何商业目的。其版权属原作者所有,如有侵权或违规,请与小编联系!情况属实本人将予以删除!