Manage RxJS Subscriptions in Angular

RxJS Aug 12, 2020

RxJS is baked into Angular's environment and is heavily used behind the scenes inside Angular. You will notice that when you create a brand new Angular app with ng new newApp or with Visual Studio’s Angular template, RxJS is always included as one of the core dependencies.

But first, what is RxJS?

RxJS is a library for composing asynchronous and event-based programs by using observable sequences - RxJS

Because RxJS is baked into Angular, we are used to subscribing in order to receive data from various sources, e.g. HttpClient, the NgRx Store, or events.

What is a Subscription?

To understand what a Subscription is, we first need to understand a few RxJS terms.

Observable

According to the RxJS docs, an Observable is a representation of any set of values over any amount of time.
An Observable has a subscribe() method, which invokes execution of the Observable and registers Observer handlers for the notifications it will emit.

Subscription

Now that we understand Observable and its subscribe() method, we are ready to talk about Subscriptions. In a nutshell, a Subscription:

  • Is a disposable resource, usually the execution of an Observable (allowing values to be streamed out of the Observable)
  • Has one important method, unsubscribe(), which disposes of the resource held by the Subscription
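To make the two terms concrete, here is a minimal sketch. The numbers$ stream and the teardownLog array are hypothetical names invented for illustration; they are not part of the Todo example that follows.

```typescript
import { Observable, Subscription } from 'rxjs';

// Hypothetical stream for illustration: emits 1 and 2 synchronously
// and records when its teardown logic runs.
const teardownLog: string[] = [];
const numbers$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  // The returned function is the teardown; it runs on unsubscribe().
  return () => teardownLog.push('teardown');
});

const received: number[] = [];

// subscribe() starts the execution and hands back a Subscription.
const subscription: Subscription = numbers$.subscribe((value) => received.push(value));

// The stream never completes, so the Subscription is still open here.
const closedBefore = subscription.closed; // false

// unsubscribe() disposes of the resource and runs the teardown.
subscription.unsubscribe();
const closedAfter = subscription.closed; // true

console.log(received, closedBefore, closedAfter, teardownLog);
```

The closed flag is a handy way to check whether a Subscription has already been disposed of.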

Sample use case

You have a Web API available that provides endpoints for adding, editing, and retrieving “Todo” Items. Now you want to display these “Todo” items in an application.

In order to get all the data, we have to:

  1. Send an http request
  2. Subscribe to the Observable using subscribe() with a fat arrow function that will eventually receive the todo item payload
  3. Inside our fat arrow function, we can assign the incoming todo item to a local variable (which will/can trigger a UI update)

this.httpClient
    .get('https://jsonplaceholder.typicode.com/todos')
    .subscribe((todos) => {
      ...
    });

It would be nice to always have the latest data. We can keep fetching new data by polling the endpoint every 10 seconds.

interval(10000).subscribe(() => {
  this.httpClient
    .get('https://jsonplaceholder.typicode.com/todos')
    .subscribe((todos) => {
      ...
    });
});

However, little did we know that the code above was doing more than we intended. After a few navigations back and forth, we ended up bombarding the server with requests. It went from fetching once every 10 seconds to once every few seconds, and sometimes double fetching occurred.

Uncontrolled data fetching

At this point, we realised that even when the component gets destroyed, its subscriptions remain active until we tell them otherwise.
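The effect can be reproduced without Angular. In the rough sketch below, LeakyTodoList is a hypothetical component-like class, and a hand-fired Subject named tick$ stands in for interval(10000) so the example runs instantly. With the real interval, every leftover subscription would own its own timer, but the outcome is the same: one fetch per subscription that was never cleaned up.

```typescript
import { Subject } from 'rxjs';

// Hypothetical stand-in for interval(10000): a tick source we can fire by hand.
const tick$ = new Subject<void>();
let fetchCount = 0;

// Hypothetical component-like class: subscribes on init, does nothing on destroy.
class LeakyTodoList {
  init(): void {
    tick$.subscribe(() => {
      fetchCount++; // stands in for the HTTP request
    });
  }

  destroy(): void {
    // No unsubscribe() here, so the subscription outlives the instance.
  }
}

// Simulate navigating to the page and away from it three times.
for (let visit = 0; visit < 3; visit++) {
  const component = new LeakyTodoList();
  component.init();
  component.destroy();
}

// A single tick now triggers one "fetch" per leftover subscription.
tick$.next();
console.log(fetchCount); // 3
```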

Solutions

1. Unsubscribe all Subscriptions

This is the most intuitive approach when we think about subscriptions.
After all, every subscription needs to be unsubscribed for it to stop.

Unsubscribe single subscription

ngOnInit() {
  // Assign the subscription to a variable
  this.subscription = interval(10000).subscribe(() => {
    ...
  });
}

ngOnDestroy() {
  // Manually unsubscribe in the ngOnDestroy life-cycle hook
  this.subscription.unsubscribe();
}

Unsubscribe multiple subscriptions in an array

ngOnInit() {
  // Push every subscription to an array of subscriptions
  this.subscriptions.push(
    interval(10000).subscribe(() => {
      ...
    })
  );
}
  
ngOnDestroy() {
  // Loop over every subscription and unsubscribe
  this.subscriptions.forEach((subscription) => {
    subscription.unsubscribe();
  });
}
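As an alternative to keeping an array, a Subscription can hold child subscriptions through its add() method, and unsubscribing the parent unsubscribes all of them. The sketch below is only an illustration of that idea; a$ and b$ are hypothetical Subjects standing in for real data sources.

```typescript
import { Subject, Subscription } from 'rxjs';

// Hypothetical sources for illustration.
const a$ = new Subject<number>();
const b$ = new Subject<string>();
const log: string[] = [];

// One parent Subscription collects every child subscription.
const subscriptions = new Subscription();
subscriptions.add(a$.subscribe((n) => log.push(`a:${n}`)));
subscriptions.add(b$.subscribe((s) => log.push(`b:${s}`)));

a$.next(1);
b$.next('x');

// In a component this call would live in ngOnDestroy().
subscriptions.unsubscribe();

// Values emitted after unsubscribe() are no longer received.
a$.next(2);
b$.next('y');

console.log(log); // ['a:1', 'b:x']
```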

2. Completing Subscription using RxJS pipe

Before we head into RxJS pipe, let us first understand what a pipe is. It is not the same as the pipe written with the | symbol. A pipe in RxJS is a chain of pipeable operators, each of which is a pure function that takes an Observable as input and returns a new Observable as output.

There are various ways to complete a subscription through pipe. I'll share my top three operators.
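As a small illustration of chaining, the sketch below pipes a hypothetical stream of five numbers through filter and map. The import path rxjs/operators is an assumption that matches RxJS 6 and still works in RxJS 7.

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const result: number[] = [];

of(1, 2, 3, 4, 5)
  .pipe(
    // Each operator receives an Observable and returns a new one.
    filter((n) => n % 2 === 1), // keep odd numbers
    map((n) => n * 10),         // then multiply each by 10
  )
  .subscribe((n) => result.push(n));

console.log(result); // [10, 30, 50]
```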

take()

The take operator lives up to its name. It takes only a specified number of values from the source and then completes, which also unsubscribes from the source.

ngOnInit() {
  interval(10000)
    // Take 10 values then unsubscribe the stream
    .pipe(take(10))
    .subscribe((x) => {
      ...
    });
}
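To watch the completion happen without waiting on a timer, the following sketch swaps interval for a hand-fired Subject (source$, a hypothetical stand-in) and uses take(2).

```typescript
import { Subject } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical stand-in for interval(10000) that we can fire by hand.
const source$ = new Subject<number>();
const events: string[] = [];

const subscription = source$.pipe(take(2)).subscribe({
  next: (value) => events.push(`next:${value}`),
  complete: () => events.push('complete'),
});

source$.next(1);
source$.next(2); // second value: take(2) completes right after delivering it
source$.next(3); // ignored, the subscription is already closed

console.log(events);              // ['next:1', 'next:2', 'complete']
console.log(subscription.closed); // true
```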

takeWhile()

The takeWhile operator completes the stream, and unsubscribes from the source, the first time the predicate function passed to takeWhile returns false.

private _componentExist = true;
  
ngOnInit() {
  interval(10000)
    // takeWhile lets values through as long as the predicate returns true.
    // When the predicate returns false, it terminates the stream
    .pipe(takeWhile(() => this._componentExist))
    .subscribe(() => {
      ...
    });
}

ngOnDestroy() {
  this._componentExist = false;
}

Note that if the Observable never emits another value, the subscription will never be terminated, because takeWhile only evaluates its predicate when a value passes through.
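This caveat is easy to see in isolation. In the sketch below, source$ is a hypothetical hand-fired Subject and alive plays the role of _componentExist; flipping the flag alone does not close the subscription.

```typescript
import { Subject } from 'rxjs';
import { takeWhile } from 'rxjs/operators';

// Hypothetical stand-in for a slow or idle stream.
const source$ = new Subject<number>();
let alive = true; // plays the role of _componentExist
const events: string[] = [];

const subscription = source$.pipe(takeWhile(() => alive)).subscribe({
  next: (value) => events.push(`next:${value}`),
  complete: () => events.push('complete'),
});

source$.next(1);

// "Destroy the component": the flag flips, but nothing re-checks it yet.
alive = false;
const closedBeforeNextValue = subscription.closed; // false

// Only when the next value arrives is the predicate evaluated.
source$.next(2);
const closedAfterNextValue = subscription.closed; // true

console.log(events); // ['next:1', 'complete']
```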

takeUntil()

takeUntil lets values through until the notifier Observable passed to takeUntil emits a value. You might want to use this approach when the two approaches above do not suit your use case.

private ngDestroy$ = new Subject<void>();

ngOnInit() {
  interval(10000)
    .pipe(takeUntil(this.ngDestroy$))
    .subscribe((x) => {
      ...
    });
}

ngOnDestroy() {
  // Emit a value so that every takeUntil(this.ngDestroy$) is triggered
  this.ngDestroy$.next();
  // Then complete the subject itself so there are no loose ends
  this.ngDestroy$.complete();
}
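Unlike takeWhile, takeUntil does not wait for the source to emit again. The sketch below shows this with two hypothetical hand-fired Subjects, source$ for the data and destroy$ for the notifier.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical stand-ins: source$ for the data stream, destroy$ for ngDestroy$.
const source$ = new Subject<number>();
const destroy$ = new Subject<void>();
const events: string[] = [];

const subscription = source$.pipe(takeUntil(destroy$)).subscribe({
  next: (value) => events.push(`next:${value}`),
  complete: () => events.push('complete'),
});

source$.next(1);

// What ngOnDestroy() would do: notify, then complete the notifier.
destroy$.next();
destroy$.complete();

// The subscription closes immediately, without another source value.
const closedAfterDestroy = subscription.closed; // true

source$.next(2); // ignored

console.log(events); // ['next:1', 'complete']
```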

3. Use Angular async pipe

Angular comes with an amazing async pipe, which allows an observable value to be streamed directly into the HTML template. The benefit of this approach is that you don’t have to manage anything. Angular subscribes and unsubscribes for you, following the component's life-cycle.

TS:

todos$: Observable<Todo[]>;

ngOnInit() {
  this.todos$ = interval(10000).pipe(
    ...
  );
}

HTML:

<ul>
  <li *ngFor="let todo of (todos$ | async)">
    {{ todo | json }}
  </li> 
</ul>
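The pipeline assigned to todos$ is left open above. One possible shape for it, offered only as a sketch, is to map every tick to a fetch with switchMap, so there is a single stream and no nested subscribe. The pollTodos helper, the Todo fields, and the fake fetch function are all assumptions made for illustration and are not taken from the original component.

```typescript
import { Observable, Subject, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical shape of a todo item; the real payload may differ.
interface Todo {
  id: number;
  title: string;
}

// Hypothetical helper: fetches todos on every tick.
// switchMap drops a still-pending fetch when a new tick arrives.
function pollTodos(
  tick$: Observable<unknown>,
  fetchTodos: () => Observable<Todo[]>,
): Observable<Todo[]> {
  return tick$.pipe(switchMap(() => fetchTodos()));
}

// Stand-ins so the sketch runs without Angular or a server.
const tick$ = new Subject<void>();
let fetchCalls = 0;
const fakeFetch = (): Observable<Todo[]> => {
  fetchCalls++;
  return of([{ id: fetchCalls, title: `todo ${fetchCalls}` }]);
};

const snapshots: Todo[][] = [];
// The async pipe would perform this subscribe and unsubscribe for us.
const subscription = pollTodos(tick$, fakeFetch).subscribe((todos) => snapshots.push(todos));

tick$.next();
tick$.next();
subscription.unsubscribe();
tick$.next(); // no subscriber left, so no fetch happens

console.log(fetchCalls, snapshots.length); // 2 2
```

In a component, the same helper could be called with interval(10000) as the tick source and a function wrapping this.httpClient.get as the fetch, again as an assumption about how the elided pipeline might look.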

Summary

Make sure you terminate all RxJS subscriptions. You have seen that even in a small-scale application, an unterminated subscription can cause unwanted effects. Imagine what could happen in an enterprise app with many more subscriptions.

We've learnt that in Angular, you can terminate a subscription by:

  1. Unsubscribing all subscriptions manually
  2. Completing the subscription using RxJS pipe:
    a. take operator
    b. takeWhile operator
    c. takeUntil operator
  3. Using the Angular async pipe and letting Angular do it for you

Each of them has its strong points. You need to know when to use which, according to your own needs.

Personally, I would recommend using async when the data stream is only used in the HTML template, and takeUntil when the data is used in the component. You can certainly mix them according to your circumstances.

Now we can all have a fun time with subscriptions!
