How to connect streams with pipeline?

Learn what pipeline does, and how to connect streams using pipeline

By Mario Kandut


This article is based on Node v16.14.0.

Streams are a built-in feature in Node.js and represent an asynchronous flow of data. Streams are also a way to handle reading and/or writing files. A Node.js stream can help you process files larger than the free memory of your computer, since it processes the data in small chunks.


This is the fourth article of a series about streams in Node.js. It explains what pipeline does in Node.js, and how to connect streams using pipeline.


How to connect streams with pipeline

pipeline is a module method to pipe between streams and generators. It forwards errors, cleans up, and provides a callback when the pipeline is complete. The pipeline method was added in Node.js v10 to improve the experience of piping streams.

It takes any number of streams as arguments and a callback function as its last argument. If an error occurs anywhere in the pipeline, all streams are ended and the callback is invoked with the error. The callback is also invoked when the pipeline ends successfully, so we have a way to know when the pipeline has completed.
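
Since pipeline also accepts async generator functions as intermediate steps (supported since Node v13.10), the shape of a pipeline call can be sketched like this. A minimal sketch, assuming an input.txt exists (we create one below):

const { pipeline } = require('stream');
const fs = require('fs');

pipeline(
  fs.createReadStream('input.txt'), // source
  async function* (source) {
    // intermediate step: an async generator
    source.setEncoding('utf8'); // work with strings instead of Buffers
    for await (const chunk of source) {
      yield chunk.toLowerCase(); // transform each chunk
    }
  },
  fs.createWriteStream('output.txt'), // destination
  err => {
    // callback, always the last argument
    if (err) {
      console.log('Pipeline failed with an error:', err);
    } else {
      console.log('Pipeline ended successfully');
    }
  },
);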

Let's look at a complete code example. First we are going to create a sample file, then we are going to create a pipeline with readable, PassThrough, and writable streams.

Create a file.

touch create-sample.js

Add code to create a sample file with lorem ipsum.

const fs = require('fs');

fs.writeFileSync(
  'input.txt',
  "Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum.",
  { encoding: 'utf8' },
);

Create a file.

touch streams-pipeline.js

Add sample code.

const { PassThrough, pipeline } = require('stream');
const fs = require('fs');

const input = fs.createReadStream('input.txt');
const out = fs.createWriteStream('output.txt');

const passThrough = new PassThrough();

console.log('Starting pipeline...');
pipeline(input, passThrough, out, err => {
  if (err) {
    console.log('Pipeline failed with an error:', err);
  } else {
    console.log('Pipeline ended successfully');
  }
});

Run the code with node streams-pipeline.js from the terminal. The code will log Starting pipeline... when the pipeline starts and Pipeline ended successfully when the pipeline is done.

Now let's emit an error and see if the error handling is triggered. Add this line at the end of the code and run it again.

passThrough.emit('error', new Error('Oh no!'));

The code will log Starting pipeline... when the pipeline starts. Then the error gets emitted by passThrough, the pipeline ends with an error, and the code logs Pipeline failed with an error: Error: Oh no!.

One of the big benefits of pipeline is that the streams get destroyed when an error occurs, and internal resources are released (the memory which was used by the streams is freed up). This cleanup step prevents memory leaks, which can occur when a stream has ended but has not released the memory it was using. When using the pipe method, you are responsible for destroying streams yourself when an error occurs.
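
For comparison, here is a rough sketch of the manual error handling pipe requires (simplified; real code would also have to handle a stream closing prematurely):

const fs = require('fs');

const input = fs.createReadStream('input.txt');
const out = fs.createWriteStream('output.txt');

// pipe does not forward errors between streams,
// so each stream needs its own handler and manual cleanup.
input.pipe(out);

input.on('error', err => {
  console.log('Read stream failed:', err);
  out.destroy(err); // destroy the destination ourselves
});

out.on('error', err => {
  console.log('Write stream failed:', err);
  input.destroy(err); // destroy the source ourselves
});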

Using pipeline simplifies error handling and stream cleanup. The method makes combining streams more readable and maintainable.
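
Since Node v15, pipeline is also available in a promise-based variant from the stream/promises module, which works nicely with async/await. The first example rewritten with it could look like this:

const { pipeline } = require('stream/promises');
const { PassThrough } = require('stream');
const fs = require('fs');

async function run() {
  try {
    // resolves when the pipeline is done, rejects on error
    await pipeline(
      fs.createReadStream('input.txt'),
      new PassThrough(),
      fs.createWriteStream('output.txt'),
    );
    console.log('Pipeline ended successfully');
  } catch (err) {
    console.log('Pipeline failed with an error:', err);
  }
}

run();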

Transform stream with pipeline

Let's make a more powerful stream and create our own transform stream to alter data as it is streamed from the source to the destination.

Let's implement a simple transform with the pipeline method, which transforms all strings that pass through to upper case. For input and output we are going to use process.stdin and process.stdout.

Create a file.

touch transform-it.js

Copy code.

const { Transform, pipeline } = require('stream');

const upperCaseTransform = new Transform({
  transform: function(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

pipeline(process.stdin, upperCaseTransform, process.stdout, err => {
  if (err) {
    console.log('Pipeline encountered an error:', err);
  } else {
    console.log('Pipeline ended');
  }
});

Run the file with node transform-it.js and type your name in lower case. You will see that it gets transformed to upper case. You can exit the stream with ctrl+c.

What happened in the code? We created a Transform stream using the constructor from the stream module. We are required to implement a transform method on our transform stream. This transform function receives a chunk of data passing through the transform stream, the encoding of the chunk, and a callback function, which we can use to return the transformed data or an error. We also convert the chunk data to a string, because by default the data chunk is a Buffer.

Transform streams can be very powerful for creating pipelines to alter or process streaming data, and are much more composable than listening to stream events like .on('data') and altering the data there.
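
For example, several transform streams can be chained in a single pipeline. Here is a sketch that reuses the upper-case transform and adds a second, hypothetical transform that replaces spaces with underscores:

const { Transform, pipeline } = require('stream');

const upperCaseTransform = new Transform({
  transform: function(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Hypothetical second step, just to show the chaining.
const underscoreTransform = new Transform({
  transform: function(chunk, encoding, callback) {
    callback(null, chunk.toString().replace(/ /g, '_'));
  },
});

pipeline(
  process.stdin,
  upperCaseTransform,
  underscoreTransform,
  process.stdout,
  err => {
    if (err) {
      console.log('Pipeline encountered an error:', err);
    }
  },
);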

TL;DR

  • Using pipeline simplifies error handling and stream cleanup.
  • The pipeline method makes combining streams more readable and maintainable.
  • One of the big benefits of pipeline is that the streams get destroyed when an error occurs, and internal resources are released (the memory which was used by the streams is freed up).

Thanks for reading and if you have any questions, use the comment function or send me a message @mariokandut.

If you want to know more about Node, have a look at these Node Tutorials.

