使用 npm 安装:
npm install rxjs
使用 yarn 安装:
yarn add rxjs
通过 CDN 引入:
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3)
.pipe(map(x => x * 2))
.subscribe(value => console.log(value)); // 输出: 2, 4, 6
import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
clicks.pipe(map(event => `X: ${event.clientX}, Y: ${event.clientY}`))
.subscribe(pos => console.log(pos));
import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
import { of } from 'rxjs';
const apiData = ajax('/api/data').pipe(
map(response => response.response),
catchError(error => {
console.log('error: ', error);
return of(error);
})
);
apiData.subscribe(data => console.log(data));
Observable:核心类,用于创建可观察对象。
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next('Hello');
subscriber.complete();
});
observable.subscribe({
next(x) { console.log(x); },
complete() { console.log('Done'); }
});
操作符:用于操作 Observable 的函数。
map
:对 Observable 发出的每个值应用函数并返回新值。filter
:仅传递满足条件的值。mergeMap
:将 Observable 映射为多个内部 Observable,并将它们合并为一个。switchMap
:将 Observable 映射为内部 Observable,并只发出最新的值。Subject:同时是 Observable 和 Observer,可以多播值给多个订阅者。
import { Subject } from 'rxjs';
const subject = new Subject();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(1);
subject.next(2);
可以通过组合现有操作符创建自定义操作符:
import { pipe } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const customOperator = pipe(
filter(x => x % 2 === 0),
map(x => x * 2)
);
of(1, 2, 3, 4).pipe(customOperator).subscribe(console.log); // 输出: 4, 8
通过组合多个操作符处理复杂异步任务:
import { fromEvent, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { switchMap, catchError } from 'rxjs/operators';
const button = document.getElementById('myButton');
fromEvent(button, 'click').pipe(
switchMap(() => ajax.getJSON('/api/data')),
catchError(error => of({ error: true, message: error.message }))
).subscribe(data => {
if (data.error) {
console.error(data.message);
} else {
console.log(data);
}
});
RxJS 是一个强大的工具,通过提供丰富的操作符和灵活的 API,帮助开发者简化异步编程,轻松处理各种复杂的异步数据流需求。