RxJS 操作符-学习笔记

发布时间:2024年01月05日

提前准备:

  • pipe 方法: 用于组合多个操作符,可以将一系列操作符作为参数传递给 pipe 方法,这些操作符将 依次 对数据流进行处理。这里的依次很关键,也代表着pipe()中组合的这么几个操作符的执行顺序就是从开始一直到结束的,其中的数据会同流水线一般在各个操作符中进行传递。上一个操作符把数据处理好了,会自动地把这个处理好的数据送给下一个操作符接收,基于这个在上一步处理过的数据再进行进一步的加工,如此往复,直到执行到最后一个操作符为止。
  • ?RxJS通过提供一种?统一的方式?来处理异步和基于事件的程序。这种统一的方式就是基于它的核心思想:?将一切都视为随时间变化的数据流?,从而能够用更加函数式和响应式的方式来处理复杂的数据处理问题。实际实现的时候,其实就是把所有的数据流、事件全都转成 Observable ,然后再在这个 Observable 的基础上进行订阅消息,实现对其发布的数据的处理。
  • 观察者模式(Observer Pattern)是一种设计模式,其中一个对象(称为“主题”或“可观察对象”)维护一组观察者,当该对象状态变化时,会自动通知所有观察者。
  • Observables(可观察对象):在RxJS中,Observables就是这样的“主题”。它代表了一个可观察的数据流或异步事件流。
  • Observers(观察者):观察者是一个包含回调函数集合的对象,这些函数决定了如何响应Observable发送的数据或事件。
  • 订阅(Subscribe):观察者通过“订阅”Observable来接收数据和通知。当Observable发出数据时,会调用观察者的回调函数(例如:next, error, complete)。简写observable.subscribe(x => console.log('Observer got a next value: ' + x));
  • Observable 的懒加载:这表示?Observable 不会在创建时立即开始工作,而是在被订阅(subscribe)时才开始执行?。当你创建一个 Observable 实例时,你仅仅是?定义了一个数据流和如何生成这些数据的规则?。此时,数据流并不会立即开始。只有当 Observable 被某个观察者订阅时,定义在 Observable 构造函数中的函数(订阅函数)才会?被执行?。这个函数负责产生数据并通过?next,?error, 和?complete?方法发送给观察者。每当有新的订阅发生时,Observable 会从头开始执行其定义的逻辑。这意味着每个订阅者都有自己的数据流实例,互不影响。

一、组合

forkJoin

当有一组 observables,但你只关心每个 observable 最后发出的值时,此操作符是最适合的。此操作符的一个常见用例是在页面加载(或其他事件)时你希望发起多个请求,并在所有请求都响应后再采取行动。
当所有 observables 完成时,将每个 observable??的最新值作为数组发出。
如果内部 observable 不完成的话,forkJoin?永远不会发出值!

const example = forkJoin( 
  of('Hello'),// 立即发出 'Hello'
  of('World').pipe(delay(1000)),// 1秒后发出 'World'
);
const subscribe = example.subscribe(val => console.log(val));//输出: ["Hello", "World"]


//发送请求
 forkJoin(
      this._myService.myRequest('Request One', 2000),
      this._myService.myRequest('Request Two', 1000),
      this._myService.myRequest('Request Three', 3000)
    )
    .subscribe(([res1, res2, res3]) => {
      this.propOne = res1;
      this.propTwo = res2;
      this.propThree = res3;
    });

二、转换

1、mergeMap(fn):

功能:类似于 switchMap,但它不会取消之前的 Observables,而是合并所有的 Observables。
应用场景:当你需要处理每个值并同时保持所有结果时使用。

将内部 多个 observable,打平成一个observable,可以 使用mergeMap。

//将每个字母映射并打平成一个 Observable ,每1秒钟一次

var letters = Rx.Observable.of('a', 'b', 'c');
var result = letters.mergeMap(x =>
  Rx.Observable.interval(1000).map(i => x+i)
);
result.subscribe(x => console.log(x));

// 结果如下:
// a0
// b0
// c0
// a1
// b1
// c1
// 继续列出a、b、c加上各自的自增数列

另外需要注意:

  1. 如果一次只能激活一个内部订阅,请使用 switchMap。
  2. 如果内部 observables 的发射和订阅顺序很重要,请使用 concatMap.

