RxJS是一個(gè)強(qiáng)大的Reactive編程庫(kù),提供了強(qiáng)大的數(shù)據(jù)流組合與控制能力,但是其學(xué)習(xí)門檻一直很高,本次分享期望從一些特別的角度解讀它在業(yè)務(wù)中的使用,而不是從API角度去講解。
RxJS簡(jiǎn)介
通常,對(duì)RxJS的解釋會(huì)是這么一些東西,我們來(lái)分別看看它們的含義是什么。
Reactive
Lodash for events
Observable
Stream-based
什么是Reactive呢,一個(gè)比較直觀的對(duì)比是這樣的:
比如說(shuō),abc三個(gè)變量之間存在加法關(guān)系:
a = b + c
在傳統(tǒng)方式下,這是一種一次性的賦值過(guò)程,調(diào)用一次就結(jié)束了,后面b和c再改變,a也不會(huì)變了。
而在Reactive的理念中,我們定義的不是一次性賦值過(guò)程,而是可重復(fù)的賦值過(guò)程,或者說(shuō)是變量之間的關(guān)系:
a: = b + c
定義出這種關(guān)系之后,每次b或者c產(chǎn)生改變,這個(gè)表達(dá)式都會(huì)被重新計(jì)算。不同的庫(kù)或者語(yǔ)言的實(shí)現(xiàn)機(jī)制可能不同,寫法也不完全一樣,但理念是相通的,都是描述出數(shù)據(jù)之間的聯(lián)動(dòng)關(guān)系。
在前端,我們通常有這么一些方式來(lái)處理異步的東西:
回調(diào)
事件
Promise
Generator
其中,存在兩種處理問題的方式,因?yàn)樾枨笠彩莾煞N:
分發(fā)
流程
在處理分發(fā)的需求的時(shí)候,回調(diào)、事件或者類似訂閱發(fā)布這種模式是比較合適的;而在處理流程性質(zhì)的需求時(shí),Promise和Generator比較合適。
在前端,尤其交互很復(fù)雜的系統(tǒng)中,RxJS其實(shí)是要比Generator有優(yōu)勢(shì)的,因?yàn)槌R姷拿糠N客戶端開發(fā)都是基于事件編程的,對(duì)于事件的處理會(huì)非常多,而一旦系統(tǒng)中大量出現(xiàn)一個(gè)事件要修改視圖的多個(gè)部分(狀態(tài)樹的多個(gè)位置),分發(fā)關(guān)系就更多了。
RxJS的優(yōu)勢(shì)在于結(jié)合了兩種模式,它的每個(gè)Observable上都能夠訂閱,而Observable之間的關(guān)系,則能夠體現(xiàn)流程(注意,RxJS里面的流程的控制和處理,其直觀性略強(qiáng)于Promise,但弱于Generator)。
我們可以把一切輸入都當(dāng)做數(shù)據(jù)流來(lái)處理,比如說(shuō):
用戶操作
網(wǎng)絡(luò)響應(yīng)
定時(shí)器
Worker
RxJS提供了各種API來(lái)創(chuàng)建數(shù)據(jù)流:
單值:of, empty, never
多值:from
定時(shí):interval, timer
從事件創(chuàng)建:fromEvent
從Promise創(chuàng)建:fromPromise
自定義創(chuàng)建:create
創(chuàng)建出來(lái)的數(shù)據(jù)流是一種可觀察的序列,可以被訂閱,也可以被用來(lái)做一些轉(zhuǎn)換操作,比如:
改變數(shù)據(jù)形態(tài):map, mapTo, pluck
過(guò)濾一些值:filter, skip, first, last, take
時(shí)間軸上的操作:delay, timeout, throttle, debounce, audit, bufferTime
累加:reduce, scan
異常處理:throw, catch, retry, finally
條件執(zhí)行:takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn
轉(zhuǎn)接:switch
也可以對(duì)若干個(gè)數(shù)據(jù)流進(jìn)行組合:
concat,保持原來(lái)的序列順序連接兩個(gè)數(shù)據(jù)流
merge,合并序列
race,預(yù)設(shè)條件為其中一個(gè)數(shù)據(jù)流完成
forkJoin,預(yù)設(shè)條件為所有數(shù)據(jù)流都完成
zip,取各來(lái)源數(shù)據(jù)流最后一個(gè)值合并為對(duì)象
combineLatest,取各來(lái)源數(shù)據(jù)流最后一個(gè)值合并為數(shù)組
這時(shí)候回頭看,其實(shí)RxJS在事件處理的路上已經(jīng)走得太遠(yuǎn)了,從事件到流,它被稱為lodash for events,倒不如說(shuō)是lodash for stream更貼切,它提供的這些操作符也確實(shí)可以跟lodash媲美。
數(shù)據(jù)流這個(gè)詞,很多時(shí)候,是從data-flow翻譯過(guò)來(lái)的,但flow跟stream是不一樣的,我的理解是:flow只關(guān)注一個(gè)大致方向,而stream是受到更嚴(yán)格約束的,它更像是在無(wú)形的管道里面流動(dòng)。
那么,數(shù)據(jù)的管道是什么形狀的?
在RxJS中,存在這么幾種東西:
Observable 可觀察序列,只出不進(jìn)
Observer 觀察者,只進(jìn)不出
Subject 可出可進(jìn)的可觀察序列,可作為觀察者
ReplaySubject 帶回放
Subscription 訂閱關(guān)系
前三種東西,根據(jù)它們數(shù)據(jù)進(jìn)出的可能性,可以通俗地理解他們的連接方式,這也就是所謂管道的“形狀”,一端密閉一端開頭,還是兩端開口,都可以用來(lái)輔助記憶。
上面提到的Subscription,則是訂閱之后形成的一個(gè)訂閱關(guān)系,可以用于取消訂閱。
下面,我們通過(guò)一些示例來(lái)大致了解一下RxJS所提供的能力,以及用它進(jìn)行開發(fā)所需要的思路轉(zhuǎn)換。
示例一:簡(jiǎn)單的訂閱
很多時(shí)候,我們會(huì)有一些顯示時(shí)間的場(chǎng)景,比如在頁(yè)面下添加評(píng)論,評(píng)論列表中顯示了它們分別是什么時(shí)間創(chuàng)建的,為了含義更清晰,可能我們會(huì)引入moment這樣的庫(kù),把這個(gè)時(shí)間轉(zhuǎn)換為與當(dāng)前時(shí)間的距離:
const diff = moment(createAt).fromNow()
這樣,顯示的時(shí)間就是:一分鐘內(nèi),昨天,上個(gè)月這樣的字樣。
但我們注意到,引入這個(gè)轉(zhuǎn)換是為了增強(qiáng)體驗(yàn),而如果某個(gè)用戶停留在當(dāng)前視圖時(shí)間太長(zhǎng),它的這些信息會(huì)變得不準(zhǔn)確,比如說(shuō),用戶停留了一個(gè)小時(shí),而它看到的信息還顯示:5分鐘之前發(fā)表了評(píng)論,實(shí)際時(shí)間是一個(gè)小時(shí)零5分鐘以前的事了。
從這個(gè)角度看,我們做這個(gè)體驗(yàn)增強(qiáng)的事情只做了一半,不準(zhǔn)確的信息是不能算作增強(qiáng)體驗(yàn)的。
在沒有RxJS的情況下,我們可能會(huì)通過(guò)一個(gè)定時(shí)器來(lái)做這件事,比如在組件內(nèi)部:
tick() {
this.diff = moment(createAt).fromNow()
setTimeout(tick.bind(this), 1000)
}
但組件并不一定只有一份實(shí)例,這樣,整個(gè)界面上可能就有很多定時(shí)器在同時(shí)跑,這是一種浪費(fèi)。如果要做優(yōu)化,可以把定時(shí)器做成一種服務(wù),把業(yè)務(wù)上需要周期執(zhí)行的東西放進(jìn)去,當(dāng)作定時(shí)任務(wù)來(lái)跑。
如果使用RxJS,可以很容易做到這件事:
Observable.interval(1000).subscribe(() => {
this.diff = moment(createAt).fromNow()
})
示例二:對(duì)時(shí)間軸的操縱
RxJS一個(gè)很強(qiáng)大的特點(diǎn)是,它以流的方式來(lái)對(duì)待數(shù)據(jù),因此,可以用一些操作符對(duì)整個(gè)流上所有的數(shù)據(jù)進(jìn)行延時(shí)、取樣、調(diào)整密集度等等。
const timeA$ = Observable.interval(1000)
const timeB$ = timeA$.filter(num => {
return (num % 2 != 0)
&& (num % 3 != 0)
&& (num % 5 != 0)
&& (num % 7 != 0)
})
const timeC$ = timeB$.debounceTime(3000)
const timeD$ = timeC$.delay(2000)
示例代碼中,我們創(chuàng)建了四個(gè)流:
A是由定時(shí)器產(chǎn)生的,每秒一個(gè)值;
B從A里面過(guò)濾掉了一些;
C在B的基礎(chǔ)上,對(duì)每?jī)蓚€(gè)間距在3秒之內(nèi)的值進(jìn)行了處理,只留下后一個(gè)值;
D把C的結(jié)果整體向后平移了2秒。
所以結(jié)果大致如下:
A: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
B: 1 11 13 17 19
C: 1 13 19
D: 1 13
示例三:我們來(lái)晚了
RxJS還提供了BehaviourSubject和ReplaySubject這樣的東西,用于記錄數(shù)據(jù)流上一些比較重要的信息,讓那些“我們來(lái)晚了”的訂閱者們回放之前錯(cuò)過(guò)的一切。
ReplaySubject可以指定保留的值的個(gè)數(shù),超過(guò)的部分會(huì)被丟棄。
最近新版《射雕英雄傳》比較火,我們來(lái)用代碼描述其中一個(gè)場(chǎng)景。
郭靖和黃蓉一起背書,黃蓉記憶力很好,看了什么,就全部記得;而郭靖屬魚的,記憶只有七秒,始終只記得背誦的最后三個(gè)字,兩人一起背誦《九陰真經(jīng)》。
代碼實(shí)現(xiàn)如下:
const 九陰真經(jīng) = '天之道,損有余而補(bǔ)不足'
const 黃蓉$ = new ReplaySubject(Number.MAX_VALUE)
const 郭靖$ = new ReplaySubject(3)
const 讀書$ = Observable.from(九陰真經(jīng).split(''))
讀書$.subscribe(黃蓉$)
讀書$.subscribe(郭靖$)
執(zhí)行之后,我們就可以看到,黃蓉背出了所有字,郭靖只記得“補(bǔ)不足”三個(gè)字。
示例四:自動(dòng)更新的狀態(tài)樹
熟悉Redux的人應(yīng)該會(huì)對(duì)這樣一套理念不陌生:
當(dāng)前視圖狀態(tài) := 之前的狀態(tài) + 本次修改的部分
從一個(gè)應(yīng)用啟動(dòng)之后,整個(gè)全局狀態(tài)的變化,就等于初始的狀態(tài)疊加了之后所有action導(dǎo)致的狀態(tài)修改結(jié)果。
所以這就是一個(gè)典型的reduce操作。在RxJS里面,有一個(gè)scan操作符可以用來(lái)表達(dá)這個(gè)含義,比如說(shuō),我們可以表達(dá)這樣一個(gè)東西:
const action$ = new Subject()
const reducer = (state, payload) => {
// 把payload疊加到state上返回
}
const state$ = action$.scan(reducer)
.startWith({})
只需往這個(gè)action$里面推action,就能夠在state$上獲取出當(dāng)前狀態(tài)。
在Redux里面,會(huì)有一個(gè)東西叫combineReducer,在state比較大的時(shí)候,用不同的reducer修改state的不同的分支,然后合并。如果使用RxJS,也可以很容易表達(dá)出來(lái):
const meAction$ = new Subject()
const meReducer = (state, payload) => {}
const articleAction$ = new Subject()
const articleReducer = (state, payload) => {}
const me$ = meAction$.scan(meReducer).startWith({})
const article$ = articleAction$.scan(articleReducer).startWith({})
const state$ = Observable
.zip(
me$,
article$,
(me, article) => {me, article}
)
借助這樣的機(jī)制,我們實(shí)現(xiàn)了Redux類似的功能,社區(qū)里面也有基于RxJS實(shí)現(xiàn)的Redux-Observable這樣的Redux中間件。
注意,我們這里的代碼中,并未使用dispatch action這樣的方式去嚴(yán)格模擬Redux。
再深入考慮,在比較復(fù)雜的場(chǎng)景下,reducer其實(shí)很復(fù)雜。比如說(shuō),視圖上發(fā)起一個(gè)操作,會(huì)需要修改視圖的好多地方,因此也就是要修改全局狀態(tài)樹的不同位置。
在這樣的場(chǎng)景中,從視圖發(fā)起的某個(gè)action,要么調(diào)用一個(gè)很復(fù)雜的reducer去到處改數(shù)據(jù),要么再次發(fā)起多個(gè)action,讓很多個(gè)reducer各自改自己的數(shù)據(jù)。
前者的問題是,代碼耦合太嚴(yán)重;后者的問題是,整個(gè)流程太難追蹤,比如說(shuō),某一塊狀態(tài),想要追蹤到自己是被從哪里發(fā)起的修改所改變的,是非常困難的事情。
如果我們能夠把Observable上面的同步修改過(guò)程視為reducer,就可以從另外一些角度大幅簡(jiǎn)化代碼,并且讓聯(lián)動(dòng)邏輯清晰化。例如,如果我們想描述一篇文章的編輯權(quán)限:
const editable$ = Observable.combineLatest(article$, me$)
.map(arr => {
let [article, me] = arr
return me.isAdmin || article.author === me.id
})
這段代碼的實(shí)質(zhì)是什么?其實(shí)本質(zhì)上還是reducer,表達(dá)的是數(shù)據(jù)的合并與轉(zhuǎn)換過(guò)程,而且是同步的。我們可以把a(bǔ)rticle和me的變更reduce到article$和me$里,由它們派發(fā)隱式的action去推動(dòng)editable計(jì)算新值。
更詳細(xì)探索的可以參見之前的這篇文章:復(fù)雜單頁(yè)應(yīng)用的數(shù)據(jù)層設(shè)計(jì)。
示例五:幸福人生
人生是什么樣子的呢?
著名央視主持人白巖松曾經(jīng)說(shuō)過(guò):
賺錢是為了買房,買房是為了賺錢。
這兩句話聽上去很悲哀,卻很符合社會(huì)現(xiàn)實(shí)。(不要在意是不是白巖松說(shuō)的啦,不是他就是魯迅,要么就是莎士比亞)
作為程序員,我們可以嘗試想想如何用代碼把它表達(dá)出來(lái)。
如果用命令式編程的理念來(lái)描述這段邏輯,是不太好下手的,因?yàn)樗雌饋?lái)像個(gè)死循環(huán),可是人生不就是一天一天的死循環(huán)嗎,這個(gè)復(fù)雜的世界,誰(shuí)是自變量,誰(shuí)是因變量?
死循環(huán)之所以很難用代碼表達(dá),是因?yàn)槟悴恢老榷x哪個(gè)變量,如果變量的依賴關(guān)系形成了閉環(huán),就總有一段定義不起來(lái)。
但是,在RxJS這么一套東西中,我們可以很容易把這套關(guān)系描述出來(lái)。前面說(shuō)過(guò),基于RxJS編程,就好像是在組裝管道,依賴關(guān)系其實(shí)是定義在管道上,而不是在數(shù)據(jù)上。所以,不存在命令式的那些問題,只要管道能夠接起來(lái),再放進(jìn)去數(shù)據(jù)就可以了。所以,我們可以先定義管道之間的依賴關(guān)系,
首先,從這段話中尋找一些變量,得到如下結(jié)果:
錢
房
然后,我們來(lái)探索它們各自的來(lái)源。
錢從哪里來(lái)?
出租房子。
房子從哪里來(lái)?
錢掙夠了就買。
聽上去還是死循環(huán)啊?
我們接著分析:
錢是只有一個(gè)來(lái)源嗎?
不是,原始積累肯定不是房租,我們假定那是工資。
所以,收入是有工資和房租兩個(gè)部分組成。 房子是只有一個(gè)來(lái)源嗎?
對(duì),我們不是貪官,房子都是用錢買的。
好,現(xiàn)在我們有四個(gè)變量了:
錢
房
工資
房租
我們嘗試定義這些變量之間的關(guān)系:
工資 := 定時(shí)取值的常量
房租 := 定時(shí)取值的變量,與房子數(shù)量成正比
錢 := 工資 + 房租
房 := 錢.map(夠了就買)
調(diào)整這些變量的定義順序,凡是不依賴別人的,一律提到最前面實(shí)現(xiàn)。尷尬地發(fā)現(xiàn),這四個(gè)變量里,只有工資是一直不變的,先提前。
const salary$ = Observable.interval(100).mapTo(2)
剩下的,都是依賴別人的,而且,沒有哪個(gè)東西是只依賴已定義的變量,在存在業(yè)務(wù)上的循環(huán)依賴的時(shí)候,就會(huì)發(fā)生這樣的情況。在這種情況下,我們可以從中找出被依賴最少的變量,聲明一個(gè)Subject用于占位,比如這里的房子。
const house$ = new Subject()
接下來(lái)再看,以上幾個(gè)變量中,有哪個(gè)可以跟著確定?是房租,所以,我們可以得到房租與房子數(shù)量的關(guān)系表達(dá)式,注意,以上的salary$、house$,表達(dá)的都是單次增加的值,不代表總的值,但是,算房租是要用總的房子數(shù)量來(lái)算的,所以,我們還需要先表達(dá)出總的房子數(shù)量:
const houseCount$ = house$.scan((acc, num) => acc + num, 0).startWith(0)
然后,可以得到房租的表達(dá)式:
const rent$ = Observable.interval(3000)
.withLatestFrom(houseCount$)
.map(arr => arr[1] * 5)
解釋一下上面這段代碼:
房租由房租周期的定時(shí)器觸發(fā);
然后到房子數(shù)量中取最后一個(gè)值,也就是當(dāng)前有多少套房;
然后,用房子數(shù)量乘以單套房的月租,假設(shè)是5;
房租定義出來(lái)了之后,錢就可以被定義了:
const income$ = Observable.merge(salary$, rent$)
注意,income$所代表的含義是,所有的單次收入,包含工資和房租。
到目前為止,我們還有一個(gè)東西沒有被定義,那就是房子。如何從收入轉(zhuǎn)化為房子呢?為了示例簡(jiǎn)單,我們把它們的關(guān)系定義為:
一旦現(xiàn)金流夠買房,就去買。
所以,我們需要定義現(xiàn)金流與房子數(shù)量的關(guān)系:
const cash$ = income$
.scan((acc, num) => {
const newSum = acc + num
const newHouse = Math.floor(newSum / 100)
if (newHouse > 0) {
house$.next(newHouse)
}
return newSum % 100
}, 0)
這段邏輯的含義是:
累積之前的現(xiàn)金流與本次收入;
假定房?jī)r(jià)100,先看看現(xiàn)金夠買幾套房,能買幾套買幾套;
重新計(jì)算買完之后的現(xiàn)金。
總結(jié)一下,這么一段代碼,就表達(dá)清楚了我們所有的業(yè)務(wù)需求:
// 掙錢是為了買房,買房是為了賺錢
const house$ = new Subject()
const houseCount$ = house$.scan((acc, num) => acc + num, 0).startWith(0)
// 工資始終不漲
const salary$ = Observable.interval(100).mapTo(2)
const rent$ = Observable.interval(3000)
.withLatestFrom(houseCount$)
.map(arr => arr[1] * 5)
// 一買了房,就沒現(xiàn)金了……
const income$ = Observable.merge(salary$, rent$)
const cash$ = income$
.scan((acc, num) => {
const newSum = acc + num
const newHouse = Math.floor(newSum / 100)
if (newHouse > 0) {
house$.next(newHouse)
}
return newSum % 100
}, 0)
// houseCount$.subscribe(num => console.log(`houseCount: ${num}`))
// cash$.subscribe(num => console.log(`cash: ${num}`))
這段代碼所表達(dá)出來(lái)的業(yè)務(wù)關(guān)系如圖:
工資周期 ———> 工資
↓
房租周期 ———> 租金 ———> 收入 ———> 現(xiàn)金
↑ ↓
房子數(shù)量 <——— 新購(gòu)房
注意:在這個(gè)例子中,house$的處理方式與眾不同,因?yàn)槲覀兊臉I(yè)務(wù)邏輯是環(huán)形依賴,至少要有一個(gè)東西先從里面拿出來(lái)占位,后續(xù)再處理,否則沒有辦法定義整條鏈路。
小結(jié)
本篇通過(guò)一些簡(jiǎn)單例子介紹了RxJS的使用場(chǎng)景,可以用這么一句話來(lái)描述它:
其文簡(jiǎn),其意博,其理奧,其趣深
RxJS提供大量的操作符,用于處理不同的業(yè)務(wù)需求。對(duì)于同一個(gè)場(chǎng)景來(lái)說(shuō),可能實(shí)現(xiàn)方式會(huì)有很多種,需要在寫代碼之前仔細(xì)斟酌。由于RxJS的抽象程度很高,所以,可以用很簡(jiǎn)短代碼表達(dá)很復(fù)雜的含義,這對(duì)開發(fā)人員的要求也會(huì)比較高,需要有比較強(qiáng)的歸納能力。
本文是入職螞蟻金服之后,第一次內(nèi)部分享,科普為主,后面可能會(huì)逐步作一些深入的探討。
螞蟻的大部分業(yè)務(wù)系統(tǒng)前端不太適合用RxJS,大部分是中后臺(tái)CRUD系統(tǒng),因?yàn)閮蓚€(gè)原因:整體性、實(shí)時(shí)性的要求不高。
什么是整體性?這是一種系統(tǒng)設(shè)計(jì)的理念,系統(tǒng)中的很多業(yè)務(wù)模塊不是孤立的,比如說(shuō),從展示上,GUI與命令行的差異在于什么?在于數(shù)據(jù)的冗余展示。我們可以把同一份業(yè)務(wù)數(shù)據(jù)以不同形態(tài)展示在不同視圖上,甚至在PC端,由于屏幕大,可以允許同一份數(shù)據(jù)以不同形態(tài)同時(shí)展現(xiàn),這時(shí)候,為了整體協(xié)調(diào),對(duì)此數(shù)據(jù)的更新就會(huì)要產(chǎn)生很多分發(fā)和聯(lián)動(dòng)關(guān)系。
什么是實(shí)時(shí)性?這個(gè)其實(shí)有多個(gè)含義,一個(gè)比較重要的因素是服務(wù)端是否會(huì)主動(dòng)向推送一些業(yè)務(wù)更新信息,如果用得比較多,也會(huì)產(chǎn)生不少的分發(fā)關(guān)系。
在分發(fā)和聯(lián)動(dòng)關(guān)系多的時(shí)候,RxJS才能更加體現(xiàn)出它比Generator、Promise的優(yōu)勢(shì)。
咨詢熱線
010-85377344
135-21581588
微信客服
QQ客服
3026106565 點(diǎn)擊咨詢