RxJS is baked into Angular's environment and is heavily used behind the scenes inside Angular. You will notice that when you create your brand new Angular app with ng new newApp
or using Visual Studio’s Angular template, RxJS
is always included as one of the core dependencies.
But first, what is RxJS?
RxJS is a library for composing asynchronous and event-based programs by using observable sequences - RxJS
Since RxJS is baked into Angular we are so used to the fact we need to subscribe
for us to receive data from our various services, e.g. HttpClient, NgRx Store or event.
What is a Subscription?
To understand what a Subscription is, we need to first understand several terminologies in RxJS.
Observable
According to RxJS docs, Observable is a representation of any set of values over any amount of time.Observable
has subscribe()
method, which invokes the execution of an Observable
and registers Observer handlers for notifications it will emit.
Subscription
Now that we understand Observable
and subscribe()
method, now we are ready to talk about Subscription. In a nutshell, a Subscription:
Is a disposable resource, usually the execution of an
Observable
(allowing value to be streamed out of observable)There is one important method,
unsubscribe()
, which disposes of the resource held by the Subscription
Sample use case
You have a Web API available that provides endpoints for adding, editing, and retrieving “Todo” Items. Now you want to display these “Todo” items in an application.
To get all the data, we have to:
Send an HTTP request
Subscribe to the Observable using
subscribe()
with a fat arrow function that will eventually receive the todo item payloadInside our fat arrow function, we can assign the incoming todo item to a local variable (which will/can trigger a UI update)
this.httpClient
.get('https://jsonplaceholder.typicode.com/todos')
.subscribe((todos) => {
...
});
It would be nice always to have the latest data. We can continually fetch the new data by polling the endpoint every 10 seconds.
interval(10000).subscribe(() => {
this.httpClient
.get('https://jsonplaceholder.typicode.com/todos')
.subscribe((todos) => {
...
});
});
However, little did we know that our code above was doing more than we intended. We somehow ended up bombarding the server with request after a few navigations back and forth. It went from fetching once every 10 seconds to once every a few seconds, and sometimes double fetching occurred.
Up to this point, we now realised that even if the component gets destroyed, the subscription will remain until we told them otherwise.
Solutions
1. Unsubscribe all Subscriptions
I think this sounds like the most sensible way when we think about the subscription.
Of course, all subscription needs to be unsubscribed for it to be stopped.
Unsubscribe single subscription
ngOnInit() {
// Assign the subscription to a variable
this.subscription = interval(10000).subscribe(() => {
...
});
}
ngOnDestroy() {
// Manually unsubscribe subscription on ngDestroy life-cycle
this.subscription.unsubscribe();
}
Unsubscribe multiple subscriptions in an array
ngOnInit() {
// Push every subscriptions to an array of subscriptions
this.subscriptions.push(
interval(10000).subscribe(() => {
...
})
);
}
ngOnDestroy() {
// Loop on every subscriptions and unsubscribe
this.subscriptions.forEach((subscription) => {
subscription.unsubscribe();
});
}
2. Completing Subscription using RxJS pipe
Before we head into RxJS pipe, let us first understand what a pipe
is. It is not the same as a pipe with the symbol of |
. Pipe in RxJS is a chain of Pipeable Operators, which is a pure function that takes an Observable as an input and emits a new Observable as the output (read more).
There are various ways to complete RxJS through pipe. I'll share my top three operators.
take()
take
operator lives up to its name. It will only take a number passed through the function and then unsubscribe itself.
ngOnInit() {
interval(10000)
// Take 10 values then unsubscribe the stream
.pipe(take(10))
.subscribe((x) => {
...
});
}
takeWhile()
takeWhile
operator will unsubscribe the Observable stream when the function call inside takeWhile
returns false.
private _componentExist = true;
ngOnInit() {
interval(10000)
// takeWhile pipe will accept stream as long as the value returns true.
// When the value is false, it will terminate the stream
.pipe(takeWhile(() => this._componentExist))
.subscribe(() => {
...
});
}
ngOnDestroy() {
this._componentExist = false;
}
Do notice that if the Observable
stream does not emit any new value, the subscription will never be terminated, since the takeWhile
operator requires a stream to pass through to terminate itself.
takeUntil()
takeUntil
will accept any streamed values until the Observable
function in the takeUntil
emits a value. You might want to use this approach when the two approaches above do not suit your use case.
private ngDestroy$ = new Subject();
ngOnInit() {
interval(10000)
.pipe(takeUntil(this.ngDestroy$))
.subscribe((x) => {
...
});
}
ngOnDestroy() {
// Emit value to make all takeUntil gets triggered
this.ngDestroy$.next();
// Then complete its own subscription to make sure there is no loose end
this.ngDestroy$.complete();
}
3. Use Angular async pipe
Angular comes with an amazing async
pipe, which allows an observable
value to be streamed directly into the html template. The benefit of this approach is that you don’t have to manage anything. Angular will manage it for you, and it will follow the component's life cycle.
TS:
todos$: Observable<Todo[]>;
ngOnInit() {
this.todos$ = interval(10000).pipe(
...
);
}
HTML:
<ul>
<li *ngFor="let todo of (todos$ | async)">
{{ todo | json }}
</li>
</ul>
Summary
Make sure you terminate all RxJS subscriptions. You have seen that in a small-scale application, an unterminated subscription can cause some unwanted effects. Imagine what could happen in an enterprise app where there are so many subscriptions happening.
We've learnt that in Angular, you can terminate a subscription either by:
Unsubscribe all subscriptions manually
Completing Subscription using RxJS pipe:
a.take
operator
b.takeWhile
operator
c.takeUntil
operatorUse Angular
async
pipe and let Angular do it for you
Each of them has their strong points. You need to know when to use it according to your own needs.
Personally, I would recommend using async
when the data stream is only being used in the HTML template and use takeUntil
if the data is being used in the component. You definitely can mix the usage according to your circumstances.
Now we can all have a fun time with subscriptions!