当使用 switchMap 时,每个内部订阅在源发出时完成,即任意时间段只允许一个活动的内部订阅。相比之下,mergeMap 允许同时激活多个内部订阅。因此,mergeMap 最常见的用例之一是不应取消的请求,这些请求被认为是写入而不是读取。请注意,如果这些写入必须保持顺序,则 concatMap 是更好的选择。比如数据库的写操作。

由于 mergeMap 一次维护多个活动的内部订阅,因此可能会由于长期存在的内部订阅造成内存泄漏。一个基本的例子是,如果使用内部计时器或 dom 事件流映射到可观察对象。在这些情况下,如果仍然希望使用 mergeMap,一个好办法是利用另一个运算符来管理内部订阅的完成,比如考虑 take 或 takeUntil。当然还可以使用 concurrent 参数限制一次活动内部订阅的数量。

2、map(fn):

功能:将数据流中的每个值通过函数?fn?进行转换。
应用场景:当你需要修改数据流中的每个值时使用,例如将数据流中的数字乘以2。

3、switchMap(fn):

功能:对数据流中的每个值应用函数 fn,并将结果转换为新的 Observable,当新的值到来时,会取消之前的 Observable。
应用场景:处理级联数据流,如基于当前值获取新的数据。

三、过滤

1、filter(fn):

功能:根据函数?fn?的条件判断,决定是否保留数据流中的每个值。
应用场景:用于筛选数据流,例如过滤出偶数。

// 每1秒发出值
const source = interval(1000);

// 过滤掉所有值知道 interval 发出的值大于5
const example = source.pipe(filter(num => num > 5));

const subscribe = example.subscribe(val =>
  console.log(`${val}`) //输出 6 7 8 9
);

2、take(count):

功能:只取数据流的前 count 个值,然后完成。
应用场景:限制数据流的长度,如只取前5个值。

3、first():

功能:只取数据流的第一个值,然后完成。
应用场景:当你只对数据流的第一个值感兴趣时使用。


4、last():

功能:只取数据流的最后一个值,然后完成。
应用场景:当你只对数据流的最后一个值感兴趣时使用。

5、debounceTime(ms):

功能:在指定的毫秒数 ms 后,只发出最新的值,如果在这段时间内有新值产生,则重新计时。
应用场景:处理高频事件,如键盘输入。

//舍弃掉在两次输出之间小于指定时间的发出值


const input = document.getElementById('example');

// 对于每次键盘敲击,都将映射成当前输入值
const example = fromEvent(input, 'keyup').pipe(map(i => i.currentTarget.value));

// 在两次键盘敲击之间等待0.5秒方才发出当前值,
// 并丢弃这0.5秒内的所有其他值
const debouncedInput = example.pipe(debounceTime(500));

// 输出值
const subscribe = debouncedInput.subscribe(val => {
  console.log(`Debounced Input: ${val}`);
});


6、throttleTime(ms):

功能:在每个时间窗口 ms 的开始,发出最新的值。
应用场景:限制数据流的速率,例如在滚动事件中。

四、错误处理

1、catchError(fn):

功能:捕获错误,并通过函数 fn 提供一种处理错误的方式。
应用场景:错误处理。

五、创建

1、of(...values):

用于创建一个 Observable,它会依次发出提供的参数,然后完成。
例如:of(1, 2, 3) 会依次发出 1, 2, 3。


2、from(iterable):

将数组、类数组对象、Promise 或迭代器转换为 Observable。
例如:from([1, 2, 3]) 会依次发出 1, 2, 3。


3、interval(period):

创建一个 Observable,它按照指定的时间间隔连续发出数字序列。
例如:interval(1000) 每隔1秒发出一个递增的数字。


4、timer(initialDelay, period):

在给定的初始延迟之后,发出数字0,然后如果指定了周期,将继续以该周期发出递增的数字。
例如:timer(3000, 1000) 在3秒后发出0,然后每隔1秒发出一个递增的数字。


5、fromEvent(target, eventName):

从DOM事件、Node.js EventEmitter 事件或其他事件源创建 Observable。
例如:fromEvent(document, 'click') 用于从文档的点击事件创建 Observable。


6、ajax(urlOrRequest):

用于创建一个 Observable,以发出针对URL的Ajax请求的响应。
例如:ajax('/api/data') 会发出对 /api/data 的Ajax请求的响应。


7、create(subscribe):

传统的方式来创建一个新的 Observable,通过提供一个 subscribe 函数。
例如:new Observable(subscriber => {...})。

文章来源:https://blog.csdn.net/qianqianyixiao1/article/details/135411983
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。