RxJS is baked into Angular's environment and is heavily used behind the scenes inside Angular. You will notice that when you create a brand new Angular app with ng new newApp or with Visual Studio’s Angular template, RxJS is always included as one of the core dependencies.
But first, what is RxJS?
RxJS is a library for composing asynchronous and event-based programs by using observable sequences - RxJS
Since RxJS is baked into Angular, we are used to calling subscribe to receive data from our various sources, e.g. HttpClient, the NgRx Store, or events.
What is a Subscription?
To understand what a Subscription is, we first need to understand a few RxJS terms.
Observable
According to the RxJS docs, an Observable is a representation of any set of values over any amount of time. An Observable has a subscribe() method, which invokes execution of the Observable and registers Observer handlers for the notifications it will emit.
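As a minimal sketch of that description, the snippet below builds a small Observable by hand and subscribes to it with an Observer object. The names `numbers$` and `received` are made up for illustration; only `Observable` and `subscribe()` come from RxJS.

```typescript
import { Observable } from 'rxjs';

// Hypothetical Observable that emits three numbers and then completes.
const numbers$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

const received: string[] = [];

// Nothing runs until subscribe() is called. The Observer object
// registers one handler per notification type.
numbers$.subscribe({
  next: (value) => received.push(`next:${value}`),
  error: (err) => received.push(`error:${err}`),
  complete: () => received.push('complete'),
});

console.log(received); // ['next:1', 'next:2', 'next:3', 'complete']
```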
Subscription
Now that we understand Observable and the subscribe() method, we are ready to talk about Subscription. In a nutshell, a Subscription:
- Is a disposable resource, usually the execution of an Observable (allowing values to be streamed out of the Observable)
- Has one important method, unsubscribe(), which disposes of the resource held by the Subscription
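The two points above can be sketched as follows. A `Subject` is used here only because it lets us push values by hand; `source$` and `seen` are illustrative names, not part of any real service.

```typescript
import { Subject, Subscription } from 'rxjs';

// Hypothetical source; a Subject lets us push values by hand.
const source$ = new Subject<number>();
const seen: number[] = [];

// subscribe() returns the Subscription for this execution.
const subscription: Subscription = source$.subscribe((value) => seen.push(value));

source$.next(1);
source$.next(2);

// Dispose of the resource: no further values reach the handler.
subscription.unsubscribe();

source$.next(3); // ignored, nobody is listening any more

console.log(seen);                // [1, 2]
console.log(subscription.closed); // true
```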
Sample use case
You have a Web API available that provides endpoints for adding, editing, and retrieving “Todo” Items. Now you want to display these “Todo” items in an application.
In order to get all the data, we have to:
- Send an HTTP request
- Subscribe to the Observable using subscribe() with a fat arrow function that will eventually receive the todo item payload
- Inside our fat arrow function, assign the incoming todo items to a local variable (which can trigger a UI update)
this.httpClient
  .get('https://jsonplaceholder.typicode.com/todos')
  .subscribe((todos) => {
    ...
  });
It would be nice to always have the latest data. We can keep fetching fresh data by polling the endpoint every 10 seconds.
interval(10000).subscribe(() => {
  this.httpClient
    .get('https://jsonplaceholder.typicode.com/todos')
    .subscribe((todos) => {
      ...
    });
});
However, little did we know that our code above was doing more than we intended. We ended up bombarding the server with requests after a few navigations back and forth. It went from fetching once every 10 seconds to once every few seconds, and sometimes double fetching occurred. Every time we navigate to the page, a new component instance starts its own interval subscription, while the ones started by earlier instances keep running.
At this point we realised that even after the component is destroyed, its subscription stays alive until we tell it otherwise.
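To make the leak concrete, here is a small simulation under a few assumptions: `LeakyTodoComponent` is a plain class that only mimics Angular's lifecycle hooks, `tick$` is a `Subject` standing in for `interval(10000)` so the sketch runs instantly, and a counter stands in for the HTTP request. With the real `interval`, each leaked subscription would run its own timer, which is why the requests show up at irregular offsets.

```typescript
import { Subject } from 'rxjs';

// Stand-in for interval(10000) so the sketch runs instantly.
const tick$ = new Subject<void>();
let requestCount = 0;

// Plain class mimicking a component that never unsubscribes.
class LeakyTodoComponent {
  ngOnInit(): void {
    tick$.subscribe(() => {
      requestCount++; // stands in for the HTTP request
    });
  }

  ngOnDestroy(): void {
    // Nothing here, so the subscription above stays alive.
  }
}

// Navigate to the page and away from it three times.
for (let visit = 0; visit < 3; visit++) {
  const component = new LeakyTodoComponent();
  component.ngOnInit();
  component.ngOnDestroy();
}

// A single tick now triggers one request per leaked subscription.
tick$.next();
console.log(requestCount); // 3
```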
Solutions
1. Unsubscribe all Subscriptions
I think this is the most straightforward approach: every subscription has to be unsubscribed for it to stop.
Unsubscribe single subscription
private subscription?: Subscription;
ngOnInit() {
  // Assign the subscription to a variable
  this.subscription = interval(10000).subscribe(() => {
    ...
  });
}
ngOnDestroy() {
  // Manually unsubscribe in the ngOnDestroy lifecycle hook
  this.subscription?.unsubscribe();
}
Unsubscribe multiple subscriptions in an array
private subscriptions: Subscription[] = [];
ngOnInit() {
  // Push every subscription into an array of subscriptions
  this.subscriptions.push(
    interval(10000).subscribe(() => {
      ...
    })
  );
}
ngOnDestroy() {
  // Loop over every subscription and unsubscribe
  this.subscriptions.forEach((subscription) => {
    subscription.unsubscribe();
  });
}
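As an alternative to keeping an array, a `Subscription` can also act as a container: child subscriptions registered with `add()` are unsubscribed together with the parent. The sketch below shows this outside of a component, with made-up sources `a$` and `b$`.

```typescript
import { Subject, Subscription } from 'rxjs';

// Hypothetical sources for illustration.
const a$ = new Subject<string>();
const b$ = new Subject<string>();
const log: string[] = [];

// One parent Subscription collects the children.
const subscriptions = new Subscription();
subscriptions.add(a$.subscribe((v) => log.push(`a:${v}`)));
subscriptions.add(b$.subscribe((v) => log.push(`b:${v}`)));

a$.next('first');
b$.next('first');

// Unsubscribing the parent unsubscribes every child added to it.
subscriptions.unsubscribe();

a$.next('second'); // not delivered
b$.next('second'); // not delivered

console.log(log); // ['a:first', 'b:first']
```

In a component, the single `subscriptions.unsubscribe()` call would go in ngOnDestroy().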
2. Completing Subscription using RxJS pipe
Before we get into RxJS pipe, let us first understand what a pipe is. It is not the same as the Angular template pipe written with the | symbol. A pipe in RxJS is a chain of pipeable operators, each of which is a pure function that takes an Observable as input and returns a new Observable as output.
There are various ways to complete an Observable through pipe. I'll share my top three operators.
take()
The take operator lives up to its name. It takes only a specified number of values and then completes the stream, which also unsubscribes from the source.
ngOnInit() {
  interval(10000)
    // Take 10 values then unsubscribe the stream
    .pipe(take(10))
    .subscribe((x) => {
      ...
    });
}
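Here is a small standalone sketch of the same idea, using a `Subject` (an assumption made so the values can be pushed by hand instead of waiting for `interval`) and `take(2)` to keep it short.

```typescript
import { Subject } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical source that we feed manually.
const source$ = new Subject<number>();
const values: number[] = [];
let completed = false;

source$.pipe(take(2)).subscribe({
  next: (v) => values.push(v),
  complete: () => {
    completed = true;
  },
});

source$.next(10);
source$.next(20); // second value: take(2) completes right after this
source$.next(30); // never delivered

console.log(values);    // [10, 20]
console.log(completed); // true
```

Keep in mind that take only counts values. If the component is destroyed before the count is reached, the stream keeps running until it is.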
takeWhile()
The takeWhile operator completes the Observable stream as soon as the predicate function passed to takeWhile returns false.
private _componentExist = true;
ngOnInit() {
  interval(10000)
    // takeWhile lets values through as long as the predicate returns true.
    // When the predicate returns false, it terminates the stream
    .pipe(takeWhile(() => this._componentExist))
    .subscribe(() => {
      ...
    });
}
ngOnDestroy() {
  this._componentExist = false;
}
Note that if the Observable stream does not emit another value, the subscription will never be terminated: takeWhile only evaluates its predicate when a value passes through, so it needs one more emission to terminate itself.
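This caveat is easy to see in a standalone sketch. The flag `componentExists` plays the role of `_componentExist`, and a `Subject` is assumed as the source so that emissions can be triggered by hand.

```typescript
import { Subject } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

// Hypothetical source that we feed manually.
const source$ = new Subject<number>();
let componentExists = true;
let completed = false;
const values: number[] = [];

source$.pipe(takeWhile(() => componentExists)).subscribe({
  next: (v) => values.push(v),
  complete: () => {
    completed = true;
  },
});

source$.next(1);

// Simulate ngOnDestroy(): the flag flips, but nothing is emitted yet.
componentExists = false;
const completedRightAfterDestroy = completed; // still false

// Only the next emission makes takeWhile run the predicate and complete.
source$.next(2);

console.log(values);                     // [1]
console.log(completedRightAfterDestroy); // false
console.log(completed);                  // true
```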
takeUntil()
takeUntil lets values through until the notifier Observable passed to takeUntil emits a value. You might want to use this approach when the two approaches above do not suit your use case.
private ngDestroy$ = new Subject<void>();
ngOnInit() {
  interval(10000)
    .pipe(takeUntil(this.ngDestroy$))
    .subscribe((x) => {
      ...
    });
}
ngOnDestroy() {
  // Emit a value so that every takeUntil using this notifier is triggered
  this.ngDestroy$.next();
  // Then complete the Subject itself so there are no loose ends
  this.ngDestroy$.complete();
}
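The sketch below runs the takeUntil pattern outside of a component to show two things: several streams can share one notifier, and they complete as soon as the notifier emits, without waiting for another value from their own source. The names `destroy$`, `ticks$` and `clicks$` are illustrative only.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const destroy$ = new Subject<void>();
const ticks$ = new Subject<number>();
const clicks$ = new Subject<string>();
const log: string[] = [];
let completions = 0;

// Both streams share the same notifier.
ticks$.pipe(takeUntil(destroy$)).subscribe({
  next: (v) => log.push(`tick:${v}`),
  complete: () => completions++,
});
clicks$.pipe(takeUntil(destroy$)).subscribe({
  next: (v) => log.push(`click:${v}`),
  complete: () => completions++,
});

ticks$.next(1);
clicks$.next('save');

// Simulate ngOnDestroy(): both streams complete immediately.
destroy$.next();
destroy$.complete();

ticks$.next(2);       // not delivered
clicks$.next('undo'); // not delivered

console.log(log);         // ['tick:1', 'click:save']
console.log(completions); // 2
```

When a pipe contains several operators, putting takeUntil last is a common recommendation, so that anything subscribed by the earlier operators is torn down as well.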
3. Use Angular async pipe
Angular comes with an amazing async pipe, which allows an Observable's values to be streamed directly into the HTML template. The benefit of this approach is that you don’t have to manage the subscription yourself. Angular subscribes for you and unsubscribes when the component is destroyed.
TS:
todos$: Observable<Todo[]>;
ngOnInit() {
  this.todos$ = interval(10000).pipe(
    ...
  );
}
HTML:
<ul>
  <li *ngFor="let todo of (todos$ | async)">
    {{ todo | json }}
  </li>
</ul>
Summary
Make sure you terminate all RxJS subscriptions. You have seen that even in a small-scale application, an unterminated subscription can cause unwanted effects. Imagine what could happen in an enterprise app with many subscriptions.
We've learnt that in Angular, you can terminate subscriptions by:
- Unsubscribing all subscriptions manually
- Completing the subscription using an RxJS pipe:
  a. take operator
  b. takeWhile operator
  c. takeUntil operator
- Using the Angular async pipe and letting Angular do it for you
Each of them has its strong points, and you need to know when to use which according to your needs.
Personally, I would recommend using async when the data stream is only used in the HTML template, and takeUntil if the data is used in the component class. You can definitely mix them according to your circumstances.
Now we can all have a fun time with subscriptions!