极乐门资源网 Design By www.ioogu.com

Observer Pattern

观察者模式定义

观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。

我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:

  • 期刊出版方 - 负责期刊的出版和发行工作
  • 订阅者 - 只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知

在观察者模式中也有两个主要角色:Subject (主题) 和 Observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来我们来看张图,从而加深对上面概念的理解。

关于RxJS Subject的学习笔记

观察者模式结构

关于RxJS Subject的学习笔记

观察者模式实战

Subject 类定义

class Subject {
  
  constructor() {
    this.observerCollection = [];
  }
  
  addObserver(observer) { // 添加观察者
    this.observerCollection.push(observer);
  }
  
  deleteObserver(observer) { // 移除观察者
    let index = this.observerCollection.indexOf(observer);
    if(index >= 0) this.observerCollection.splice(index, 1);
  }
  
  notifyObservers() { // 通知观察者
    this.observerCollection.forEach((observer)=>observer.notify());
  }
}

Observer 类定义

class Observer {
  constructor(name) {
    this.name = name;
  }
  
  notify() {
    console.log(`${this.name} has been notified.`);
  }
}

使用示例

let subject = new Subject(); // 创建主题对象

let observer1 = new Observer('semlinker'); // 创建观察者A - 'semlinker'
let observer2 = new Observer('lolo'); // 创建观察者B - 'lolo'

subject.addObserver(observer1); // 注册观察者A
subject.addObserver(observer2); // 注册观察者B
 
subject.notifyObservers(); // 通知观察者

subject.deleteObserver(observer1); // 移除观察者A

subject.notifyObservers(); // 验证是否成功移除

以上代码成功运行后控制台的输出结果:

semlinker has been notified.
lolo has been notified.
lolo has been notified.

Observable subscribe

在介绍RxJS - Subject 之前,我们先来看个示例:

const interval$ = Rx.Observable.interval(1000).take(3);

interval$.subscribe({
 next: value => console.log('Observer A get value: ' + value);
});

