written for coders favicon image

RxJS Pipe the complete guide - Learn when and how to use RxJS Pipe and pipe operator

Header image for the RxJS pipe blog post

RxJS operators help you to easily compose complex asynchronous code that is based on Observables. Even though Observables are the foundation of the RxJS library, their operators are what make the library useful for developers and help them to handle complex observable patterns with ease. The RxJS pipe and pipable operators are at the heart of this and let you transform observables in a declarative and pure manner.

To get a better understanding of the RxJS pipe and pipeable operators let's first define some things:

  1. RxJS Observable: A push-based system to handle asynchronous code in JavaScript (or TypeScript) where you subscribe to an observable and the Observable pushes new values to all subscribers

  2. .pipe() method: The .pipe() method is chained on an Observable to apply pipeable operators and return a new observable

  3. Pipable operators: RxJS operators that are used to transform the value of the original observable and return a new observable with the modified values

  4. pipe() function: A function to compose new operators that perform a (commonly used) set of pipeable operators with one function call

Table of content:

RxJS pipe and pipeable operators explained

The RxJS pipe method (or function) is used in combination with RxJS pipeable operators to perform some logic on an observable stream and emit a new observable without altering the original observable. pipe() can be called on one or more functions and each pipe() function takes in one argument ("UnaryFunction") and uses this argument to return a new Observable value. This ("UnaryFunction") argument is basically just a comma-separated list of pipeable operators.

The pipeable operators you pass as an argument to the pipe function will be executed one by one. So it will first perform the logic of the first pipeable operator and then use that value to perform the logic of the next pipeable operator and so on.

RxJS pipeable operators are functions that take in an Observable as input, perform some logic, and return a new observable without modifying the previous (or original) Observable. Because pipeable operators are not subject to state and will always return the same value given the same input and because they do not alter the previous Observable they are considered a pure operation.

Subscribing to the .pipe() method means you also subscribe to the original Observable. All pipeable operators that create an observable will unsubscribe for the previous one automatically, so you will only have to unsubscribe from the original observable, not the pipeable operators.

Let's dive into some examples. In the below snippet, you can create an Observable stream with the RxJS creation operator 'of'. In our example, this Observable stream will emit 3 values one by one. When you subscribe to the observable and log the values you will see 3 logs, each containing the next value of our Observable stream:

Gist

In the above example, you just have a regular Observable stream where we subscribe and log the value of the stream, no RxJS pipe magic happening yet. Now let's say you want to multiply each number of your observable stream by 10. For this, you can use the pipe method with a pipeable operator (in this case the RxJS map operator). To achieve this just add .pipe() to the observable and subscribe to the pipe() instead of directly to the main observable. Inside the pipe method, you can add the RxJS map pipeable operator. The map operator works similarly to the .map method you know from arrays.

Gist

As you can see the RxJS pipe method allows you to perform some logic on the observable stream by receiving an Observable as input (the Observable we chained .pipe() onto) and returning a new Observable after the pipeable operators are performed on the observable value. Now let's demonstrate that the pipeable operators you pass to the pipe method are performed one by one.

Meaning that the pipe method takes in the original Observable, and passes it to the first pipeable operator in the form of a new observable, the first pipeable operator performs its logic on the observable values and passes it on to the next pipeable operator to perform its logic and so on. You subscribe to the pipe method yielding the final Observable stream where all pipeable operators are applied one by one in sequence.

To demonstrate this, let's add a second map operator to the pipe method and subtract 5 from each value:

Gist

You can see that the original numbers of the Observable stream are first multiplied by 10 and after that 5 got subtracted, resulting in the values we see in the log. If you now add yet another pipeable operator, filter for example this one will also be performed on the values after the others are performed:

Gist

What if we have a set of pipe operators we want to apply commonly one an Observable? In that scenario, you can save the set of pipeable operators in a function and call that inside of the pipe method. This is a great way to reuse common pipable operators and logic. Let's save the set of operators we just created into a new function and use that on our Observable stream.

Gist

All RxJS pipeable operators

Join Creation Operators

Transformation Operators

Filtering Operators

Join Operators

Multicasting Operators

Error Handling Operators

Utility Operators

Conditional and Boolean Operators

Mathematical and Aggregate Operators

Final words

RxJS pipe and pipeable operators give you a powerful tool at your disposal to perform logic on an Observable or Observable stream. It's easy to reuse commonly used pipeable operators and/or sets of them to keep your code base clean and DRY. I would advise diving a bit into all the RxJS operators listed above to make your code that much more powerful when handling complex observable patterns.

Make your friends more educated by sharing!