rxjs

RxJS 入门指引和初步应用

https://zhuanlan.zhihu.com/p/25383159

RxJS提供了各种API来创建数据流:

单值:of, empty, never
多值:from
定时:interval, timer
从事件创建:fromEvent
从Promise创建:fromPromise
自定义创建:create
创建出来的数据流是一种可观察的序列,可以被订阅,也可以被用来做一些转换操作,比如:

改变数据形态:map, mapTo, pluck
过滤一些值:filter, skip, first, last, take
时间轴上的操作:delay, timeout, throttle, debounce, audit, bufferTime
累加:reduce, scan
异常处理:throw, catch, retry, finally
条件执行:takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn
转接:switch
也可以对若干个数据流进行组合:

concat,保持原来的序列顺序连接两个数据流
merge,合并序列
race,预设条件为其中一个数据流完成
forkJoin,预设条件为所有数据流都完成
zip,取各来源数据流最后一个值合并为对象
combineLatest,取各来源数据流最后一个值合并为数组

在RxJS中,存在这么几种东西:

Observable 可观察序列,只出不进
Observer 观察者,只进不出
Subject 可出可进的可观察序列,可作为观察者
ReplaySubject 带回放
Subscription 订阅关系

前三种东西,根据它们数据进出的可能性,可以通俗地理解他们的连接方式,这也就是所谓管道的“形状”,一端密闭一端开头,还是两端开口,都可以用来辅助记忆。

上面提到的Subscription,则是订阅之后形成的一个订阅关系,可以用于取消订阅。

Observable.interval(1000).subscribe(() => {
  this.diff = moment(createAt).fromNow()
})
const timeA$ = Observable.interval(1000)
const timeB$ = timeA$.filter(num => {
    return (num % 2 != 0)
      && (num % 3 != 0)
      && (num % 5 != 0)
      && (num % 7 != 0)
  })

const timeC$ = timeB$.debounceTime(3000)
const timeD$ = timeC$.delay(2000)


A: 0  1  2  3  4  5  6  7  8  9  10 11 12 13 14 15 16 17 18 19 20 21
B:    1                             11    13          17    19
C:          1                                   13                19
D:                1                                   13
const 九阴真经 = '天之道,损有余而补不足'

const 黄蓉$ = new ReplaySubject(Number.MAX_VALUE)
const 郭靖$ = new ReplaySubject(3)

const 读书$ = Observable.from(九阴真经.split(''))

读书$.subscribe(黄蓉$)
读书$.subscribe(郭靖$)
const action$ = new Subject()
const reducer = (state, payload) => {
  // 把payload叠加到state上返回
}

const state$ = action$.scan(reducer)
  .startWith({})
const meAction$ = new Subject()
const meReducer = (state, payload) => {}

const articleAction$ = new Subject()
const articleReducer = (state, payload) => {}

const me$ = meAction$.scan(meReducer).startWith({})
const article$ = articleAction$.scan(articleReducer).startWith({})

const state$ = Observable
  .zip(
    me$,
    article$,
    (me, article) => {me, article}
  )
  
const editable$ = Observable.combineLatest(article$, me$)
  .map(arr => {
    let [article, me] = arr
    return me.isAdmin || article.author === me.id
  })

示例五:幸福人生

// 工资始终不涨
const salary$ = Observable.interval(100).mapTo(2)

// 挣钱是为了买房,买房是为了赚钱
const house$ = new Subject()

const houseCount$ = house$.scan((acc, num) => acc + num, 0).startWith(0)

const rent$ = Observable.interval(3000)
  .withLatestFrom(houseCount$)
  .map(arr => arr[1] * 5)
  
/*
解释一下上面这段代码:

房租由房租周期的定时器触发
然后到房子数量中取最后一个值,也就是当前有多少套房
然后,用房子数量乘以单套房的月租,假设是5
*/

// 一买了房,就没现金了……
const income$ = Observable.merge(salary$, rent$);

const cash$ = income$
  .scan((acc, num) => {
    const newSum = acc + num

    const newHouse = Math.floor(newSum / 100)
    if (newHouse > 0) {
      house$.next(newHouse)
    }

    return newSum % 100
  }, 0);
  
`
/*

累积之前的现金流与本次收入
假定房价100,先看看现金够买几套房,能买几套买几套
重新计算买完之后的现金
*/

Hello RxJS

https://zhuanlan.zhihu.com/p/23331432