RxJS Operators

In RxJS, Operators are functions that accept an Observable as input, apply a transformation to it, and return a new, transformed Observable as output. These Operators are (mostly) pure, side-effect-free functions: they don't change the incoming Observable in any way. This makes Operators chainable (pipeable), so the asynchronous logic in your application can be broken down into small, manageable blocks.
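
To make the "Observable in, new Observable out" idea concrete, here is a minimal sketch in plain JavaScript. It deliberately does not use RxJS: `createObservable`, `mapOp`, `filterOp`, and `pipeAll` are hypothetical toy helpers written only for illustration, and the real library also handles errors, completion, and unsubscription, which this sketch ignores.

```javascript
// Toy model only: an "observable" here is just an object with a subscribe method.
// None of these helpers are RxJS APIs.
function createObservable(values) {
  return {
    subscribe(next) {
      values.forEach(v => next(v));
    },
  };
}

// An operator takes a source observable and returns a NEW observable.
// The source itself is never modified.
const mapOp = fn => source => ({
  subscribe(next) {
    source.subscribe(v => next(fn(v)));
  },
});

const filterOp = predicate => source => ({
  subscribe(next) {
    source.subscribe(v => {
      if (predicate(v)) next(v);
    });
  },
});

// Chain operators left to right, feeding each one the previous result.
const pipeAll = (source, ...operators) =>
  operators.reduce((obs, op) => op(obs), source);

const source = createObservable([1, 2, 3, 4]);
const evensTimesTen = pipeAll(
  source,
  filterOp(x => x % 2 === 0),
  mapOp(x => x * 10),
);

const transformed = [];
evensTimesTen.subscribe(v => transformed.push(v));

// Subscribing to the original source still yields the untouched values.
const original = [];
source.subscribe(v => original.push(v));

console.log(transformed.join(', ')); // 20, 40
console.log(original.join(', '));    // 1, 2, 3, 4
```

Because each toy operator only wraps its source instead of modifying it, the same source can feed several independent pipelines, which is the property that makes chaining safe.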

 

RxJS groups its Operators into several categories, but the most commonly used are Creation Operators and Transformation Operators. In this guide, we will explore how to create an Observable, how to transform it, and how to consume the data it emits.

 

Creation Operator: of

This is the simplest way to create an Observable from static data. It is an easy-to-use wrapper that accepts any number of values as arguments and returns a ready-to-use Observable that emits them in order and then completes. This operator comes in handy for starting a brand new Observable pipeline.

 

Javascript




import { of } from 'rxjs';

of(10, 20, 30).subscribe({
  next: value => console.log('next:', value),
  error: err => console.log('error:', err),
  complete: () => console.log('the end'),
});


 
 

Output:

 

next: 10
next: 20
next: 30
the end

Creation Operator: from

This Operator is similar to `of`, but it works on iterable data: it accepts a collection (an array, a generator, or any other iterable) and returns an Observable that emits each value of the collection one at a time. The real power of this Operator is that it also accepts asynchronous sources such as Promises and other Observables.

 

Javascript




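import { from } from 'rxjs';

// Emits each array element in turn, then completes.
from([10, 20, 30]).subscribe({
  next: value => console.log('next:', value),
  error: err => console.log('error:', err),
  complete: () => console.log('the end'),
});

console.log('----------');

// fetchDataFromServer() is assumed to be defined elsewhere and to return a Promise.
const promise = fetchDataFromServer();
from(promise).subscribe({
  next: value => console.log('next:', value),
  error: err => console.log('error:', err),
  complete: () => console.log('the end'),
});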


 
 

Output:

 

next: 10
next: 20
next: 30
the end
----------
next: {msg: "Hello world!"}
the end

Transformation Operator: map

This operator is very similar to Array#map. It takes each value emitted by the Observable, transforms it, and passes the result along to the next Operator in the pipe. This is where the concepts behind Streams and Observables start to shine.

 

Why go through the trouble of learning a whole new concept when the same problem can be solved with Array#map? Observables come in handy when we simply can't load the whole dataset into an Array because the data is effectively infinite, or when the whole dataset isn't available upfront because it is asynchronous and new values arrive slowly over the network. Often we face both problems at once: effectively infinite data arriving over the network a few values at a time.

 

Javascript




import { range } from 'rxjs';
import { map } from 'rxjs/operators';

// Another RxJS creation operator: range(1, 1000)
// starts at 1 and emits 1000 values
range(1, 1000)
  .pipe(map(x => x * 10))
  .subscribe({
    next: value => console.log('next:', value),
    error: err => console.log('error:', err),
    complete: () => console.log('the end'),
  });


Output:

next: 10
next: 20
next: 30
....
....
....
next: 10000
the end

RxJS operators are almost always pure (side-effect free), and they work on one emitted value at a time. This makes dealing with effectively infinite datasets much easier: because an operator such as map keeps no reference to values it has already passed along, the pipeline doesn't have to hold on to items that are not currently being processed. In a simple pipeline like the one above, only one item is held in memory at a time.
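
The one-value-at-a-time idea can also be sketched without RxJS, using plain generator functions. The helpers `naturals`, `lazyMap`, and `take` below are hypothetical names chosen for this illustration. Generators are pull-based while Observables are push-based, so this is an analogy rather than how RxJS works internally, but the memory behaviour is comparable: no array of intermediate results is ever built.

```javascript
// Hypothetical helpers for illustration; these are not RxJS APIs.

// An effectively infinite source: 1, 2, 3, ...
function* naturals() {
  let n = 1;
  while (true) yield n++;
}

// Transforms values one at a time; nothing is buffered.
function* lazyMap(iterable, fn) {
  for (const value of iterable) yield fn(value);
}

// Stops pulling from the source after `count` values.
function* take(iterable, count) {
  if (count <= 0) return;
  let taken = 0;
  for (const value of iterable) {
    yield value;
    if (++taken >= count) return;
  }
}

// Only three values are ever produced, even though the source never ends.
const firstThree = [...take(lazyMap(naturals(), x => x * 10), 3)];
console.log(firstThree.join(', ')); // 10, 20, 30
```

Calling `[...naturals()]` directly would never finish, which is the "can't load it into an Array" problem described earlier; processing each value as it is produced avoids it.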

Transformation Operator: mergeMap

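This operator is very similar to map, but its transformation function returns asynchronous data (an Observable or a Promise) instead of a plain value. mergeMap subscribes to each returned Observable and merges the values they emit into a single output stream. This makes handling many async calls to servers or databases easy, and even lets us run those calls concurrently.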

Javascript




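import { range } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// fetchBulkDataFromServer() is assumed to be defined elsewhere.
range(1, 1000)
  .pipe(
    mergeMap(pageNum => fetchBulkDataFromServer({ pageNum })),
    map(bulkData =>
      `Page Num ${bulkData.page} returned ${bulkData.items.length} items`),
  )
  .subscribe({
    next: value => console.log('next:', value),
    error: err => console.log('error:', err),
    complete: () => console.log('the end'),
  });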


Output (shown in page order for readability; the actual order depends on which requests finish first):

next: Page Num 1 returned 100 items
next: Page Num 2 returned 90 items
next: Page Num 3 returned 70 items
next: Page Num 4 returned 100 items
....
....
next: Page Num 1000 returned 30 items
the end

Because mergeMap maps over async data (Observables), it can speed things up significantly by keeping several inner Observables active at the same time. It accepts an optional second argument, the concurrency count (named `concurrent` in the API signature), which caps how many inner Observables are subscribed to at once; by default there is no limit. Implementing this kind of concurrent async processing without Observables is not straightforward and can easily lead to hard-to-debug concurrency bugs.

Javascript




import { range } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

const maxParallelApiCalls = 50;

range(1, 1000)
  .pipe(
    // At most 50 fetchBulkDataFromServer() calls are in flight at once.
    mergeMap(pageNum => fetchBulkDataFromServer({ pageNum }), maxParallelApiCalls),
    map(bulkData =>
      `Page Num ${bulkData.page} returned ${bulkData.items.length} items`),
  )
  .subscribe({
    next: value => console.log('next:', value),
    error: err => console.log('error:', err),
    complete: () => console.log('the end'),
  });


Output:

next: Page Num 7 returned 10 items
next: Page Num 12 returned 8 items
next: Page Num 38 returned 12 items
next: Page Num 3 returned 70 items
....
....
next: Page Num 1000 returned 30 items
the end

In the above example, RxJS keeps up to 50 inner Observables running at the same time and emits the data they return in the order they finish; as soon as one completes, the next page is requested. So whichever API call returns first has its data piped to the next Operator first. Here's a timeline visualization of how mergeMap merges concurrent async data.

<Start> - Value 1 ---------------------------------|->
<Start> --------------------- Value 2 -------------|->
<Start> ----------- Value 3 -----------------------|->
<Start> ------------------------------ Value 4 ----|->
<Start> ----------------- Value 5 -----------------|->
----------------------- Merge ------------------------
<Start> - Value 1 -- Value 3 -- Value 5 -- Value 2 -- Value 4 -|->
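
To get a feel for what a concurrency limit and completion-order results involve, here is a rough sketch of a comparable pattern written with plain Promises instead of RxJS. The names `mapWithConcurrency`, `fakeFetch`, and `delays` are hypothetical, the API calls are simulated with `setTimeout`, and unlike mergeMap this sketch collects results into an array rather than emitting them downstream as they arrive.

```javascript
// Hypothetical helper: runs `task` over `inputs`, keeping at most `limit`
// calls in flight. Results are recorded in completion order, not input order.
async function mapWithConcurrency(inputs, limit, task) {
  const results = [];
  let nextIndex = 0;
  let inFlight = 0;
  let maxInFlight = 0;

  // Each worker repeatedly claims the next unprocessed input until none remain.
  async function worker() {
    while (nextIndex < inputs.length) {
      const input = inputs[nextIndex++];
      inFlight++;
      maxInFlight = Math.max(maxInFlight, inFlight);
      const value = await task(input);
      inFlight--;
      results.push(value);
    }
  }

  const workerCount = Math.min(limit, inputs.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return { results, maxInFlight };
}

// Simulated API call with made-up response times in milliseconds
// (an assumption standing in for a real request).
const delays = { 1: 120, 2: 30, 3: 40, 4: 100 };
const fakeFetch = pageNum =>
  new Promise(resolve =>
    setTimeout(() => resolve(`page ${pageNum}`), delays[pageNum]));

const done = mapWithConcurrency([1, 2, 3, 4], 2, fakeFetch);
done.then(({ results, maxInFlight }) => {
  console.log(results.join(', '), '| max in flight:', maxInFlight);
});
```

With these simulated delays, page 2 is recorded ahead of page 1 because it finishes first, mirroring the merged timeline. Even this small version needs manual bookkeeping for in-flight calls, and it has no error handling or cancellation, which is the kind of work mergeMap takes care of.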

Conclusion: The examples above cover only a few Operators in this beginner's guide, but RxJS has many more, suited to a wide variety of use cases. Check out the official documentation to explore more.

Reference: https://rxjs.dev/api



