How to connect streams with pipe?
© https://nodejs.org/en/

How to connect streams with pipe?

Learn what pipe does, and how to connect streams using pipe

ByMario Kandut

honey pot logo

Europe’s developer-focused job platform

Let companies apply to you

Developer-focused, salary and tech stack upfront.

Just one profile, no job applications!

This article is based on Node v16.14.0.

Streams are a built-in feature in Node.js and represent asynchronous flow of data. Streams are also a way to handle reading and/or writing files. A Node.js stream can help process large files, larger than the free memory of your computer, since it processes the data in small chunks.

Streams in Node.js

💰 The Pragmatic Programmer: journey to mastery. 💰 One of the best books in software development, sold over 200,000 times.

This is the second article of a series about streams in Node.js. It explains what pipe does in Node.js, and how to connect streams using pipe.

Streams in Node.js

Connect streams with pipe

The recommended way to consume streams is the pipe and pipeline methods, which consume streams and handle the underlying events for you. To connect streams together and start the flow of data, the pipe method is available on readable streams. It is also possible to listen to stream events, but it is not recommended for consuming data. The main goal of pipe is to limit the buffering of data so that sources and destinations will not overwhelm the available memory.

The pipe method uses, under the hood, the events emitted by streams and abstracts away the need to handle these events. The only exception is, that the handling of error events is not included in the abstraction and has to be done separate. Unhandled stream errors can crash your application.

The pipe method is available on streams, which implement a Readable interface. Check out the article What is a Stream in Node.js? for the different types of streams.

The streams Readable, Duplex, Transform and PassThrough implement a Readable interface. The method accepts a destination to pipe data to. The destination stream must implement a Writable interface. The streams Writable, Duplex, Transform and PassThrough implement a Writable interface.

Let's look at an example. Node has a globally available readable stream process.stdin (stdin stands for standard in), and a writable stream process.stdout (stdout stands for standard out).

Create a file (or use the REPL).

touch stream-it.js

Add the following code into it.

process.stdin.pipe(process.stdout);

Then run it in the CLI node stream-it.js and type Banana and hit the enter key. You will see that Banana is echoed back to you.

Let me explain what is happening. The process.stdin is the readable source of data, and the process.stdout is the writable destination. When you input a text, the text is piped from stdin to stdout, creating the echo. Calling pipe will return the destination stream.

With the pipe method it is possible to chain multiple streams together. The requirement for this is that the destination stream is both readable and writable, like Duplex, Transform and PassThrough.

const { PassThrough } = require('stream');

const passThrough = new PassThrough();

process.stdin.pipe(passThrough).pipe(process.stdout);

This pseudo-code a.pipe(b).pipe(c).pipe(d) is exactly the same as a.pipe(b); b.pipe(c); c.pipe(d), on the command line it would be the same as piping programs together like a | b | c | d.

Using the fs module to create streams from files

Implementing streaming interfaces and consuming streams have quite a few differences. Creating streams is not as common as consuming streams, but there are some instances where creating your own stream is useful. The most common use cases is streaming data from and to a file using the fs module.

The fs module is able to create read and writeable streams using the helper methods fs.createReadStream and fs.createWriteStream. The method createWriteStream takes a file path as the first argument, and then optional config arguments.

Let's dive into code and create a simple stream that writes text from stdin to a file called output.txt.

Create a file.

touch stream-to-file.js

Add code.

const fs = require('fs');

const outputStream = fs.createWriteStream('output.txt');

process.stdin.pipe(outputStream);

Run the code in the CLI with node stream-to-file.js and type Hello Stream and hit the enter key. Then log the output.txt to the console with cat output.txt or open the file in a text editor. You will see that Hello Stream was written to the file. In this example, we have replaced the stdout with the variable outputStream which holds the stream created with fs.createWriteStream.

Since there is some data now in the output.txt file, let's invert this and create a readable stream with piping the data from output.txt.

Create a file.

touch stream-out.js

Add code.

const fs = require('fs');

const inputFileStream = fs.createReadStream('output.txt');

inputFileStream.pipe(process.stdout);

Run the file with node stream-out.js and you will see the text from the output.txt file written in the terminal.

When creating a writeable stream from a file, the file will be overwritten by default. This behaviour can be changed with adding a flag when creating the stream. Read more about file system flags. So we can pass {flags: 'a'} for Open file for appending. The file is created if it does not exist.

const fs = require('fs');

const outputStream = fs.createWriteStream('output.txt', {
  flags: 'a',
});

process.stdin.pipe(outputStream);

This will append data to the file if it exists already, or otherwise create the file.

TL;DR

  • The recommended way to consume streams is the pipe and pipeline method.
  • The main goal of pipe is to limit the buffering of data so memory will not be overloaded.
  • The pipe method is available on streams, which implement a Readable interface.
  • With the help of pipe streams can be chained.
  • The fs module can create readable and writeable streams.

Thanks for reading and if you have any questions, use the comment function or send me a message @mariokandut.

If you want to know more about Node, have a look at these Node Tutorials.

References (and Big thanks):

HeyNode, Node.js - Streams, MDN - Streams Node.js - fs

More node articles:

Getting started with Webpack

How to list/debug npm packages?

How to specify a Node.js version

How to create a web server in Node.js

How to dynamically load ESM in CJS

How to convert a CJS module to an ESM

How to create a CJS module

How to stream to an HTTP response

How to handle binary data in Node.js?

How to use streams to ETL data?

How to connect streams with pipeline?

How to handle stream errors?

How to connect streams with pipe?

What Is a Node.js Stream?

Handling Errors in Node (asynchronous)

Handling Errors in Node.js (synchronous)

Introduction to errors in Node.js

Callback to promise-based functions

ETL: Load Data to Destination with Node.js

ETL: Transform Data with Node.js

ETL: Extract Data with Node.js

Event Emitters in Node.js

How to set up SSL locally with Node.js?

How to use async/await in Node.js

What is an API proxy?

How to make an API request in Node.js?

How does the Event Loop work in Node.js

How to wait for multiple Promises?

How to organize Node.js code

Understanding Promises in Node.js

How does the Node.js module system work?

Set up and test a .env file in Node

How to Use Environment Variables in Node

How to clean up node modules?

Restart a Node.js app automatically

How to update a Node dependency - NPM?

What are NPM scripts?

How to uninstall npm packages?

How to install npm packages?

How to create a package.json file?

What Is the Node.js ETL Pipeline?

What is data brokering in Node.js?

How to read and write JSON Files with Node.js?

What is package-lock.json?

How to install Node.js locally with nvm?

How to update Node.js?

How to check unused npm packages?

What is the Node.js fs module?

What is Semantic versioning?

The Basics of Package.json explained

How to patch an NPM dependency

What is NPM audit?

Beginner`s guide to NPM

Getting started with Node.js

Scroll to top ↑