setTimeout(() => {
 interval$.subscribe({
   next: value => console.log('Observer B get value: ' + value);
 });
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer B get value: 0
Observer A get value: 2
Observer B get value: 1
Observer B get value: 2

通过以上示例,我们可以得出以下结论:

  • Observable 对象可以被重复订阅
  • Observable 对象每次被订阅后,都会重新执行

上面的示例,我们可以简单地认为两次调用普通的函数,具体参考以下代码:

function interval() {
 setInterval(() => console.log('..'), 1000);
}

interval();

setTimeout(() => {
 interval();
}, 1000);

Observable 对象的默认行为,适用于大部分场景。但有些时候,我们会希望在第二次订阅的时候,不会从头开始接收 Observable 发出的值,而是从第一次订阅当前正在处理的值开始发送,我们把这种处理方式成为组播 (multicast),那我们要怎么实现呢 ?回想一下我们刚才介绍过观察者模式,你脑海中是不是已经想到方案了。没错,我们可以通过自定义 Subject 来实现上述功能。

自定义 Subject

Subject 类定义

class Subject {  
  constructor() {
    this.observers = [];
  }
  
  addObserver(observer) { 
    this.observers.push(observer);
  }
  
  next(value) { 
    this.observers.forEach(o => o.next(value));  
  }
  
  error(error){ 
    this.observers.forEach(o => o.error(error));
  }
  
  complete() {
    this.observers.forEach(o => o.complete());
  }
}

使用示例

const interval$ = Rx.Observable.interval(1000).take(3);
let subject = new Subject();

let observerA = {
  next: value => console.log('Observer A get value: ' + value),
  error: error => console.log('Observer A error: ' + error),
  complete: () => console.log('Observer A complete!')
};

var observerB = {
  next: value => console.log('Observer B get value: ' + value),
  error: error => console.log('Observer B error: ' + error),
  complete: () => console.log('Observer B complete!')
};

subject.addObserver(observerA); // 添加观察者A
interval$.subscribe(subject); // 订阅interval$对象
setTimeout(() => {
  subject.addObserver(observerB); // 添加观察者B
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer B get value: 1
Observer A get value: 2
Observer B get value: 2
Observer A complete!
Observer B complete!

通过自定义 Subject,我们实现了前面提到的功能。接下来我们进入正题 - RxJS Subject。

RxJS Subject

首先我们通过 RxJS Subject 来重写一下上面的示例:

const interval$ = Rx.Observable.interval(1000).take(3);
let subject = new Rx.Subject();

let observerA = {
  next: value => console.log('Observer A get value: ' + value),
  error: error => console.log('Observer A error: ' + error),
  complete: () => console.log('Observer A complete!')
};

var observerB = {
  next: value => console.log('Observer B get value: ' + value),
  error: error => console.log('Observer B error: ' + error),
  complete: () => console.log('Observer B complete!')
};

subject.subscribe(observerA); // 添加观察者A
interval$.subscribe(subject); // 订阅interval$对象
setTimeout(() => {
  subject.subscribe(observerB); // 添加观察者B
}, 1000);

RxJS Subject 源码片段

/**
 * Suject继承于Observable 
 */
export class Subject extends Observable {
  constructor() {
    super();
    this.observers = []; // 观察者列表
    this.closed = false;
    this.isStopped = false;
    this.hasError = false;
    this.thrownError = null;
  }
 
  next(value) {
    if (this.closed) {
      throw new ObjectUnsubscribedError();
    }
    if (!this.isStopped) {
      const { observers } = this;
      const len = observers.length;
      const copy = observers.slice();
      for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者
        copy[i].next(value);
      }
    }
  }
 
  error(err) {
    if (this.closed) {
      throw new ObjectUnsubscribedError();
    }
    this.hasError = true;
    this.thrownError = err;
    this.isStopped = true;
    const { observers } = this;
    const len = observers.length;
    const copy = observers.slice();
    for (let i = 0; i < len; i++) { // 循环调用观察者error方法
      copy[i].error(err);
    }
    this.observers.length = 0;
  }
 
  complete() {
    if (this.closed) {
      throw new ObjectUnsubscribedError();
    }
    this.isStopped = true;
    const { observers } = this;
    const len = observers.length;
    const copy = observers.slice();
    for (let i = 0; i < len; i++) { // 循环调用观察者complete方法
      copy[i].complete();
    }
    this.observers.length = 0; // 清空内部观察者列表
  }
}

通过 RxJS Subject 示例和源码片段,对于 Subject 我们可以得出以下结论:

  • Subject 既是 Observable 对象,又是 Observer 对象
  • 当有新消息时,Subject 会对内部的 observers 列表进行组播 (multicast)

Angular 2 RxJS Subject 应用

在 Angular 2 中,我们可以利用 RxJS Subject 来实现组件通信,具体示例如下:

message.service.ts

import { Injectable } from '@angular/core';
import {Observable} from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';

@Injectable()
export class MessageService {
  private subject = new Subject<any>();

  sendMessage(message: string) {
    this.subject.next({ text: message });
  }

  clearMessage() {
    this.subject.next();
  }

  getMessage(): Observable<any> {
    return this.subject.asObservable();
  }
}

home.component.ts

import { Component } from '@angular/core';

import { MessageService } from '../_services/index';

@Component({
  moduleId: module.id,
  templateUrl: 'home.component.html'
})

export class HomeComponent {
  constructor(private messageService: MessageService) {}
  
  sendMessage(): void { // 发送消息
    this.messageService.sendMessage('Message from Home Component to App Component!');
  }

  clearMessage(): void { // 清除消息
    this.messageService.clearMessage();
  }
}

app.component.ts

import { Component, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs/Subscription';

import { MessageService } from './_services/index';

@Component({
  moduleId: module.id,
  selector: 'app',
  templateUrl: 'app.component.html'
})

export class AppComponent implements OnDestroy {
  message: any;
  subscription: Subscription;

  constructor(private messageService: MessageService) {
    this.subscription = this.messageService.getMessage()
       .subscribe(message => { this.message = message; });
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

以上示例实现的功能是组件之间消息通信,即 HomeComponent 子组件,向 AppComponent 父组件发送消息。代码运行后,浏览器的显示结果如下:

关于RxJS Subject的学习笔记

Subject 存在的问题

因为 Subject 在订阅时,是把 observer 存放到观察者列表中,并在接收到新值的时候,遍历观察者列表并调用观察者上的 next 方法,具体如下:

next(value) {
    if (this.closed) {
      throw new ObjectUnsubscribedError();
    }
    if (!this.isStopped) {
      const { observers } = this;
      const len = observers.length;
      const copy = observers.slice();
      for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者
        copy[i].next(value);
      }
    }
}

这样会有一个大问题,如果某个 observer 在执行时出现异常,却没进行异常处理,就会影响到其它的订阅者,具体示例如下:

const source = Rx.Observable.interval(1000);
const subject = new Rx.Subject();

const example = subject.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});
subject.subscribe(x => console.log('A', x));
example.subscribe(x => console.log('B', x));
subject.subscribe(x => console.log('C', x));

source.subscribe(subject);

以上代码运行后,控制台的输出结果:

A 0
B 0
C 0
A 1
Rx.min.js:74 Uncaught Error: oops

JSBin - Subject Problem Demo

在代码运行前,大家会认为观察者B 会在接收到 1 值时抛出异常,观察者 A 和 C 仍会正常运行。但实际上,在当前的 RxJS 版本中若观察者 B 报错,观察者 A 和 C 也会停止运行。那么应该如何解决这个问题呢?目前最简单的方式就是为所有的观察者添加异常处理,更新后的代码如下:

const source = Rx.Observable.interval(1000);
const subject = new Rx.Subject();

const example = subject.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});

subject.subscribe(
  x => console.log('A', x),
  error => console.log('A Error:' + error)
);
  
example.subscribe(
  x => console.log('B', x),
  error => console.log('B Error:' + error)
);

subject.subscribe(
  x => console.log('C', x),
  error => console.log('C Error:' + error)
);

source.subscribe(subject);

JSBin - RxJS Subject Problem Solved Demo

RxJS Subject & Observable

Subject 其实是观察者模式的实现,所以当观察者订阅 Subject 对象时,Subject 对象会把订阅者添加到观察者列表中,每当有 subject 对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next() 方法,把值一一送出。

Subject 之所以具有 Observable 中的所有方法,是因为 Subject 类继承了 Observable 类,在 Subject 类中有五个重要的方法:

  • next - 每当 Subject 对象接收到新值的时候,next 方法会被调用
  • error - 运行中出现异常,error 方法会被调用
  • complete - Subject 订阅的 Observable 对象结束后,complete 方法会被调用
  • subscribe - 添加观察者
  • unsubscribe - 取消订阅 (设置终止标识符、清空观察者列表)

BehaviorSubject

BehaviorSubject 定义

BehaviorSubject 源码片段

export class BehaviorSubject extends Subject {
  constructor(_value) { // 设置初始值
    super();
    this._value = _value;
  }
  get value() { // 获取当前值
    return this.getValue();
  }
  _subscribe(subscriber) {
    const subscription = super._subscribe(subscriber);
    if (subscription && !subscription.closed) {
      subscriber.next(this._value); // 为新的订阅者发送当前最新的值
    }
    return subscription;
  }
  getValue() {
    if (this.hasError) {
      throw this.thrownError;
    }
    else if (this.closed) {
      throw new ObjectUnsubscribedError();
    }
    else {
      return this._value;
    }
  }
  next(value) { // 调用父类Subject的next方法,同时更新当前值
    super.next(this._value = value);
  }
}

BehaviorSubject 应用

有些时候我们会希望 Subject 能保存当前的最新状态,而不是单纯的进行事件发送,也就是说每当新增一个观察者的时候,我们希望 Subject 能够立即发出当前最新的值,而不是没有任何响应。具体我们先看一下示例:

var subject = new Rx.Subject();

var observerA = {
  next: value => console.log('Observer A get value: ' + value),
  error: error => console.log('Observer A error: ' + error),
  complete: () => console.log('Observer A complete!')
};

var observerB = {
  next: value => console.log('Observer B get value: ' + value),
  error: error => console.log('Observer B error: ' + error),
  complete: () => console.log('Observer B complete!')
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
 subject.subscribe(observerB); // 1秒后订阅
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 1
Observer A get value: 2
Observer A get value: 3

通过输出结果,我们发现在 observerB 订阅 Subject 对象后,它再也没有收到任何值了。因为 Subject 对象没有再调用 next() 方法。但很多时候我们会希望 Subject 对象能够保存当前的状态,当新增订阅者的时候,自动把当前最新的值发送给订阅者。要实现这个功能,我们就需要使用 BehaviorSubject。

BehaviorSubject 跟 Subject 最大的不同就是 BehaviorSubject 是用来保存当前最新的值,而不是单纯的发送事件。BehaviorSubject 会记住最近一次发送的值,并把该值作为当前值保存在内部的属性中。接下来我们来使用 BehaviorSubject 重新一下上面的示例:

var subject = new Rx.BehaviorSubject(0); // 设定初始值

var observerA = {
  next: value => console.log('Observer A get value: ' + value),
  error: error => console.log('Observer A error: ' + error),
  complete: () => console.log('Observer A complete!')
};

var observerB = {
  next: value => console.log('Observer B get value: ' + value),
  error: error => console.log('Observer B error: ' + error),
  complete: () => console.log('Observer B complete!')
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
 subject.subscribe(observerB); // 1秒后订阅
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 3

JSBin - BehaviorSubject

ReplaySubject

ReplaySubject 定义

ReplaySubject 源码片段

export class ReplaySubject extends Subject {
  constructor(bufferSize = Number.POSITIVE_INFINITY, 
        windowTime = Number.POSITIVE_INFINITY, 
        scheduler) {
    super();
    this.scheduler = scheduler;
    this._events = []; // ReplayEvent对象列表
    this._bufferSize = bufferSize < 1 "htmlcode">
var subject = new Rx.ReplaySubject(2); // 重新发送最后2个值

var observerA = {
  next: value => console.log('Observer A get value: ' + value),
  error: error => console.log('Observer A error: ' + error),
  complete: () => console.log('Observer A complete!')
};

var observerB = {
  next: value => console.log('Observer B get value: ' + value),
  error: error => console.log('Observer B error: ' + error),
  complete: () => console.log('Observer B complete!')
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
 subject.subscribe(observerB); // 1秒后订阅
}, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 2
Observer B get value: 3

可能会有人认为 ReplaySubject(1) 是不是等同于 BehaviorSubject,其实它们是不一样的。在创建BehaviorSubject 对象时,是设置初始值,它用于表示 Subject 对象当前的状态,而 ReplaySubject 只是事件的重放。

JSBin - ReplaySubject

AsyncSubject

AsyncSubject 定义

AsyncSubject 源码片段

export class AsyncSubject extends Subject {
  constructor() {
    super(...arguments);
    this.value = null;
    this.hasNext = false;
    this.hasCompleted = false; // 标识是否已完成
  }
  _subscribe(subscriber) {
    if (this.hasError) {
      subscriber.error(this.thrownError);
      return Subscription.EMPTY;
    }
    else if (this.hasCompleted && this.hasNext) { // 等到完成后,才发出最后的值
      subscriber.next(this.value);
      subscriber.complete();
      return Subscription.EMPTY;
    }
    return super._subscribe(subscriber);
  }
  next(value) {
    if (!this.hasCompleted) { // 若未完成,保存当前的值
      this.value = value;
      this.hasNext = true;
    }
  }
}

AsyncSubject 应用

AsyncSubject 类似于 last 操作符,它会在 Subject 结束后发出最后一个值,具体示例如下:

var subject = new Rx.AsyncSubject();

 var observerA = {
  next: value => console.log('Observer A get value: ' + value),
  error: error => console.log('Observer A error: ' + error),
  complete: () => console.log('Observer A complete!')
 };

 var observerB = {
  next: value => console.log('Observer B get value: ' + value),
  error: error => console.log('Observer B error: ' + error),
  complete: () => console.log('Observer B complete!')
 };

 subject.subscribe(observerA);

 subject.next(1);
 subject.next(2);
 subject.next(3);

 subject.complete();

 setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
 }, 1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 3
Observer A complete!
Observer B get value: 3
Observer B complete!

JSBin - AsyncSubject

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

标签:
RxJS,Subject

极乐门资源网 Design By www.ioogu.com
极乐门资源网 免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
极乐门资源网 Design By www.ioogu.com

评论“关于RxJS Subject的学习笔记”

暂无关于RxJS Subject的学习笔记的评论...

稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!

昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。

这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。

而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?