RxJS 入门指引和初步应用
https://zhuanlan.zhihu.com/p/25383159
RxJS提供了各种API来创建数据流:
单值:of, empty, never
多值:from
定时:interval, timer
从事件创建:fromEvent
从Promise创建:fromPromise
自定义创建:create
创建出来的数据流是一种可观察的序列,可以被订阅,也可以被用来做一些转换操作,比如:
改变数据形态:map, mapTo, pluck
过滤一些值:filter, skip, first, last, take
时间轴上的操作:delay, timeout, throttle, debounce, audit, bufferTime
累加:reduce, scan
异常处理:throw, catch, retry, finally
条件执行:takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn
转接:switch
也可以对若干个数据流进行组合:
concat,保持原来的序列顺序连接两个数据流
merge,合并序列
race,预设条件为其中一个数据流完成
forkJoin,预设条件为所有数据流都完成
zip,取各来源数据流最后一个值合并为对象
combineLatest,取各来源数据流最后一个值合并为数组
在RxJS中,存在这么几种东西:
Observable 可观察序列,只出不进
Observer 观察者,只进不出
Subject 可出可进的可观察序列,可作为观察者
ReplaySubject 带回放
Subscription 订阅关系
前三种东西,根据它们数据进出的可能性,可以通俗地理解他们的连接方式,这也就是所谓管道的“形状”,一端密闭一端开头,还是两端开口,都可以用来辅助记忆。
上面提到的Subscription,则是订阅之后形成的一个订阅关系,可以用于取消订阅。
Observable.interval(1000).subscribe(() => { this.diff = moment(createAt).fromNow() }) |
const timeA$ = Observable.interval(1000) const timeB$ = timeA$.filter(num => { return (num % 2 != 0) && (num % 3 != 0) && (num % 5 != 0) && (num % 7 != 0) }) const timeC$ = timeB$.debounceTime(3000) const timeD$ = timeC$.delay(2000) A: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 B: 1 11 13 17 19 C: 1 13 19 D: 1 13 |
const 九阴真经 = '天之道,损有余而补不足' const 黄蓉$ = new ReplaySubject(Number.MAX_VALUE) const 郭靖$ = new ReplaySubject(3) const 读书$ = Observable.from(九阴真经.split('')) 读书$.subscribe(黄蓉$) 读书$.subscribe(郭靖$) |
const action$ = new Subject() const reducer = (state, payload) => { // 把payload叠加到state上返回 } const state$ = action$.scan(reducer) .startWith({}) |
const meAction$ = new Subject() const meReducer = (state, payload) => {} const articleAction$ = new Subject() const articleReducer = (state, payload) => {} const me$ = meAction$.scan(meReducer).startWith({}) const article$ = articleAction$.scan(articleReducer).startWith({}) const state$ = Observable .zip( me$, article$, (me, article) => {me, article} ) const editable$ = Observable.combineLatest(article$, me$) .map(arr => { let [article, me] = arr return me.isAdmin || article.author === me.id }) |
示例五:幸福人生
// 工资始终不涨 const salary$ = Observable.interval(100).mapTo(2) // 挣钱是为了买房,买房是为了赚钱 const house$ = new Subject() const houseCount$ = house$.scan((acc, num) => acc + num, 0).startWith(0) const rent$ = Observable.interval(3000) .withLatestFrom(houseCount$) .map(arr => arr[1] * 5) /* 解释一下上面这段代码: 房租由房租周期的定时器触发 然后到房子数量中取最后一个值,也就是当前有多少套房 然后,用房子数量乘以单套房的月租,假设是5 */ // 一买了房,就没现金了…… const income$ = Observable.merge(salary$, rent$); const cash$ = income$ .scan((acc, num) => { const newSum = acc + num const newHouse = Math.floor(newSum / 100) if (newHouse > 0) { house$.next(newHouse) } return newSum % 100 }, 0); ` /* 累积之前的现金流与本次收入 假定房价100,先看看现金够买几套房,能买几套买几套 重新计算买完之后的现金 */ |
Hello RxJS