How to use streams to ETL data?

Learn how to use streams to extract, transform and load data

By Mario Kandut

Streams are a built-in feature in Node.js and represent an asynchronous flow of data. Streams are also a way to handle reading and/or writing files. A Node.js stream can help process large files, larger than the free memory of your computer, since it processes the data in small chunks.

This article is based on Node v16.14.0.

Streams in Node.js

This is the fifth article of a series about streams in Node.js. This article is about how to perform ETL operations (Extract, Transform, Load) on CSV data using streams.

Overview

When working with flat data, we can just use the fs module and streams to process it in a memory-efficient way. Instead of reading all the data into memory, we can read it in small chunks with the help of streams and avoid overconsuming memory.
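
A minimal sketch of reading a file chunk by chunk (the file name and the highWaterMark chunk size are just placeholders):

const fs = require('fs');

// Read the file in small chunks instead of loading it all into memory.
const readStream = fs.createReadStream('sample-data.csv', {
  highWaterMark: 64 * 1024, // chunk size in bytes
});

readStream.on('data', chunk => console.log(`received ${chunk.length} bytes`));
readStream.on('end', () => console.log('done reading'));
readStream.on('error', err => console.error(err));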

In this article, we are going to create sample data in a CSV file, extract this data, transform it, and load it.

A Comma-Separated Values (CSV) file is a delimited text file that uses a comma to separate values. We will transform the CSV data to JSON, or better, to ndjson, which is basically a file of JSON records separated by newlines, with the file extension .ndjson. You are probably asking yourself: why are we not just using JSON? The main reason is fault tolerance. If a single invalid record is written to a plain JSON file, the entire file is corrupted. The main difference between JSON and ndjson is that in ndjson each line of the file must contain a single, complete JSON record. Hence, every line of an ndjson file is valid JSON, but the file as a whole is not a valid JSON document. The ndjson format works well with streaming data and large data sets, where each record is processed individually.
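
For illustration, a few simplified records in ndjson form, one complete JSON object per line:

{"id":"100","firstName":"Jobi","lastName":"Taam"}
{"id":"101","firstName":"Dacia","lastName":"Elephus"}
{"id":"102","firstName":"Arlina","lastName":"Bibi"}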

We are going to:

    1. Create CSV sample data
    2. Initialize project for NPM
    3. Create a CSV parser
    4. Add Transform stream
    5. Run & Done

1. Create CSV data

Let's create some sample CSV data. You can use the sample data below, or create your own data with FakerJS and convert it to CSV (a generation sketch follows the sample data).

id,firstName,lastName,email,email2,randomized
100,Jobi,Taam,[email protected],[email protected],Z lsmDLjL
101,Dacia,Elephus,[email protected],[email protected],Za jfPaJof
102,Arlina,Bibi,[email protected],[email protected],zmzlfER
103,Lindie,Torray,[email protected],[email protected],ibVggFEh
104,Modestia,Leonard,[email protected],[email protected]," Tit KCrdh"
105,Karlee,Cornelia,[email protected],[email protected],PkQCUXzq
106,Netty,Travax,[email protected],[email protected],psJKWDBrXm
107,Dede,Romelda,[email protected],[email protected],heUrfT
108,Sissy,Crudden,[email protected],[email protected],cDJxC
109,Sherrie,Sekofski,[email protected],[email protected],dvYHUJ
110,Sarette,Maryanne,[email protected],[email protected],rskGIJNF
111,Selia,Waite,[email protected],[email protected],DOPBe
112,Karly,Tjon,[email protected],[email protected],zzef nCMVL
113,Sherrie,Berriman,[email protected],[email protected],rQqmjw
114,Nadine,Greenwald,[email protected],[email protected],JZsmKafeIf
115,Antonietta,Gino,[email protected],[email protected],IyuCBqwlj
116,June,Dorothy,[email protected],[email protected],vyCTyOjt
117,Belva,Merriott,[email protected],[email protected],MwwiGEjDfR
118,Robinia,Hollingsworth,[email protected],[email protected],wCaIu
119,Dorthy,Pozzy,[email protected],[email protected],fmWOUCIM
120,Barbi,Buffum,[email protected],[email protected],VOZEKSqrZa
121,Priscilla,Hourigan,[email protected],[email protected],XouVGeWwJ
122,Tarra,Hunfredo,[email protected],[email protected],NVzIduxd
123,Madalyn,Westphal,[email protected],[email protected],XIDAOx
124,Ruthe,McAdams,[email protected],[email protected],iwVelLKZH
125,Maryellen,Brotherson,[email protected],[email protected],nfoiVBjjqw
126,Shirlee,Mike,[email protected],[email protected],MnTkBSFDfo
127,Orsola,Giule,[email protected],[email protected],VPrfEYJi
128,Linzy,Bennie,[email protected],[email protected],ZHctp
129,Vanessa,Cohdwell,[email protected],[email protected],RvUcbJihHf
130,Jaclyn,Salvidor,[email protected],[email protected],gbbIxz
131,Mildrid,Pettiford,[email protected],[email protected],snyeV
132,Carol-Jean,Eliathas,[email protected],[email protected],EAAjYHiij
133,Susette,Ogren,[email protected],[email protected]," BhYgr"
134,Farrah,Suanne,[email protected],[email protected],hYZbZIc
135,Cissiee,Idelia,[email protected],[email protected],PNuxbvjx
136,Alleen,Clara,[email protected],[email protected],YkonJWtV
137,Merry,Letsou,[email protected],[email protected],sLfCumcwco
138,Fanny,Clywd,[email protected],[email protected],Go kx
139,Trixi,Pascia,[email protected],[email protected],lipLcqRAHr
140,Sandie,Quinn,[email protected],[email protected],KrGazhI
141,Dania,Wenda,[email protected],[email protected],CXzs kDv
142,Kellen,Vivle,[email protected],[email protected],RrKPYqq
143,Jany,Whittaker,[email protected],[email protected],XAIufn
144,Lusa,Fillbert,[email protected],[email protected],FBFQnPm
145,Farrah,Edee,[email protected],[email protected],TrCwKb
146,Felice,Peonir,[email protected],[email protected],YtVZywf
147,Starla,Juan,[email protected],[email protected],aUTvjVNyw
148,Briney,Elvyn,[email protected],[email protected],tCEvgeUbwF
149,Marcelline,Ricarda,[email protected],[email protected],sDwIlLckbd
150,Mureil,Rubie,[email protected],[email protected],HbcfbKd
151,Nollie,Dudley,[email protected],[email protected],EzjjrNwVUm
152,Yolane,Melony,[email protected],[email protected],wfqSgpgL
153,Brena,Reidar,[email protected],[email protected],iTlvaS
154,Glenda,Sabella,[email protected],[email protected],zzaWxeI
155,Paola,Virgin,[email protected],[email protected],gJO hXTWZl
156,Aryn,Erich,[email protected],[email protected],qUoLwH
157,Tiffie,Borrell,[email protected],[email protected],cIYuVMHwF
158,Anestassia,Daniele,[email protected],[email protected],JsDbQbc
159,Ira,Glovsky,[email protected],[email protected],zKITnYXyhC
160,Sara-Ann,Dannye,[email protected],[email protected],wPClmU
161,Modestia,Zina,[email protected],[email protected],YRwcMqPK
162,Kelly,Poll,[email protected],[email protected],zgklmO
163,Ernesta,Swanhildas,[email protected],[email protected],tWafP
164,Giustina,Erminia,[email protected],[email protected],XgOKKAps
165,Jerry,Kravits,[email protected],[email protected],olzBzS
166,Magdalena,Khorma,[email protected],[email protected],BBKPB
167,Lory,Pacorro,[email protected],[email protected],YmWQB
168,Carilyn,Ethban,[email protected],[email protected],KUXenrJh
169,Tierney,Swigart,[email protected],[email protected],iQCQJ
170,Beverley,Stacy,[email protected],[email protected],NMrS Zpa f
171,Ida,Dex,[email protected],[email protected],hiIgOCxNg
172,Sam,Hieronymus,[email protected],[email protected],dLSkVe
173,Lonnie,Colyer,[email protected],[email protected],ZeDosRy
174,Rori,Ethban,[email protected],[email protected],SXFZQmX
175,Lelah,Niles,[email protected],[email protected],NwxvCXeszl
176,Kathi,Hepsibah,[email protected],[email protected],SOcAOSn
177,Dominga,Cyrie,[email protected],[email protected],IkjDyuqK
178,Pearline,Bakerman,[email protected],[email protected],vHVCkQ
179,Selma,Gillan,[email protected],[email protected],hSZgpBNsw
180,Bernardine,Muriel,[email protected],[email protected],AnSDTDa U
181,Ermengarde,Hollingsworth,[email protected],[email protected],IYQZ Nmv
182,Marguerite,Newell,[email protected],[email protected],kSaD uaHH
183,Albertina,Nisbet,[email protected],[email protected],Y jHyluB
184,Chere,Torray,[email protected],[email protected],loElYdo
185,Vevay,O'Neill,Vevay.O'[email protected],Vevay.O'[email protected],uLZSdatVn
186,Ann-Marie,Gladstone,[email protected],[email protected],fwKlEksI
187,Donnie,Lymann,[email protected],[email protected],deBrqXyyjf
188,Myriam,Posner,[email protected],[email protected],gEMZo
189,Dale,Pitt,[email protected],[email protected],OeMdG
190,Cindelyn,Thornburg,[email protected],[email protected],kvhFmKGoMZ
191,Maisey,Hertzfeld,[email protected],[email protected],OajjJ
192,Corina,Heisel,[email protected],[email protected],luoDJeHo
193,Susette,Marcellus,[email protected],[email protected],AXHtR AyV
194,Lanae,Sekofski,[email protected],[email protected],FgToedU
195,Linet,Beebe,[email protected],[email protected],DYGfRP
196,Emilia,Screens,[email protected],[email protected],LXUcleSs
197,Tierney,Avi,[email protected],[email protected],VegzbHH
198,Pollyanna,Thar,[email protected],[email protected],GjYeEGK
199,Darci,Elephus,[email protected],[email protected],DaQNdN
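
If you would rather generate the data, here is a rough sketch with the faker library; the package name @faker-js/faker and the faker.name, faker.internet and faker.random calls are assumptions based on the classic faker API and are not part of this article:

const { faker } = require('@faker-js/faker'); // assumed package and export

// Build CSV rows with random names, emails and an alphanumeric string.
const header = 'id,firstName,lastName,email,email2,randomized';
const rows = [];
for (let i = 0; i < 100; i++) {
  rows.push([
    100 + i,
    faker.name.firstName(),
    faker.name.lastName(),
    faker.internet.email(),
    faker.internet.email(),
    faker.random.alphaNumeric(10),
  ].join(','));
}

// Print the CSV; save it as sample-data.csv in the next step.
console.log([header, ...rows].join('\n'));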

Create a project folder:

mkdir node-streams-etl

Create a data folder and a csv file in it (the code later in this article reads from data/sample-data.csv):

cd node-streams-etl
mkdir data
touch data/sample-data.csv

Copy all the sample data into the csv file and save it. You can copy and paste it in your editor, or write it with fs.writeFile in the Node.js REPL or with the -p flag in the terminal (a sketch follows).
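
A minimal sketch of the fs.writeFile variant; the csvData template literal is a placeholder for the sample rows above:

const fs = require('fs');

// Paste the sample data from above between the backticks.
const csvData = `id,firstName,lastName,email,email2,randomized
...`;

fs.writeFile('data/sample-data.csv', csvData, err => {
  if (err) throw err;
  console.log('data/sample-data.csv written.');
});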

2. Initialize project for NPM

We are going to use npm packages; hence, we have to initialize the project to get a package.json:

npm init -y

Let's add a main file for the code.

touch index.js

First, we are going to create a readable stream to read the CSV data from sample-data.csv, and a writable stream, which will be the destination. For now, we just copy the sample data. To connect the readable and the writable stream, we are going to use the pipeline method, where error handling is much easier than with the pipe method. Check out the article How to connect streams with the pipeline method.

const fs = require('fs');
const { pipeline } = require('stream');

const inputStream = fs.createReadStream('data/sample-data.csv');
const outputStream = fs.createWriteStream('data/sample-data.ndjson');

pipeline(inputStream, outputStream, err => {
  if (err) {
    console.log('Pipeline encountered an error.', err);
  } else {
    console.log('Pipeline completed successfully.');
  }
});

3. Create a CSV parser

We have to convert the CSV file to JSON and, as so often, there is a package for the problem; in this use case it is csvtojson. This module parses the header row to get the keys and then parses each data row to create a JSON object.

Let's install it.

npm install csvtojson

After the successful installation, we can require the module and add it to the pipeline after the inputStream. The data will flow from the CSV file through the CSV parser and into the output file.
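
Before wiring it into the pipeline, here is a quick standalone sketch of how the parser maps rows to objects, using csvtojson's fromString helper:

const csv = require('csvtojson');

// Parse a tiny inline CSV string; every value comes back as a string.
csv()
  .fromString('id,firstName\n100,Jobi\n101,Dacia')
  .then(rows => console.log(rows));
// → [ { id: '100', firstName: 'Jobi' }, { id: '101', firstName: 'Dacia' } ]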

We are again using the pipeline method, as it has been the preferred way to connect streams and pipe data between them since Node.js v10. It also helps to clean up streams on completion or failure: when an error occurs, the streams involved are destroyed to avoid memory leaks.

const fs = require('fs');
const { pipeline } = require('stream');
const csv = require('csvtojson');

const inputStream = fs.createReadStream('data/sample-data.csv');
const outputStream = fs.createWriteStream('data/sample-data.ndjson');

const csvParser = csv();

pipeline(inputStream, csvParser, outputStream, err => {
  if (err) {
    console.log('Pipeline encountered an error.', err);
  } else {
    console.log('Pipeline completed successfully.');
  }
});

4. Add Transform stream

The data is now emitted to the outputStream as ndjson, with each data row being a valid JSON record. Now we want to transform the data. Since we are using csvtojson, we could utilize its built-in subscribe method, which is called for each record after it has been parsed (see the sketch below). However, we want to create a transform stream ourselves. Our sample data has the keys id, firstName, lastName, email, email2 and randomized. We want to keep only id, firstName and lastName, rename email2 to emailBusiness, and drop the email and randomized properties.
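
As an aside, a minimal sketch of the subscribe variant, not used further in this article:

const csv = require('csvtojson');

// csvtojson calls the subscribe hook once per parsed record.
const csvParserWithHook = csv().subscribe((json, lineNumber) => {
  // e.g. inspect or validate each parsed row here
  console.log(`parsed row ${lineNumber}:`, json.id);
});

The result is still a converter stream, so it could replace the plain csv() parser in the pipeline.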

Transform streams must implement a transform method that receives a chunk of data as the first argument, the encoding type of the chunk as the second argument, and a callback function as the third. Don't forget to also require Transform from the stream module.

const { Transform } = require('stream');

const transformStream = new Transform({
  transform(chunk, encoding, cb) {
    try {
      // parse the JSON record emitted by csvtojson and clone it
      let person = Object.assign({}, JSON.parse(chunk));
      // keep only id, firstName and lastName, and rename email2 to emailBusiness
      person = {
        id: person.id,
        firstName: person.firstName,
        lastName: person.lastName,
        emailBusiness: person.email2,
      };
      cb(null, JSON.stringify(person) + `\n`);
    } catch (err) {
      cb(err);
    }
  },
});

Now let's add the transformStream to the pipeline.

pipeline(
  inputStream,
  csvParser,
  transformStream,
  outputStream,
  err => {
    if (err) {
      console.log('Pipeline encountered an error.', err);
    } else {
      console.log('Pipeline completed successfully.');
    }
  },
);

5. Run & Done

Run the application with node index.js and the data in the ndjson file should look like this.

{"id":"100","firstName":"Jobi","lastName":"Taam","emailBusiness":"[email protected]"}
{"id":"101","firstName":"Dacia","lastName":"Elephus","emailBusiness":"[email protected]"}
{"id":"102","firstName":"Arlina","lastName":"Bibi","emailBusiness":"[email protected]"}

Error handling always has to be done when working with streams. Since the pipeline method already takes care of error handling for all the streams involved, the sample project is done.
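
As a side note, here is a minimal sketch of the same wiring with the promise-based pipeline from stream/promises (available since Node.js v15), which lets you await the flow and handle errors with try/catch:

const { pipeline } = require('stream/promises');

async function run() {
  try {
    // the same streams as above, awaited instead of using a callback
    await pipeline(inputStream, csvParser, transformStream, outputStream);
    console.log('Pipeline completed successfully.');
  } catch (err) {
    console.log('Pipeline encountered an error.', err);
  }
}

run();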

Congratulations. 🚀✨

TL;DR

  • The Newline-delimited JSON (ndjson) format works well with streaming data and large sets of data, where each record is processed individually, and it helps to reduce errors.
  • Using pipeline simplifies error handling and stream cleanup, and it makes combining streams more readable and maintainable.

Thanks for reading and if you have any questions, use the comment function or send me a message @mariokandut.

If you want to know more about Node, have a look at these Node Tutorials.

References (and Big thanks):

HeyNode, Node.js - Streams, MDN - Streams, Format and MIME Type, ndjson, csvtojson
