
PUBLISHED OCTOBER 08, 2024

Teghen Donald Ticha
Last updated: 2 days ago
Reading time 11 mins

Introduction to RxJS and Observables: A Beginner’s Guide

Learn the basics of RxJS and observables in JavaScript. Understand how to handle async streams, build reactive apps, and master core concepts like subscriptions with easy examples.

Prerequisites

Before starting with this guide, it's recommended that you have:

  1. Basic Understanding of JavaScript:
    • Familiarity with JavaScript functions, variables, and basic control structures.
    • Understanding of JavaScript ES6+ features like arrow functions, const, let, etc.
  2. Async Programming in JavaScript:
    • Knowledge of promises and async/await.
    • Some experience working with callbacks.
  3. (Optional) Basic Notions of Functional Programming:
    • A loose understanding of concepts like pure functions, immutability, and function composition can be helpful since RxJS leverages some functional programming principles, though it’s not required to start.
What is RxJS?

RxJS (Reactive Extensions for JavaScript) is a library for `reactive programming` that makes working with asynchronous data streams seamless.

It leverages `observables`, a core feature that allows us to handle data that arrives over time, such as API responses, user inputs, or even system events.

Observables are flexible: the same abstraction can represent a single value, many values, or a never-ending stream, which makes asynchronous data flows easier to manage than with traditional techniques like promises or callbacks.



Why RxJS?

The best way to answer this question is to review the traditional async patterns, discuss their limitations, and then illustrate how RxJS deals with them.


1. Callback Pattern

Callbacks have been the traditional way of handling async operations like API calls or file reading in JavaScript.

However, they come with a significant drawback popularly known as `callback hell`, where callbacks are nested within callbacks, leading to unmanageable, hard-to-read code.

Let's have an example:

function fetchData(callback) {
    setTimeout(() => {
      callback('Data from API');
    }, 1000);
}
  
function processData(data, callback) {
    setTimeout(() => {
        callback(data.toUpperCase());
    }, 1000);
}
  
function displayData(data) {
    console.log('Processed Data:', data);
}
  

fetchData((data) => {
    processData(data, (processedData) => {
        displayData(processedData);
    });
});

  //Outputs:
  // Processed Data: DATA FROM API
  

Breakdown: As you can see, each async task (fetching data, processing data) depends on the result of the previous one.

As the number of operations grows, the nesting creates deeply indented, hard-to-read, and error-prone code. (The code above looks confusing even to me and I wrote it 🫣).



2. Promise Pattern

Then came Promises, which provide a cleaner, more manageable way of handling async tasks.

They flatten the structure of async operations and avoid the nesting problem of callbacks.

However, `promises are designed to handle a single async result`.

This becomes a limitation when you need to deal with ongoing streams of data, like handling multiple events or continuous API responses.

As an example (promises handling a single result):

function fetchData() {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve('Data from API');
        }, 1000);
    });
}

function processData(data) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(data.toUpperCase());
        }, 1000);
    });
}

fetchData()
    .then(data => processData(data))
    .then(processedData => console.log('Processed Data:', processedData))
    .catch(error => console.error('Error:', error));

Breakdown: Promises solve the nesting problem, but they handle only one value at a time.

Issue: If we needed to handle multiple asynchronous events (e.g., listening to user actions over time), promises become cumbersome and less flexible.
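To make the single-value limitation concrete, here is a small sketch (the strings are purely illustrative): a promise settles exactly once, so any later calls to `resolve` are silently ignored.

```javascript
// A promise can settle only once: the first resolve() wins,
// and every later call is silently ignored.
const received = [];

const singleValue = new Promise((resolve) => {
  resolve('first click');
  resolve('second click'); // ignored
  resolve('third click');  // ignored
});

// .then() fires at most once, with the first resolved value.
const done = singleValue.then((value) => {
  received.push(value);
  console.log('Received:', value); // Received: first click
  return received;
});
```

To react to all three "clicks", we would need a fresh promise per event, which is exactly the kind of bookkeeping observables take off our hands.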


3. Generator Pattern

Generators, introduced in ES6, allow for lazy evaluation and, when combined with promises and a small driver function, can handle asynchronous tasks.

They offer an elegant way to `pause` and `resume` functions but still lack the flexibility for dealing with multiple async values or continuous streams.

Let's refactor our code yet again:

function fetchData() {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve('Data from API');
        }, 1000);
    });
}

function processData(data) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(data.toUpperCase());
        }, 1000);
    });
}

function* dataFlow() {
    const data = yield fetchData(); // Pause until data is fetched
    const processedData = yield processData(data); // Pause until processed
    return processedData;
}

// Drive the generator by hand: feed each resolved value back into it
const iterator = dataFlow();
iterator.next().value
    .then(data => iterator.next(data).value)
    .then(processedData => console.log('Processed Data:', processedData));

Breakdown: Generators allow for step-by-step execution, but still, they only handle single async values at a time.

To manage complex, ongoing event streams, they fall short without extra libraries to manage the flow.
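The "extra library" is usually a small runner that feeds each resolved value back into the generator (the `co` library popularized this approach). Below is a minimal sketch of such a runner, written for illustration only; `run` is a hypothetical helper name, and the delays are shortened so it finishes quickly.

```javascript
// Minimal generator runner: drives a generator that yields promises,
// resuming it with each resolved value until it returns.
function run(generatorFn) {
  return new Promise((resolve, reject) => {
    const iterator = generatorFn();

    function step(resume) {
      let result;
      try {
        result = resume(); // resume the generator
      } catch (err) {
        return reject(err); // the generator threw and did not catch it
      }
      if (result.done) return resolve(result.value); // generator returned
      // Wait for the yielded promise, then resume with its value (or error).
      Promise.resolve(result.value).then(
        (value) => step(() => iterator.next(value)),
        (err) => step(() => iterator.throw(err))
      );
    }

    step(() => iterator.next());
  });
}

// Same helpers as above, with shorter delays.
const fetchData = () =>
  new Promise((resolve) => setTimeout(() => resolve('Data from API'), 100));
const processData = (data) =>
  new Promise((resolve) => setTimeout(() => resolve(data.toUpperCase()), 100));

function* dataFlow() {
  const data = yield fetchData();
  const processedData = yield processData(data);
  return processedData;
}

const flowResult = run(dataFlow);
flowResult.then((processedData) => console.log('Processed Data:', processedData));
```

Even with the runner, each `yield` still waits on exactly one promise, so a stream of many values would need yet more machinery.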


RxJS to the rescue:

RxJS, through `observables`, handles asynchronous operations in a way that addresses the shortcomings of callbacks, promises, and generators.

Observables allow you to handle `multiple values over time`, and they integrate smoothly with JavaScript’s async nature, providing more flexibility with operators that transform and manage data streams.

Let's start with a simple example that emits two values synchronously and a third one asynchronously:

Make sure you install the rxjs package via `npm install rxjs`.

const { Observable } = require('rxjs');

const dataObservable = new Observable(subscriber => {
  subscriber.next('First value'); //emit a first value
  subscriber.next('Second value'); //emit a second value
  setTimeout(() => {
    subscriber.next('Third value'); //emit a third value
    subscriber.complete(); //close the stream (emitting a 'complete' event)
  }, 1000);
});


dataObservable.subscribe({
  next(value) { console.log('Received:', value); },
  complete() { console.log('All values received'); }
});

//Outputs:
// Received: First value
// Received: Second value
// Received: Third value
// All values received

Breakdown: Unlike promises, observables can emit `multiple values` over time.

The subscribe() method allows you to observe the data stream, and the observable can also handle asynchronous emissions.

You can apply powerful operators to `filter`, `transform`, or `combine` streams of data (e.g., map, filter, mergeMap), offering a more robust solution for complex async operations.
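If `new Observable(...)` feels like magic, the following hand-rolled sketch may help. It is a deliberately tiny, hypothetical `MiniObservable` class, not how RxJS is actually implemented (real observables also handle errors, teardown, and guard against emitting after completion, and RxJS chains operators with `pipe(map(...))` rather than a `map` method). It only shows the core contract: the function you pass in runs when someone subscribes, and it pushes values to the observer's `next` and `complete` callbacks.

```javascript
// A deliberately tiny observable: NOT RxJS's implementation, just the core idea.
class MiniObservable {
  constructor(producer) {
    this.producer = producer; // called once per subscription
  }

  subscribe(observer) {
    const subscriber = {
      next: (value) => observer.next && observer.next(value),
      complete: () => observer.complete && observer.complete(),
    };
    this.producer(subscriber); // start producing only now
  }

  // A map "operator": returns a new observable that transforms each value.
  map(fn) {
    return new MiniObservable((subscriber) =>
      this.subscribe({
        next: (value) => subscriber.next(fn(value)),
        complete: () => subscriber.complete(),
      })
    );
  }
}

const log = [];
const numbers$ = new MiniObservable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

numbers$.map((n) => n * 10).subscribe({
  next: (value) => log.push(value),
  complete: () => log.push('done'),
});

console.log(log); // [ 10, 20, 30, 'done' ]
```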

Let's refactor our old async example to see how RxJS handles it:

const { from, mergeMap } = require('rxjs');

function fetchData() {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve('Data from API');
        }, 1000);
    });
}
  
function processData(data) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(data.toUpperCase());
        }, 1000);
    });
}

function displayData(data) {
    console.log('Processed Data:', data);
}

const processData$ = (str) => from(processData(str));
const fetchData$ = from(fetchData());

fetchData$
.pipe(
    mergeMap(res => processData$(res)),
)
.subscribe({
    next : displayData,
    complete() { console.log('All values received'); }
})

// Outputs:
// Processed Data: DATA FROM API
// All values received

Breakdown: These two lines simply convert the async processes into stream data producers:

const processData$ = (str) => from(processData(str));

const fetchData$ = from(fetchData());

This means that if we later changed these data sources so that they produce multiple values, nothing in the rest of the code would need to change.

For example, say we wanted to poll the data source every 1.5 seconds, but we are only interested in the first 5 results.

const { from, mergeMap, interval, take } = require('rxjs');

function fetchData() {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve('Data from API');
        }, 1000);
    });
}

function processData(data) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(data.toUpperCase());
        }, 1000);
    });
}

function displayData(data) {
    console.log('Processed Data:', data);
}


const processData$ = (str) => from(processData(str));

const fetchData$ = interval(1500).pipe(
    take(5), // Simulate fetching data 5 times
    mergeMap(() => from(fetchData())) 
);

fetchData$
.pipe(
    mergeMap(res => processData$(res)), 
)
.subscribe({
    next: displayData,
    complete() { console.log('All values received and processed'); }
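});

// Outputs:
// Processed Data: DATA FROM API   (printed 5 times)
// All values received and processed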

Breakdown:

  • interval(1500): This emits a new value every 1.5 seconds. We use it to simulate a continuous stream of async data fetching over time.
  • take(5): This limits the stream to 5 emissions (i.e., we will fetch data 5 times).
  • mergeMap(): This operator maps each emitted value to a new inner observable, subscribes to it right away, and merges all inner results into one output stream. Here, the first mergeMap starts a fetch on every interval tick, and the second one processes each fetched result.
  • from(fetchData()): Converts the promise returned by fetchData() into an observable. Each time the interval emits a value, it triggers fetchData() to run, simulating a fresh fetch of data.
  • processData$(res): After fetching the data, we pass the result into the processData$ observable to process the data before displaying it.
  • Concurrent and Continuous Handling: mergeMap() does not wait for one inner observable to finish before starting the next. A fetch plus processing takes about 2 seconds while interval ticks every 1.5 seconds, so several requests are in flight at once. Results come out in order here only because every request takes the same time; when strict one-after-another processing matters, concatMap() is the operator to use.

If you're new to RxJS, this probably looks crazy 🤓!

To paint the full picture, try refactoring the above using promises or callbacks, and you'll get the idea.
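Taking up that challenge, here is one possible promise-and-timer version of the polling example. It is a rough sketch with shorter delays so it finishes quickly; the `pollWithPromises` name and its parameters are assumptions for illustration, and error handling is left out entirely. Notice how the counting, stopping, and completion logic that `interval`, `take`, and `complete` gave us for free now has to be tracked by hand.

```javascript
const fetchData = () =>
  new Promise((resolve) => setTimeout(() => resolve('Data from API'), 50));
const processData = (data) =>
  new Promise((resolve) => setTimeout(() => resolve(data.toUpperCase()), 50));

const results = [];

// Hypothetical helper: poll `times` times, every `periodMs` milliseconds.
function pollWithPromises(times, periodMs) {
  return new Promise((resolve) => {
    let started = 0;  // fetches kicked off so far (a manual take())
    let finished = 0; // results displayed so far (a manual complete())

    const id = setInterval(() => {
      started += 1;
      if (started === times) clearInterval(id); // stop the timer ourselves

      fetchData()
        .then(processData)
        .then((processed) => {
          results.push(processed);
          console.log('Processed Data:', processed);
          finished += 1;
          if (finished === times) {
            console.log('All values received and processed');
            resolve(results);
          }
        });
    }, periodMs);
  });
}

const polling = pollWithPromises(5, 75);
```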


As you can see, this pattern lets you run multiple async operations while processing each result as soon as it becomes available, without nesting callbacks or managing promise chains manually.

You can easily expand this to handle ongoing streams, cancellations, or more complex workflows with minimal changes to the code.
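Because mergeMap does not wait, results can arrive out of order when inner tasks take different amounts of time. The plain-JavaScript sketch below involves no RxJS; `task`, `mergeStyle`, and `concatStyle` are hypothetical helpers that mimic the two strategies with promises: starting everything at once (like mergeMap) versus starting each task only after the previous one finished (like concatMap).

```javascript
// Hypothetical async task: resolves with `id` after `ms` milliseconds.
const task = (id, ms) =>
  new Promise((resolve) => setTimeout(() => resolve(id), ms));

const jobs = [
  { id: 'A', ms: 300 },
  { id: 'B', ms: 100 },
  { id: 'C', ms: 200 },
];

// mergeMap-like: start every task immediately, record results as they finish.
async function mergeStyle() {
  const order = [];
  await Promise.all(jobs.map(({ id, ms }) => task(id, ms).then((r) => order.push(r))));
  return order; // completion order
}

// concatMap-like: start each task only after the previous one has finished.
async function concatStyle() {
  const order = [];
  for (const { id, ms } of jobs) {
    order.push(await task(id, ms));
  }
  return order; // input order
}

const comparison = Promise.all([mergeStyle(), concatStyle()]);
comparison.then(([merged, concatenated]) => {
  console.log('merge-style: ', merged);       // [ 'B', 'C', 'A' ]
  console.log('concat-style:', concatenated); // [ 'A', 'B', 'C' ]
});
```

The merge-style run finishes sooner (about 300 ms versus 600 ms) but gives up ordering; the concat-style run keeps the input order at the cost of waiting.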


Now, before we get ahead of ourselves, let's go back to the fundamental ideas behind RxJS in the next section.

To understand how RxJS works, it helps to understand the core concepts that make up ReactiveX.

`ReactiveX` combines the `Observer pattern` with the `Iterator pattern` and `functional programming` with collections to fill the need for an ideal way of managing sequences of events.


1. The Observer Pattern

The Observer pattern is a design pattern where an object (the subject) maintains a list of dependents (observers) and notifies them automatically of any state changes.

In RxJS, observables follow this pattern by "pushing" data to subscribers (observers) when new data becomes available.

Let's look at a trivial implementation of this pattern.

class Subject {
    constructor () {
        this.observers = [];
    }

    subscribe(observer) {        
        this.observers.push(observer);
        return {
            unsubscribe : () => {
                this._unsubscribe(observer)
            }
        }
    }

    notify (data) {
        this.observers.forEach(obs => {
            obs.update(data);
        });
    }

    _unsubscribe (observer) {
        const index = this.observers.indexOf(observer);
        if (index !== -1) {
            this.observers.splice(index, 1);
        }
    }    
}

class Observer {
    constructor (name) {
        this.name = name;
    }

    update (data) {
        console.log(`new data for ${this.name}: `, data);
    }
}

const subject = new Subject();
const observerA = new Observer('Observer A');
const observerB = new Observer('Observer B');
const observerC = new Observer('Observer C');

const subA = subject.subscribe(observerA);
const subB = subject.subscribe(observerB);

// notify the subscribers with a new value
subject.notify('Hi there!'); 

const id1 = setTimeout(() => {
    //unsubscribe both A and B
    subA.unsubscribe();
    subB.unsubscribe();
    clearTimeout(id1)
}, 1500);

const id2 = setTimeout(() => {
    const subC = subject.subscribe(observerC); 
    subject.notify('Hi Again!'); 
    subC.unsubscribe(); 
    subject.notify('Anyone there?'); 
    clearTimeout(id2);
}, 2000);

//Outputs
    // new data for Observer A:  Hi there!
    // new data for Observer B:  Hi there!
    // new data for Observer C:  Hi Again!

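Simple and straightforward, right? In RxJS, observables take care of this notification mechanism for us: whenever new data is available in the stream, every subscriber's `next` callback is invoked automatically, so there is no manual `notify` call to make. (RxJS's own `Subject` class exposes the equivalent of `notify` as `next()`.)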



2. The Iterator Pattern

The Iterator pattern allows sequential access to the elements of a collection without exposing its internal structure.

RxJS integrates this pattern by letting observables emit values over time, similar to how an iterator provides values one by one.

Below is a simple example of an iterator.

function createIterator(items) {
    let index = 0;
    return {
      next: function() {
        return index < items.length
          ? { value: items[index++], done: false }
          : { done: true };
      }
    };
  }
  
  const iterator = createIterator([1, 2, 3]);
  
  console.log(iterator.next()); // { value: 1, done: false }
  console.log(iterator.next()); // { value: 2, done: false }
  console.log(iterator.next()); // { value: 3, done: false }
  console.log(iterator.next()); // { done: true }
  console.log(iterator.next()); // { done: true } 
// NB: all subsequent next() calls will produce the same result {done: true}

// Outputs:
    //{ value: 1, done: false }
    //{ value: 2, done: false }
    //{ value: 3, done: false }
    //{ done: true }
    //{ done: true }

We can make the above iterable by exposing a [Symbol.iterator]() method, like so:

function createIterator(items) {
    let index = 0;
    return {
      next: function() {
        return index < items.length
          ? { value: items[index++], done: false }
          : { done: true };
      },
      [Symbol.iterator]: function () {return this;}
    };
  }
  
  const iterator = createIterator([1, 2, 3]);
  
  console.log(iterator.next()); // { value: 1, done: false }
  console.log(iterator.next()); // { value: 2, done: false }
  console.log(iterator.next()); // { value: 3, done: false }
  console.log(iterator.next()); // { done: true }


const iterator2 = createIterator([1, 2, 3, 4, 5, 6]);
for(let v of iterator2) { // stops as soon as next() returns { done: true }
    console.log(v); // 1,2,3,4,5,6 
}

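NB: In RxJS, observables behave somewhat like asynchronous iterators, emitting values over time. The main difference is direction: an iterator is pull-based (the consumer calls `next()` to ask for each value), while an observable is push-based (it delivers each value to its subscribers as soon as it is available).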
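An async generator makes the pull-based side easy to see. In the sketch below, `ticks` is a hypothetical helper that yields a number every 100 ms; the `for await...of` loop requests each value in turn, whereas an observable's subscribers simply wait for values to be pushed to them.

```javascript
// Async generator: produces the next value only when the consumer asks (pull).
async function* ticks(count, ms) {
  for (let i = 1; i <= count; i++) {
    await new Promise((resolve) => setTimeout(resolve, ms));
    yield i;
  }
}

const pulled = [];

async function consume() {
  // Each loop iteration calls next() under the hood and awaits the result.
  for await (const value of ticks(3, 100)) {
    pulled.push(value);
    console.log('Pulled:', value);
  }
  return pulled;
}

const consumption = consume();
```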



3. Functional Programming with Collections

Functional programming emphasizes using `pure functions`: functions that have no side effects and return the same output for the same input.

In RxJS, operators like map, filter, and reduce apply transformations to the data stream in a `declarative`, `side-effect-free` manner.

Let’s apply the notion of pure functions to transform a collection of values.

const objectValues = [{v:1}, {v:2}, {v:3}, {v:4}, {v:5}, {v:6}, {v:7}, {v:8}, {v:9}, {v:10}]
const values = objectValues.map(o => o.v); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

const oddsValues = values.filter(x => x % 2 !== 0); 
const evenValues = values.filter(x => x % 2 === 0); 

console.log('Odds values: ', oddsValues); // Odds values:  [ 1, 3, 5, 7, 9 ]
console.log('Even values: ', evenValues); // Even values:  [ 2, 4, 6, 8, 10 ]

In RxJS, these functional programming principles are applied to observable streams using operators like map and filter.
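RxJS operators are themselves just functions: each takes a source and returns a new one, and `pipe` chains them from left to right. The array-based sketch below imitates that style; `pipe`, `map`, and `filter` here are hypothetical curried helpers written for illustration, not the RxJS operators themselves.

```javascript
// Compose functions left to right, in the spirit of observable.pipe(op1, op2, ...).
const pipe = (...fns) => (input) => fns.reduce((acc, fn) => fn(acc), input);

// Curried, pure "operators" that work on arrays instead of streams.
const map = (project) => (array) => array.map(project);
const filter = (predicate) => (array) => array.filter(predicate);

const objectValues = [{ v: 1 }, { v: 2 }, { v: 3 }, { v: 4 }, { v: 5 }, { v: 6 }];

const oddTimesTen = pipe(
  map((o) => o.v),            // [1, 2, 3, 4, 5, 6]
  filter((x) => x % 2 !== 0), // [1, 3, 5]
  map((x) => x * 10)          // [10, 30, 50]
);

const result = oddTimesTen(objectValues);
console.log(result);              // [ 10, 30, 50 ]
console.log(objectValues.length); // 6 (the input array is left untouched)
```

Because every step is pure, the pipeline can be reused on any array without affecting the original data, which is the same property RxJS operators rely on for streams.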



Combining Observer, Iterator, and Functional Programming in RxJS

Now that we’ve broken down each pattern individually, let’s illustrate how RxJS combines all three into one cohesive system.

Here’s a simple example where RxJS uses:

  • Observer pattern: Subscribing to and receiving values from an observable.
  • Iterator pattern: Emitting values sequentially over time.
  • Functional programming: Transforming data using pure functions (map and filter).
const { Observable } = require('rxjs');
const { map, filter } = require('rxjs/operators');

// Create an observable that emits values over time (Iterator pattern)
const observable = new Observable(subscriber => {
    let value = 1; // start with 1
    const intervalId = setInterval(() => {
        subscriber.next(value++);  // Emit values 1, 2, 3...
        if (value > 10) {
            clearInterval(intervalId); // Stop after emitting 10 values
            subscriber.complete();
        }
    }, 1000);

    // Teardown: runs on completion or unsubscribe, so the timer never leaks
    return () => clearInterval(intervalId);
});

// Apply functional operators (Functional programming)
const processedObservable = observable.pipe(
    map(value => value * 10),       // Multiply each value by 10
    filter(value => value > 50 )    // Only allow values greater than 50
);

// Subscribe to the observable (Observer pattern)
processedObservable.subscribe({
    next: value => console.log(`Received: ${value}`),
    complete: () => console.log('All values processed'),
});

// Output:
// Received: 60
// Received: 70
// Received: 80
// Received: 90
// Received: 100
// All values processed

Breakdown:

  • The Observer pattern is demonstrated by the .subscribe() method, where an observer listens to the stream.
  • The Iterator pattern is illustrated by emitting values (1 to 10) one by one at intervals of 1 second.
  • The Functional programming part is handled by the map and filter operators, which transform the data stream in a pure, declarative way.

NB: If you paid attention while running the above, you will have noticed that the first value only appears in the terminal after about 6 seconds. Can you guess why?

Here is where the magic happens: filter(value => value > 50 )

The first 5 values (10 to 50 after the map) are filtered out, and since each value comes with a 1-second delay, nothing is printed until the sixth value (60) arrives at the 6-second mark.

As you can see, this combination makes RxJS highly flexible and expressive, ideal for working with dynamic, event-driven applications.


In the next section, we'll look at some key RxJS concepts with everyday use-case examples.

Observables are designed to emit data over time, handling both synchronous and asynchronous events.

They can emit multiple values and are inherently `lazy`, meaning they only begin emitting values once there is a subscription.

This point is crucial when writing RxJS code. We'll explore what it means later.
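Here is a small sketch of what "lazy" means in practice. It contrasts a promise, whose executor runs immediately, with a minimal hand-rolled observable-like object (a hypothetical `lazy$`, not RxJS) whose producer runs only when `subscribe` is called, and runs again for every new subscriber.

```javascript
const events = [];

// Eager: the executor runs right away, even if nobody ever calls .then().
const eager = new Promise((resolve) => {
  events.push('promise executor ran');
  resolve(42);
});

// Lazy: nothing happens until subscribe() is called.
const lazy$ = {
  subscribe(observer) {
    events.push('observable producer ran');
    observer.next(42);
  },
};

events.push('definitions done');

lazy$.subscribe({ next: (v) => events.push(`subscriber 1 got ${v}`) });
lazy$.subscribe({ next: (v) => events.push(`subscriber 2 got ${v}`) });

console.log(events);
// [
//   'promise executor ran',
//   'definitions done',
//   'observable producer ran',
//   'subscriber 1 got 42',
//   'observable producer ran',
//   'subscriber 2 got 42'
// ]
```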

Observable Data Flow:
  1. Emit data (synchronously or asynchronously).
  2. Subscribe to an observable to start receiving emitted values.
  3. React to data or perform actions using observers (callback functions like next, error, and complete).

Subscriptions and Unsubscribing

Subscriptions are how observers receive values from observables.

It's important to unsubscribe from observables when they are no longer needed, especially when working with long-running observables (like those tied to DOM events) to prevent memory leaks.
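Unsubscribing matters because a producer often holds on to resources such as timers or event listeners. The sketch below uses a hypothetical `intervalObservable` helper (plain JavaScript, loosely mirroring what `interval` does, not its real implementation) whose subscription object carries the teardown logic: calling `unsubscribe()` clears the timer so it cannot keep firing, and keep the process alive, in the background.

```javascript
// Hypothetical hand-rolled interval observable with teardown logic.
function intervalObservable(ms) {
  return {
    subscribe(observer) {
      let count = 0;
      const id = setInterval(() => observer.next(count++), ms);
      // The subscription object: unsubscribe() runs the teardown.
      return {
        closed: false,
        unsubscribe() {
          clearInterval(id); // release the timer
          this.closed = true;
        },
      };
    },
  };
}

const seen = [];
const subscription = intervalObservable(50).subscribe({
  next: (value) => seen.push(value),
});

// Stop listening after roughly 175 ms (a few values).
setTimeout(() => {
  subscription.unsubscribe();
  console.log('Values before unsubscribing:', seen);
}, 175);
```

Without the `unsubscribe()` call, the interval would fire forever and Node would never exit, which is the same kind of leak a forgotten RxJS subscription causes.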


While observables emit data to subscribers, not all observables behave the same. Observables can be categorized as either cold or hot:

  • Cold observables: Only produce values when there is an active subscription (e.g., from or of).
    Cold observables (like HTTP requests) wait until someone subscribes, ensuring each subscriber gets a fresh execution.
  • Hot observables: Produce values continuously, regardless of whether there are subscribers (e.g., Subject).
    Hot observables (like event streams or mouse clicks) share the same execution across all subscribers. Each subscriber listens to the same data stream.


Although this topic will be expanded in a dedicated article, it’s important to note this distinction when working with streams of data.


An example of a cold observable:

const { of } = require('rxjs');

// Cold observable (each subscription gets independent data)
const cold$ = of('Cold data');

cold$.subscribe(value => console.log('Subscriber 1:', value));  // Output: Subscriber 1: Cold data
cold$.subscribe(value => console.log('Subscriber 2:', value));  // Output: Subscriber 2: Cold data



An example of a hot observable (with unsubscription).

const { interval } = require('rxjs');
const { share } = require('rxjs/operators');

// Hot observable (all subscribers share the same execution)
const hot$ = interval(1000).pipe(share());  // Share the same data stream

let sub2, sub3 // subscriptions
const sub1 = hot$.subscribe(value => console.log('Subscriber One:', value));  // Output: 0, 1, 2, ...

const id1 = setTimeout(() => {
  sub2 = hot$.subscribe(value => console.log('Subscriber Two:', value));  // Output: 2, 3, 4, ...
}, 2500);  // Subscriber 2 joins late and misses the first few values

const id2 = setTimeout(() => {
  sub3 = hot$.subscribe(value => console.log('Subscriber Three:', value));  // Output: 5, 6, 7, ... (joins as value 4 is emitted, so it may also catch 4)
}, 5000);

// unsubscribe all observers and clear timers
const id3 = setTimeout(() => {
  [sub1, sub2, sub3].forEach(sub => sub.unsubscribe());
  [id1, id2, id3].forEach(id => clearTimeout(id));
}, 10000)

In this example, the `share()` operator converts the cold observable produced by the `interval` creation function into a hot one: all subscribers share a single underlying timer, so a late subscriber only receives the values emitted after it joins.
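To see the difference without timers, here is a rough plain-JavaScript sketch using hypothetical `coldSource` and `hotSource` helpers (not RxJS APIs). The cold version runs its producer once per subscriber; the hot version has a single producer that multicasts each value to whoever is subscribed at that moment, which is roughly the job `share()` does for us.

```javascript
let coldRuns = 0;

// Cold: every subscriber triggers its own, independent producer run.
const coldSource = {
  subscribe(next) {
    coldRuns += 1;
    [1, 2, 3].forEach((v) => next(v));
  },
};

// Hot: one producer, values pushed to the current list of listeners.
function hotSource() {
  const listeners = [];
  return {
    subscribe(next) {
      listeners.push(next);
    },
    emit(value) {
      listeners.forEach((listener) => listener(value));
    },
  };
}

const coldA = [];
const coldB = [];
coldSource.subscribe((v) => coldA.push(v));
coldSource.subscribe((v) => coldB.push(v));

const hot = hotSource();
const early = [];
const late = [];
hot.subscribe((v) => early.push(v));
hot.emit(1);
hot.emit(2);
hot.subscribe((v) => late.push(v)); // joins late and misses 1 and 2
hot.emit(3);

console.log(coldRuns, coldA, coldB); // 2 [ 1, 2, 3 ] [ 1, 2, 3 ]
console.log(early, late);            // [ 1, 2, 3 ] [ 3 ]
```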

Perhaps some more concrete examples will help.

Let’s create an observable that tracks user input in a search field, debounces the input, and updates the DOM with the filtered results.

In a run.html file add the code below:

<!DOCTYPE html> 
<html lang="en-us">
    <head>
        <meta charset="utf-8">
        <script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>
        <title>Play Book</title> 
        <style type="text/css">
            html, body {
                margin: 0;
                padding: 0;
                height: 100%;
            }
           
        </style>
    </head>
    <body>       
        <input type="text" id="search-box" placeholder="Type something..." />
        <ul id="results"></ul> 
        <script src="./run.js"></script>
    </body>
</html>

No explanation required there.

Just one thing to note: loading the RxJS UMD bundle through a script tag, as we did above, exposes a global `rxjs` object, which we then use in our `run.js` file.


Next up, in a run.js file add the following:

const { fromEvent } = rxjs;
const { map, debounceTime, distinctUntilChanged } = rxjs.operators;


const searchBox = document.getElementById('search-box');
const results = document.getElementById('results');


const search$ = fromEvent(searchBox, 'keyup').pipe(
  map(event => event.target.value),        
  debounceTime(300),                       // Wait until typing pauses for 300ms, then emit the latest value
  distinctUntilChanged()                   // Ignore if the next value is the same as the previous
);


search$.subscribe({
  next: (query) => {
    results.innerHTML = '';
    if (query) {      
      const items = ['apple', 'banana', 'cherry', 'date', 'fig', 'grape'];
      const filteredItems = items.filter(item => item.includes(query));
      filteredItems.forEach(item => {
        const li = document.createElement('li');
        li.textContent = item;
        results.appendChild(li);
      });
    }
  }
});

Breakdown: fromEvent creates an observable from DOM events.

debounceTime reduces the number of times we act on the input: it waits until the user has stopped typing for 300 ms and then emits only the latest value.

distinctUntilChanged ensures that only queries that differ from the previous one trigger further actions.
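The idea behind `debounceTime` and `distinctUntilChanged` fits in a few lines. The sketch below uses hypothetical `debounce` and `distinctUntilChanged` helpers that wrap plain callbacks instead of observables; RxJS's real operators do the same bookkeeping for you, plus completion, errors, and cleanup.

```javascript
// Call `fn` only after `ms` milliseconds have passed without a new call.
function debounce(fn, ms) {
  let timerId = null;
  return (value) => {
    clearTimeout(timerId);                     // cancel the pending call
    timerId = setTimeout(() => fn(value), ms); // restart the countdown
  };
}

// Call `fn` only when the value differs from the previously passed one.
function distinctUntilChanged(fn) {
  let hasPrevious = false;
  let previous;
  return (value) => {
    if (hasPrevious && value === previous) return; // same as last time: skip
    hasPrevious = true;
    previous = value;
    fn(value);
  };
}

const searches = [];
const onInput = debounce(distinctUntilChanged((query) => searches.push(query)), 300);

// Simulate fast typing: only the final value survives the debounce.
onInput('a');
onInput('ap');
onInput('app');

// Later, the same text again (e.g. a key like Shift that doesn't change the value).
setTimeout(() => onInput('app'), 400);
setTimeout(() => console.log('Searches triggered:', searches), 800);
// Searches triggered: [ 'app' ]
```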

Pretty awesome right 🤓 ?

Not convinced yet? Alright, let's look at another example that uses a `combinatory operator`.

NB: A combinatory operator is one that flattens the results of higher-order observables (observables that emit observables) into a single observable.

Let’s update our search example to fetch data from a (simulated) API and filter the results before displaying them:

Replace the `<input>` and `<ul>` elements in your HTML body with the following, keeping the `<script>` tag:

<input type="text" id="search-box" placeholder="Type to search countries..." />
<ul id="results"></ul>

Then update the run.js file with the following:

const { fromEvent, of } = rxjs;
const { map, debounceTime, distinctUntilChanged, switchMap } = rxjs.operators;

// Fake API call that returns an observable
function fetchCountries(query) {
  const countries = ['Argentina', 'Australia', 'Belgium', 'Brazil', 'Canada', 'Denmark'];
  return of(countries.filter(country => country.toLowerCase().includes(query.toLowerCase())));
}


const searchBox = document.getElementById('search-box');
const results = document.getElementById('results');


const search$ = fromEvent(searchBox, 'keyup').pipe(
  map(event => event.target.value),
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => fetchCountries(query))  // Switch to new observable on every new search
);


search$.subscribe({
  next: (countries) => {
    results.innerHTML = '';  
    countries.forEach(country => {
      const li = document.createElement('li');
      li.textContent = country;
      results.appendChild(li);
    });
  }
});

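Here, switchMap maps each query to a new inner observable (the fetch) and unsubscribes from the previous one, so only the result for the user's latest query ever reaches the DOM. With a real, cancellable HTTP observable, an outdated request that is still in flight is cancelled instead of overwriting newer results.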
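The plain-JavaScript sketch below imitates that "latest wins" behaviour with promises; the `fakeSearch` delays and the `latestOnly` helper are hypothetical. Each new query bumps a counter, and responses belonging to older queries are dropped, so a slow response for "a" cannot overwrite the results for "arg". Unlike switchMap, this sketch cannot actually cancel the underlying work; it only discards stale results.

```javascript
const countries = ['Argentina', 'Australia', 'Belgium', 'Brazil', 'Canada', 'Denmark'];

// Hypothetical slow API: one-letter queries take longer to answer.
function fakeSearch(query) {
  const delay = query.length === 1 ? 300 : 100;
  return new Promise((resolve) =>
    setTimeout(
      () => resolve(countries.filter((c) => c.toLowerCase().includes(query.toLowerCase()))),
      delay
    )
  );
}

// Wrap a promise-returning function so only the latest call's result is used.
function latestOnly(fn, onResult) {
  let latestCall = 0;
  return (query) => {
    const callId = ++latestCall;
    fn(query).then((result) => {
      if (callId === latestCall) onResult(query, result); // stale results are dropped
    });
  };
}

const rendered = [];
const search = latestOnly(fakeSearch, (query, result) => {
  rendered.push({ query, result });
  console.log(`Results for "${query}":`, result);
});

search('a');   // slow response (300 ms), outdated by the time it arrives
search('arg'); // fast response (100 ms), the one we keep

setTimeout(() => console.log('Rendered:', rendered.length), 500);
```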


This approach makes it easier to manage complex async workflows, as RxJS handles everything behind the scenes, including the switching between different observables.




Conclusion

RxJS is a powerful library that merges three essential design patterns: Observer, Iterator, and Functional Programming with collections.

These patterns enable developers to handle asynchronous data streams efficiently and declaratively in modern JavaScript applications.

  • Observer Pattern: Provides a mechanism to react to emitted values over time.
  • Iterator Pattern: Supplies an easy way to sequentially access elements from a collection (stream).
  • Functional Programming: Operators like map, filter, and mergeMap make it possible to transform and manipulate data streams using pure functions.

By understanding these foundational patterns, you can solve complex async problems, such as managing UI interactions or network requests, with elegant, declarative code.

RxJS stands out as an essential tool for building robust, reactive applications.

In future articles, we will dive deeper into advanced concepts like `hot vs cold observables`, how to combine multiple streams, error handling in RxJS, and more.

Stay tuned for more on reactive programming with RxJS!
