查了一下 rxjs文件,發現rxjs 上除了自己平常使用的建立Obsservable方法外,還有提供其他沒用過的建立方式,這邊看的同時也做一個記錄。
依照傳入的參數,逐一 emit value
const source$ = of(1, 2, 3, 4, 5);
console.log('Trying to subscribe.');
source$.subscribe(val => console.log(`Receive value ${val}`));
console.log('Subscribed.');
依照傳入的陣列,逐一 emit 陣列內容,從 rxjs6之後, from 可以傳入 promise,並移除了 fromPromise方法。
const source$ = from([1, 2, 3, 4, 5]);
this.textLogSrv.addLogs('Trying to subscribe.');
source$.subscribe(val => this.textLogSrv.addLogs(`Receive value ${val}`));
this.textLogSrv.addLogs('Subscribed.');
除了上面用法,透過 from(‘string’)方式使用,結果會逐一傳回 string中的每一個字元。
Creates an Observable that emits events of a specific type coming from the given event target.
Javascript event以Observable方式處理,在Angular中除了以 ViewChild方式實做外,也可以傳入javascript DOM element。
Code:
export class DemoComponent implements AfterViewInit {
@ViewChild('myInput', {static: true}) myInput: ElementRef;
constructor(private textLogSrv: TextLogService) { }
ngAfterViewInit(): void {
/**
* From ViewChild
*/
const click$ = fromEvent(this.myInput.nativeElement, 'click');
const focus$ = fromEvent(this.myInput.nativeElement, 'focus');
const blur$ = fromEvent(this.myInput.nativeElement, 'blur');
click$.subscribe((val:MouseEvent) => this.textLogSrv.addLogs(`Click event ${val}`));
focus$.subscribe((val:FocusEvent) => this.textLogSrv.addLogs(`Focus event ${val}`));
blur$.subscribe((val: FocusEvent) => this.textLogSrv.addLogs(`Blur event ${val}`));
/**
* From native dom element
*/
const clickFromDom$ = fromEvent(document.querySelector('input[type=text]'), 'click');
clickFromDom$.subscribe((val:MouseEvent) => this.textLogSrv.addLogs(`From DOM click event ${val}`));
}
}
Template:
<p>
<input #myInput type="text"/>
<text-log></text-log>
</p>
An Observable that emits no items to the Observer and never completes.
訂閱後就不會 complete的Observable,因為這特性,所以一定要手動將 observable做取消訂閱的動作,通常不會獨自使用,會搭配其他operators或是用來測試程式。
NEVER
.subscribe(
val => this.textLogSrv.addLogs('Value emit'),
noop,
()=> this.textLogSrv.addLogs('Completed')
);
訂閱後馬上結束的Observable。
EMPTY
.subscribe(
val => this.textLogSrv.addLogs('Value emit'),
noop,
()=> this.textLogSrv.addLogs('Completed')
);
建立一個新的Observable,並進入 error callback。
of(1,2,3,4,5)
.pipe(
switchMap( val => {
if(val === 3){
// throwError creates new observable
return throwError('I dont like number 2.');
}
return of(val);
})
)
.subscribe(
val => this.textLogSrv.addLogs(`Value emit ${val}`),
err => this.textLogSrv.addLogs(`Got an error: ${err}`),
()=> this.textLogSrv.addLogs('Completed')
);
}
[Creates an Observable that emits sequential numbers every specified interval of time, on a specified
SchedulerLike
.]
固定時間emit value之Observable,依序回傳 0, 1, 2, 3 …,因為不會自動停止,通常會搭配其他operator使用,或是手動取消訂閱。
interval(2000)
.subscribe(
val => this.textLogSrv.addLogs(`Value emit ${val}`),
err => this.textLogSrv.addLogs(`Got an error: ${err}`),
()=> this.textLogSrv.addLogs('Completed')
);
與interval相似,但可以指定第一次在過多久之後才執行
// 過三秒後執行,之後每一秒送出一個value
timer(3000, 1000)
.subscribe(
val => this.textLogSrv.addLogs(`Timer1 value emit ${val}`),
err => this.textLogSrv.addLogs(`Got an error: ${err}`),
()=> this.textLogSrv.addLogs('Timer1 completed')
);
// 一秒後送出 0後便 complete
timer(1000)
.subscribe(
val => this.textLogSrv.addLogs(`Timer2 value emit ${val}`),
err => this.textLogSrv.addLogs(`Got an error: ${err}`),
()=> this.textLogSrv.addLogs('Timer2 completed')
);
// 可以指定 Date作為起始時間
timer(new Date(new Date().getTime()+ 5000))
.subscribe(
val => this.textLogSrv.addLogs(`Timer3 value emit ${val}`),
err => this.textLogSrv.addLogs(`Got an error: ${err}`),
()=> this.textLogSrv.addLogs('Timer3 completed')
);
// 用法如同timer1,但改以Date作為起始時間
timer(new Date(new Date().getTime()+ 10000),500)
.subscribe(
val => this.textLogSrv.addLogs(`Timer4 value emit ${val}`),
err => this.textLogSrv.addLogs(`Got an error: ${err}`),
()=> this.textLogSrv.addLogs('Timer4 completed')
);
Creates an Observable that emits a sequence of numbers within a specified range.
可以指定從某個數值開始,產生 n 個數值
// starts from 3 and generate 20 values
range(3, 20)
.subscribe(
val => this.textLogSrv.addLogs(`value emit ${val}`),
err => this.textLogSrv.addLogs(`Got an error: ${err}`),
()=> this.textLogSrv.addLogs('completed')
);
this.textLogSrv.addLogs('Subscribe completed');
官方文件沒有文字說明…
類似 for loop
generate(0, x=> x< 10, x=> x+2)
.subscribe(
val => this.textLogSrv.addLogs(`Timer1 value emit ${val}`),
err => this.textLogSrv.addLogs(`Got an error: ${err}`),
()=> this.textLogSrv.addLogs('Timer1 completed')
);
this.textLogSrv.addLogs('Subscribe completed');
Rxjs也有提供 ajax請求,用的感覺有 $.ajax()的味道。
下面是RXJS官網範例:
ajax({
url: 'https://httpbin.org/delay/2',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'rxjs-custom-header': 'Rxjs'
},
body: {
rxjs: 'Hello World!'
}
}).pipe(
map(response => this.textLogSrv.addLogs(JSON.stringify(response))),
catchError(error => {
this.textLogSrv.addLogs('error: ', error);
return of(error);
})
).subscribe();
Creates a new cold Observable by calling the Observable constructor
自己手寫一個 Observable。
Observable.create中傳入之參數為一個 Observer,可以看Observer原始碼:
export interface Observer<T> {
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
Observer 就是當程式在訂閱 Observable時傳入的三個 callback function。
這邊可以看Observable.subscribe方法定義:
subscribe(observer?: PartialObserver<T>): Subscription;
/** @deprecated Use an observer instead of a complete callback */
subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription;
/** @deprecated Use an observer instead of an error callback */
subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void): Subscription;
/** @deprecated Use an observer instead of a complete callback */
subscribe(next: (value: T) => void, error: null | undefined, complete: () => void): Subscription;
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription;
所以當程式在做訂閱時,傳入的內容就是Observer需要的屬性:
// 參數依序會是 next、error與complete
of(1,2,3).subscribe(
val => {},
err => {},
() => {}
);
也可以用標準Observer方式傳入
const myObserver: Observer<any> = {
next: val => console.log(val),
error: err => console.log(err),
complete: () => console.log('complete')
}
of(1,2,3).subscribe(myObserver);
在知道這點之後,就可以開始說明 observable.create方法,直接用程式碼搭配說明:
// 建立Observable
const source$ = Observable.create( (observer: Observer<number>) => {
// 發送第一個next
observer.next(100);
// 之後每一秒發送一個value,執行10次後結束
interval(1000).pipe(
take(10)
).subscribe(
val => observer.next(val *2),
noop,
() => observer.complete()
)
});
source$.subscribe(
val => this.textLogSrv.addLogs(`value emit ${val}`),
err => this.textLogSrv.addLogs(`Got an error: ${err}`),
()=> this.textLogSrv.addLogs('completed')
);
this.textLogSrv.addLogs('Subscribe completed');
而上面source$.subscribe中,傳入之function:
val =>this.textLogSrv.addLogs(`value emit ${val}`)
即為Observable.create( observer) 裡面 observer.next function,所以當程式執行到 observer.next(100)時,實際上就是把 100 傳入上面 function中執行, error與 complete也是同樣的原理。
如果Observable執行完,建議要調用 complete方法告訴訂閱者結束,否則會跟NEVER一樣永遠不會停止,如果忘記取消訂閱,可能會造成其他問題。
不管以什麼方式產生Observable stream,都只會在開始訂閱的當下才開始執行。defer 比較常見是用在 promise,用過promise都知道,promise在宣告的當下就會開始執行,不像Observable需要等訂閱時才執行,所以下面的程式碼使用 from 將 promise轉為 observable,等 10秒後才訂閱,這時會發現在observable被訂閱前,promise已經將請求發出去。
const req$ = from(fetch('https://httpbin.org/delay/2',{
cache: 'no-cache',
headers: {
'user-agent': 'Mozilla/4.0 MDN Example',
'content-type': 'application/json'
},
method: 'POST',
mode: 'cors',
}));
setTimeout(()=>{
this.textLogSrv.addLogs('req$ start subscribing')
req$
.subscribe(
val=> this.textLogSrv.addLogs(`req$ value: ${val.url}`),
err=> this.textLogSrv.addLogs(`req$ return error ${err}`),
()=> this.textLogSrv.addLogs(`req$ complete`));
this.textLogSrv.addLogs('req$ subscribed')
}, 10000);
要避免這情況,就會需要以defer方式建立 Observable,這方式可以讓原本立刻執行的 promise,等到Observable被訂閱的當下才執行
// send request when subscribeing to observable
const deferReq$ = defer(()=>fetch('https://httpbin.org/delay/2',{
cache: 'no-cache',
headers: {
'user-agent': 'Mozilla/4.0 MDN Example',
'content-type': 'application/json'
},
method: 'POST',
mode: 'cors',
}));
setTimeout(()=>{
this.textLogSrv.addLogs('deferReq$ start subscribing')
deferReq$
.subscribe(
val=> this.textLogSrv.addLogs(`deferReq$ value: ${val.url}`),
err=> this.textLogSrv.addLogs(`deferReq$ return error ${err}`),
()=> this.textLogSrv.addLogs(`deferReq$ complete`));
this.textLogSrv.addLogs('deferReq$ subscribed')
}, 15000);
兩段程式碼請求方式相同,只差在於第二段程式使用了 defer方法來延遲執行。