Observable Operator使用方式。
原本的 do 用法,通常 Observable 會搭配 tap 來執行有 side effect的工作,如Observable emit value後,要改變其他物件的狀態,或是更新UI 畫面等,每一次 Observable emit 一個內容都會進入 tap 中。
如果要透過 async pipe來訂閱 Observable, 搭配 tap 會是不錯的選擇( 純個人意見)。
下列程式碼寫法不同,但是做的事情是相同的
of(1,2,3,4,5).subscribe(
val => this.count1 +=val
);
of(1,2,3,4,5).pipe(
tap(
val => this.count2 += val
)
).subscribe()
const obs:Observer<number> = {
next: val => this.count3+=val,
error: noop,
complete: noop
}
this.count$ = of(1,2,3,4,5).pipe(tap(obs))
of(1,2,3,4,5).pipe(
tap( val => console.log(`receive val ${val}`)),
map( val => val *2),
tap( val => console.log(`revceive val after map: ${val}`))
).subscribe()
Maps each source value (an object) to its specified nested property.
將 Observable資料流中之物件,透過 pluck可以轉為物件中特定屬性內容。
const persons = [{
name: 'Tom',
age: 18
},{
name: 'Jessie',
age: 26
}]
of(...persons).pipe(
pluck('age'),
tap(val => console.log(val))
).subscribe()
將Observable emit 出來的 value 轉為一個常數(constant)
of(1,2,3,4,5).pipe(
mapTo(() => console.log('Hello World')),
tap(val => val())
).subscribe()
練習題:https://stackblitz.com/edit/angular-spv3wd
功能與陣列中的 filter相似,傳入一個 predicate function後,會將符合條件的 observable value過濾出來。
// 過濾大於3的數值,所以只會得到 4與 5
of(1,2,3,4,5).pipe(
filter( val => val >3),
tap(val => console.log(val))
).subscribe()
Returns an Observable that skips the first
count
items emitted by the source Observable.
忽略 Observable送出的 資料中前 N筆資料。
// emit 3, 4, 5
of(1,2,3,4,5).pipe(
skip(2),
tap(val => console.log(val))
).subscribe()
在Observable資料流中,只會emit第一筆資料,或是第一筆符合條件之資料。
of(1,2,3,4,5).pipe(
first(),
tap(val => console.log(val))
).subscribe();
of(1,2,3,4,5).pipe(
first( val => val >4),
tap(val => console.log(val))
).subscribe()
相較於 first取第一個, last則是取最後一筆,或是滿足條件之最後一筆。
// emit 5
of(1,2,3,4,5).pipe(
last(),
tap(val => console.log(val))
).subscribe();
// emit 5
of(1,2,3,4,5).pipe(
last( val => val >4),
tap(val => console.log(val))
).subscribe();
Emits only the first
count
values emitted by the source Observable.
指定要抓取 N筆 Observable中 emit出來之資料,一旦滿足條件後變停止。
// 1, 2
of(1,2,3,4,5).pipe(
take(2),
tap(val => console.log(val))
).subscribe();
Emits only the last
count
values emitted by the source Observable.
指定要抓取 Observable資料中最後 N筆資料。
// 4,5
of(1,2,3,4,5).pipe(
takeLast(2),
tap(val => console.log(val))
).subscribe();
與 filter方法類似,會傳入一個 predicate function 來過濾資料,與 filter不同的是,當遇到第一個不滿足 predicate function時,便會進入 complete。
// emit 0 to 14
range(0, 30).pipe(
takeWhile( val => val < 15),
tap(val => console.log(val))
).subscribe();
Emits the values emitted by the source Observable until a
notifier
Observable emits a value.
讓 Observable持續 emit資料,等到另一個 Observable emit value後便進入 complete。
// after clicking on DOM, interval observable complete
interval(5000).pipe(
takeUntil( fromEvent(document, 'click')),
tap(val => console.log(val))
).subscribe();
如果要透過 takeUntil來取消訂閱 Observable,也需要注意 takeUntil擺放的位置,若沒放在適當的位置,可能還是會因為沒有正確的 unscribe而導致問題。Avoiding takeUntil Leaks,文章中提到必須將 takeUntil放在最後一個 operator,如果 takeUntil operator後還有其他訂閱其他 observable之operator,可能會沒辦法套用到 takeUntil之效果,如 switchMap。
下面的例子中, switchMap中的 Observable將不會在 takeUntil觸發後停止:
interval(1000).pipe(
takeUntil( fromEvent(document, 'click').pipe(
tap(val=> console.log('Click triggered'))
)),
tap(val => console.log(`interval val: ${val}`)),
switchMap(val => interval(100)),
tap(val => console.log(`switchMap val: ${val}:`))
).subscribe();
調整方式是將 takeUntil移到 operators最後一個:
interval(1000).pipe(
tap(val => console.log(`interval val: ${val}`)),
switchMap(val => interval(100)),
tap(val => console.log(`switchMap val: ${val}:`)),
takeUntil( fromEvent(document, 'click').pipe(
tap(val=> console.log('Click triggered'))
)),
).subscribe();
與reduce功能相同,差異在於Observable每做一次就會 emit 一次 value
fromEvent(document, 'click').pipe(
mapTo(1),
scan((acc, val)=> acc+val, 0),
tap(val => console.log(`accumulated val ${val}`))
).subscribe()
同scan之功能,但只有在 reduce 完成後才會 emit value,也因為資料計算完後才會 emit 結果,所以必須確保 Observable事件是會結束的,否則 reduce 在未能確定前面 Observable資料結束前不會將壘加完資料送出。
fromEvent(document, 'click').pipe(
take(5),
mapTo(1),
reduce((acc, val)=> acc+val, 0),
tap(val => console.log(`accumulated val ${val}`)),
).subscribe()
Delays the emission of items from the source Observable by a given timeout or until a given Date.
Observable 執行過程中,如果遇到 delay,則會依據 delay 上設置延遲 N ms emit value到下一個 operator。
delay 參數也可以傳入一個日期參數。
interval(500).pipe(
take(5),
tap(val => console.log(`first val: ${val}`)),
delay(1000),
tap(val => console.log(`second val: ${val}`)),
delay(3000),
tap(val => console.log(`third val: ${val}`)),
).subscribe()
功能與 delay相似,差異在於 delay多久後觸發由另一個 observable觸發。
const delayParams = [1000,6000,3000,5000,500];
interval(500).pipe(
take(5),
tap(val => console.log(`first val: ${val}`)),
delayWhen(val => timer(delayParams[val])),
tap(val => console.log(`second val: ${val} delay ${delayParams[val]}`)),
).subscribe()
Catches errors on the observable to be handled by returning a new observable or throwing an error.
在Observable執行出錯時,如果有使用catchError operator,則會被 catchError抓住,這時可以選擇讓整個 observabl3 retry,或是改傳回一個Observable。若沒有catchError,則會進入 observer的 error function中。
catchError中 return的Observable會自動被訂閱接著執行
catchError後改傳一個新的 observable:
interval(1000).pipe(
map(val => {
if(val === 4){
throw '4 is not good';
}
return val * 2;
}),
catchError(err => of('A','B','C')),
tap(val => console.log(`value: ${val}`)),
).subscribe()
catchError後 retry,主要是根據 catchError中傳入的第二個參數,第二個參數為原本執行的整個 observable:
第二個參數為原本執行的整個 observable
// 因為會一直 retry,故要加上take(20)
interval(1000).pipe(
map(val => {
if(val === 4){
throw '4 is not good';
}
return val * 2;
}),
catchError((err, caught) => caught),
tap(val => console.log(`value: ${val}`)),
take(20)
).subscribe()
在 Observable執行失敗時,會重新將整個 Observable 重新執行 N 次。
interval(1000).pipe(
map(val => {
if(val === 4){
throw '4 is not good';
}
return val * 2;
}),
retry(2),
tap(
val => console.log(`value: ${val}`),
err => console.log(`error: ${err}`)
),
).subscribe()
retry是指定次數, retryWhen則是指定滿足特定條件才retry,如另一個 Observable觸發時重新嘗試。
interval(500).pipe(
map(val => {
if(val === 4){
throw '4 is not good';
}
return val * 2;
}),
retryWhen( (errors: Observable<any>) => {
return errors.pipe(
tap(val => console.log(`waiting for retry: ${val}`)),
delayWhen(() => timer(2000)),
take(5) // 實際上最多 retry 4次
);
}),
tap(
val => console.log(`value: ${val}`),
err => console.log(`error: ${err}`),
()=> console.log('completed')
),
).subscribe()
不管 Observable資料是什麼,都會強迫在原本的 observable emit 出來前,先 emit指定的 value出來。
// emit 11,12,13,0,1
interval(500).pipe(
startWith(11,12,13),
take(5),
tap(val => console.log(`val: ${val}`))
).subscribe()