RxJS

2024-07-01 12:01:02 531
RxJS(Reactive Extensions for JavaScript)是一个用于编写异步和基于事件程序的库,通过使用可观察序列来简化异步编程,适用于浏览器和 Node.js 环境。它提供了一套丰富的操作符,可以组合、过滤和变换异步数据流,广泛用于处理用户输入、网络请求、动画等。

特点

  • 异步编程:使用 Observable(可观察对象)来处理异步数据流,简化了异步编程模型。
  • 丰富的操作符:提供了超过 100 种操作符,用于创建、转换和组合 Observable。
  • 可组合性:操作符可以轻松组合,创建复杂的异步数据流。
  • 跨平台支持:可在浏览器、Node.js 以及其他 JavaScript 环境中使用。
  • 强类型支持:与 TypeScript 无缝集成,提供强类型支持,提升代码质量和开发效率。

使用场景

  • 用户交互:处理用户输入事件,如点击、拖拽、键盘输入等。
  • 数据流处理:处理实时数据流,如 WebSocket 数据、流媒体等。
  • 网络请求:管理复杂的异步 HTTP 请求,处理依赖和并行请求。
  • 动画和时间管理:创建复杂的动画序列和时间调度。
  • 状态管理:在单页应用中管理应用状态。

安装方式

使用 npm 安装:

npm install rxjs

使用 yarn 安装:

yarn add rxjs

通过 CDN 引入:

<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>

使用示例

基本使用

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

of(1, 2, 3)
  .pipe(map(x => x * 2))
  .subscribe(value => console.log(value)); // 输出: 2, 4, 6

处理用户输入

import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
clicks.pipe(map(event => `X: ${event.clientX}, Y: ${event.clientY}`))
      .subscribe(pos => console.log(pos));

网络请求

import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
import { of } from 'rxjs';

const apiData = ajax('/api/data').pipe(
  map(response => response.response),
  catchError(error => {
    console.log('error: ', error);
    return of(error);
  })
);

apiData.subscribe(data => console.log(data));

常用 API 介绍

  • Observable:核心类,用于创建可观察对象。

    import { Observable } from 'rxjs';
    
    const observable = new Observable(subscriber => {
      subscriber.next('Hello');
      subscriber.complete();
    });
    
    observable.subscribe({
      next(x) { console.log(x); },
      complete() { console.log('Done'); }
    });
    
  • 操作符:用于操作 Observable 的函数。

    • map:对 Observable 发出的每个值应用函数并返回新值。
    • filter:仅传递满足条件的值。
    • mergeMap:将 Observable 映射为多个内部 Observable,并将它们合并为一个。
    • switchMap:将 Observable 映射为内部 Observable,并只发出最新的值。
  • Subject:同时是 Observable 和 Observer,可以多播值给多个订阅者。

    import { Subject } from 'rxjs';
    
    const subject = new Subject();
    
    subject.subscribe({
      next: (v) => console.log(`observerA: ${v}`)
    });
    
    subject.subscribe({
      next: (v) => console.log(`observerB: ${v}`)
    });
    
    subject.next(1);
    subject.next(2);
    

高级用法

自定义操作符

可以通过组合现有操作符创建自定义操作符:

import { pipe } from 'rxjs';
import { map, filter } from 'rxjs/operators';

const customOperator = pipe(
  filter(x => x % 2 === 0),
  map(x => x * 2)
);

of(1, 2, 3, 4).pipe(customOperator).subscribe(console.log); // 输出: 4, 8

流式处理复杂异步任务

通过组合多个操作符处理复杂异步任务:

import { fromEvent, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { switchMap, catchError } from 'rxjs/operators';

const button = document.getElementById('myButton');

fromEvent(button, 'click').pipe(
  switchMap(() => ajax.getJSON('/api/data')),
  catchError(error => of({ error: true, message: error.message }))
).subscribe(data => {
  if (data.error) {
    console.error(data.message);
  } else {
    console.log(data);
  }
});

官方资料

RxJS 是一个强大的工具,通过提供丰富的操作符和灵活的 API,帮助开发者简化异步编程,轻松处理各种复杂的异步数据流需求。