RxJS operators in Angular

👉 Observable

An Observable represents a stream of values delivered over time. An Observable created as in the example below is cold: it does nothing on its own, and its producer function runs only when subscribe() is called, so no values are emitted until a subscription is made. Each subscriber gets its own independent execution.

import { Observable, Subscriber } from 'rxjs';

ngOnInit() {
        // Create a new Observable; countOnetoTen is its producer function
        const sqnc = new Observable<number>(countOnetoTen);

        sqnc.subscribe({
            next(num) { console.log(num); }
        });

        // This function runs each time subscribe()
        // is called
        function countOnetoTen(observer: Subscriber<number>) {
            for (let i = 1; i <= 10; i++) {
                // Pushes the next value to
                // the subscriber
                observer.next(i);
            }
            // Signal that the sequence is finished
            observer.complete();
            // Teardown logic; nothing to clean up
            // in this synchronous example
            return { unsubscribe() {} };
        }
}
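To make the lazy behavior concrete, here is a minimal sketch that records the order in which things happen. The names numbers$ and log are only illustrative and not part of the example above. The producer function should run once per subscriber, and only after subscribe() is called.

```typescript
import { Observable } from 'rxjs';

// Collects messages so the order of events is easy to inspect
const log: string[] = [];

const numbers$ = new Observable<number>(subscriber => {
  log.push('producer started');
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

log.push('before subscribe'); // the producer has not run yet

// Each subscribe() call triggers its own run of the producer
numbers$.subscribe(value => log.push(`A got ${value}`));
numbers$.subscribe(value => log.push(`B got ${value}`));

console.log(log);
// ['before subscribe', 'producer started', 'A got 1', 'A got 2',
//  'producer started', 'B got 1', 'B got 2']
```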

👉 Subscription

A Subscription represents the execution of an Observable. As said before, calling the “.subscribe” method is what starts that execution. The callback passed to this method is then executed for every emitted value, and the returned Subscription object can later be used to stop the execution by calling its unsubscribe() method.

import { of } from 'rxjs';

//emits any number of provided values in sequence
const observable = of(1, 2, 3, 4, 5, 6);

//output: 1,2,3,4,5,6
const subscription = observable.subscribe(val => console.log(val));
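The of() example completes by itself, so there is nothing to clean up. For a stream that keeps running, the subscription usually has to be closed by hand. The following is a small sketch under that assumption; ticks$ and events are made-up names, and the timer-based observable is only a stand-in for any long-lived source.

```typescript
import { Observable } from 'rxjs';

const events: string[] = [];

// Emits a counter every second until the subscriber goes away
const ticks$ = new Observable<number>(subscriber => {
  let count = 0;
  const timerId = setInterval(() => subscriber.next(count++), 1000);

  // Teardown logic: runs when the subscription is closed
  return () => {
    clearInterval(timerId);
    events.push('timer cleared');
  };
});

const subscription = ticks$.subscribe(value => events.push(`tick ${value}`));

// In an Angular component this call usually lives in ngOnDestroy()
subscription.unsubscribe();

console.log(events, subscription.closed);
// ['timer cleared'] true
```

Because unsubscribe() is called before the first second has passed, no tick is recorded and the timer is released.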

👉 Pipe

Since RxJS 5.5, the pipe() method is the recommended way to apply operators. You can think of it as a function that takes one Observable as input, applies a series of transformations with different operators, and returns another Observable. As you can see in the following example, it improves the readability of the code and is better for tree-shaking, because operators become plain functions that are imported individually.

// Before RxJS 5.5: operators chained as methods
observable.operator1().operator2().operator3().subscribe(...);

// Since RxJS 5.5
observable.pipe(
  operator1(),
  operator2(),
  operator3()
).subscribe(...);
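The operator1/operator2 names above are placeholders. A concrete version, sketched here with the real filter() and map() operators and an illustrative results array, might look like this:

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const results: number[] = [];

of(1, 2, 3, 4, 5, 6)
  .pipe(
    filter(n => n % 2 === 0), // keep even numbers: 2, 4, 6
    map(n => n * 10)          // then scale them: 20, 40, 60
  )
  .subscribe(value => results.push(value));

console.log(results); // [20, 40, 60]
```

Each operator receives the observable produced by the one before it, so the order inside pipe() matters.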

👉 Map

The map() operator is one of the most commonly used operators in RxJS. It transforms each value emitted by the source observable: it applies a function to every item and returns a new observable that emits the results.

We use map() inside the pipe() function, where multiple operators can be chained together to get the desired output from an observable. Let's see how we can apply a projection to each value from the source observable.


import { from } from 'rxjs';
import { map } from 'rxjs/operators';

//emit (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
//add 10 to each value
const example = source.pipe(map(val => val + 10));
//output: 11,12,13,14,15
const subscribe = example.subscribe(val => console.log(val));

👉 DebounceTime

When working on a web page, we often want to control how frequently we react to user input such as typing. This is where debounceTime() is useful. It delays each value and emits it only after the given time span has passed without another source emission.

The best part of debounceTime() is that if a new value arrives within the given time span, the previous pending value is dropped, so we only get the latest value emitted by the source observable.


import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';

// elem ref (assumes an <input id="search"> exists on the page)
const searchBox = document.getElementById('search') as HTMLInputElement;

// streams
const keyup$ = fromEvent(searchBox, 'keyup');

// emit the current value once 0.5s pass without another keyup
keyup$
  .pipe(
    map((event: Event) => (event.target as HTMLInputElement).value),
    debounceTime(500)
  )
  .subscribe(console.log);

👉 DistinctUntilChanged

Consider a scenario where we want to avoid unnecessary API hits triggered by user input, and only want to hit the API when the value differs from the last emitted value.

This is where the distinctUntilChanged() operator comes in: it drops a value when it is equal to the previous one, so the application avoids the unnecessary API hit.

For search-bar kinds of functionality, this operator is often combined with debounceTime(), which waits for the user to pause before a value is checked. Together, debounceTime() and distinctUntilChanged() make a search bar more efficient. Both are pipeable operators, so they can be placed in the same pipe() call.


import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// only output distinct values, based on the last emitted value
const source$ = from([1, 1, 2, 2, 3, 3]);

source$
  .pipe(distinctUntilChanged())
  // output: 1,2,3
  .subscribe(console.log);
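It is worth stressing that the comparison is made only against the previous emitted value, not against everything seen so far. The sketch below illustrates this, and also shows the optional comparator argument with some hypothetical search terms (the terms and variable names are made up for illustration).

```typescript
import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

const numbers: number[] = [];
from([1, 1, 2, 1, 1, 3])
  .pipe(distinctUntilChanged())
  .subscribe(value => numbers.push(value));

// The second 1 passes because it follows a 2
console.log(numbers); // [1, 2, 1, 3]

// Hypothetical search terms; the comparator ignores case and outer spaces
const terms: string[] = [];
from(['rxjs', 'RxJS ', 'angular', 'Angular', 'rxjs'])
  .pipe(
    distinctUntilChanged(
      (previous: string, current: string) =>
        previous.trim().toLowerCase() === current.trim().toLowerCase()
    )
  )
  .subscribe(term => terms.push(term));

console.log(terms); // ['rxjs', 'angular', 'rxjs']
```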

👉 ThrottleTime

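throttleTime() is used to limit how often values reach the subscriber, for example when we want to hit an API in response to a stream of events but not more than once per time span. It emits a value from the source observable, then ignores all further source values for the given duration, and then repeats.

It seems similar to debounceTime(), because both reduce the number of values that get through, but they behave differently. debounceTime() waits until the source has been silent for the given time span and then emits the latest value. throttleTime(), with its default settings, emits the first value right away and drops the values that arrive during the following time span.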


import { interval } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

// emit value every 1 second
const source = interval(1000);
/*
  emit the first value, then ignore for 5 seconds. repeat...
*/
const example = source.pipe(throttleTime(5000));
// output: 0...6...12
const subscribe = example.subscribe(val => console.log(val));

👉 ForkJoin

We can run multiple observables together with the forkJoin() operator. It waits for all of the observables to complete and then emits only the last value from each one. Remember that if any of the observables never completes, forkJoin() will never emit or complete.

By using forkJoin() you can call multiple APIs simultaneously.


import { ajax } from 'rxjs/ajax';
import { forkJoin } from 'rxjs';

/*
  when all observables complete, provide the last
  emitted value from each as dictionary
*/
forkJoin(
  // as of RxJS 6.5+ we can use a dictionary of sources
  {
    google: ajax.getJSON('https://api.github.com/users/google'),
    microsoft: ajax.getJSON('https://api.github.com/users/microsoft'),
    users: ajax.getJSON('https://api.github.com/users')
  }
)
  // { google: object, microsoft: object, users: array }
  .subscribe(console.log);
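The two rules described above (only the last value counts, and every source must complete) can be checked without any network calls. This is a rough sketch using of() and the NEVER constant from RxJS; the keys counts and label and the array names are illustrative only.

```typescript
import { forkJoin, of, NEVER } from 'rxjs';

const combined: Array<{ counts: number; label: string }> = [];

forkJoin({
  counts: of(1, 2, 3), // last value: 3
  label: of('x', 'y')  // last value: 'y'
}).subscribe(value => combined.push(value));

console.log(combined); // [{ counts: 3, label: 'y' }]

// NEVER is an observable that never emits and never completes
const stuck: unknown[] = [];
const pending = forkJoin([of(1), NEVER]).subscribe(value => stuck.push(value));

console.log(stuck.length); // 0, forkJoin is still waiting
pending.unsubscribe();
```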

👉 Pluck

The pluck() operator is used to pick a single property out of each emitted object and pass only that value on to the subscriber. It is useful when you do not need the rest of the object in the stream. pluck() returns undefined if it does not find the given property on a value.

We can also pass several property names to pluck(). They are read as a path into nested objects, so pluck('job', 'title') picks the title property of the job property.

// RxJS v6+
import { from } from 'rxjs';
import { pluck } from 'rxjs/operators';

const source = from([
  { name: 'Joe', age: 30, job: { title: 'Developer', language: 'JavaScript' } },
  //will return undefined when no job is found
  { name: 'Sarah', age: 35 }
]);
//grab title property under job
const example = source.pipe(pluck('job', 'title'));
//output: "Developer" , undefined
const subscribe = example.subscribe(val => console.log(val));
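Note that pluck() is marked as deprecated in RxJS 7, where the suggested replacement is map() with optional chaining. A possible equivalent of the example above is sketched here; the Person interface and the titles array are assumptions added for type safety and are not part of the original example.

```typescript
import { from } from 'rxjs';
import { map } from 'rxjs/operators';

// Illustrative type for the sample data
interface Person {
  name: string;
  age: number;
  job?: { title: string; language: string };
}

const people: Person[] = [
  { name: 'Joe', age: 30, job: { title: 'Developer', language: 'JavaScript' } },
  { name: 'Sarah', age: 35 }
];

const titles: Array<string | undefined> = [];

from(people)
  .pipe(map(person => person.job?.title)) // same path as pluck('job', 'title')
  .subscribe(title => titles.push(title));

console.log(titles); // ['Developer', undefined]
```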

👉 CatchError

catchError() is used to handle errors raised by the source observable or by the operators before it in the pipe. When an error is caught, this operator either returns a new observable to continue with or throws an error again.

It is a pipeable operator, so it can be used inside the pipe() function of an observable. It takes a function as its argument; that function receives the error and returns an observable.

import { of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    map(n => {
      if (n === 4) {
        throw 'four!';
      }
      return n;
    }),
    catchError(err => of('I', 'II', 'III', 'IV', 'V'))
  )
  .subscribe(x => console.log(x));
// 1, 2, 3, I, II, III, IV, V
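The example above replaces the failed stream with a fallback. The other option, passing the error on, can be sketched by throwing from inside the catchError() callback. The error messages and the received array below are invented for illustration.

```typescript
import { of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';

const received: string[] = [];

of(1, 2, 3)
  .pipe(
    map(n => {
      if (n === 2) {
        throw new Error('two!');
      }
      return n;
    }),
    // Rethrow with more context instead of replacing the stream
    catchError(err => {
      throw new Error(`lookup failed: ${err.message}`);
    })
  )
  .subscribe({
    next: value => received.push(`next ${value}`),
    error: (err: Error) => received.push(`error ${err.message}`)
  });

console.log(received); // ['next 1', 'error lookup failed: two!']
```

The value 3 never arrives, because an error ends the stream for the subscriber.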

👉 MergeMap

mergeMap() is often used when each value from one observable should trigger another observable, for example when the response of one request is needed to make a second request. It maps each source value to an inner observable, subscribes to it, and merges the values of all inner observables into a single output observable. The thing to notice here is that an inner observable is not created until the source observable emits a value.

This is a very popular RxJS operator; we usually need to combine dependent responses of observable type like this while working with web pages in Angular or JavaScript.

import { of, interval } from 'rxjs';
import { mergeMap, map } from 'rxjs/operators';

const letters = of('a', 'b', 'c');
const result = letters.pipe(
  mergeMap(x => interval(1000).pipe(map(i => x+i))),
);
result.subscribe(x => console.log(x));

// Results in the following:
// a0
// b0
// c0
// a1
// b1
// c1
// continues to list a,b,c with respective ascending integers
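A request-style use of mergeMap() could look roughly like the sketch below. Here loadUser is a hypothetical stand-in for an HTTP call such as one made with Angular's HttpClient; it returns its result synchronously so the example stays self-contained.

```typescript
import { Observable, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for an HTTP request that returns an observable
function loadUser(id: number): Observable<string> {
  return of(`user ${id}`);
}

const users: string[] = [];

of(1, 2, 3)
  .pipe(mergeMap(id => loadUser(id))) // one inner observable per id
  .subscribe(user => users.push(user));

console.log(users); // ['user 1', 'user 2', 'user 3']
```

With real asynchronous requests the results arrive in whatever order the responses come back, which may differ from the order of the ids. When the order matters, concatMap() is the usual alternative.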