ETL: Load Data to Destination with Node.js

Create the third phase for an ETL pipeline in Node.

By Mario Kandut

This article is based on Node v16.14.0.

ETL is the process of extracting, transforming, and loading data from one or more sources into a destination. For a general overview of ETL pipelines, have a look at the article ETL pipeline explained.

This is the third article in a series of three. It explains the load phase of an ETL pipeline.

Load Data in an ETL pipeline

The third phase in an ETL pipeline is to load the transformed data into the destination. After extracting the data from the source and transforming it into the format we want, we can load it into its destination. In this example, we are going to write the transformed data to a JSON flat file on the file system. In a real-world scenario, the data would be loaded into a database or some kind of cloud storage.

There are two different approaches to loading data:

  • bulk load (all data loaded at once)
  • insert load (insert each data record one at a time)

The choice of loading approach depends on the destination and on the size of the data to be loaded. For some systems, it might be more resource-intensive to insert records one by one than to load the data in bulk; for others, it may be the other way around. It also depends on which loading methods the destination supports and what their limitations are.
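
To make the difference between the two approaches concrete, here is a minimal sketch. It assumes a hypothetical in-memory destination that only counts the requests it receives; createDestination, insertMany, insertOne, bulkLoad and insertLoad are made-up names and not part of the article's project.

```javascript
// Hypothetical in-memory destination that counts incoming requests.
// It stands in for a database or an API.
function createDestination() {
  const rows = [];
  let requests = 0;
  return {
    // Accepts many records in a single request.
    async insertMany(records) {
      requests += 1;
      rows.push(...records);
    },
    // Accepts exactly one record per request.
    async insertOne(record) {
      requests += 1;
      rows.push(record);
    },
    stats: () => ({ rows: rows.length, requests }),
  };
}

// Bulk load: all records travel in one request.
async function bulkLoad(destination, records) {
  await destination.insertMany(records);
  return destination.stats();
}

// Insert load: one request per record, sent one after another.
async function insertLoad(destination, records) {
  for (const record of records) {
    await destination.insertOne(record);
  }
  return destination.stats();
}

const sampleRecords = [{ id: 1 }, { id: 2 }, { id: 3 }];

async function compareLoads() {
  const bulk = await bulkLoad(createDestination(), sampleRecords);
  const single = await insertLoad(createDestination(), sampleRecords);
  return { bulk, single };
}

compareLoads().then(console.log);
```

With the three sample records, the bulk variant reaches the destination with one request, while the insert variant needs three.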

Bulk loading data

Loading the data in bulk can be the simplest and most efficient approach. It basically means sending many data items to the destination at once to be inserted.

Let's take a look at bulk loading all of our data to a JSON file. We are going to write a function that receives the transformed photoAlbum (array with photoObjects) as an argument, stringifies it to JSON, and finally writes it to a file on the file system.

We are going to continue with the example used in the previous article ETL: Transform Data with Node.js, but with only one photoAlbum.

Create another file load.js in the project folder, which is going to contain the load functions.

touch load.js

Create a bulkLoadPhotoAlbum function, which receives the transformed data, the output file path, and the file name as arguments. Since outputFilePath and fileName are required, the function has to validate them and throw an error if either one is missing.

const { promisify } = require('util');
const fs = require('fs');
const writeFilePromised = promisify(fs.writeFile);

function bulkLoadPhotoAlbum(photoAlbums, outputFilePath, fileName) {
  if (!outputFilePath) {
    throw new Error('Filepath required as second argument');
  }
  if (!fileName) {
    throw new Error('FileName is required as third argument');
  }
  return writeFilePromised(
    `${outputFilePath}/${fileName}.json`,
    JSON.stringify(photoAlbums, null, 2),
  );
}

module.exports = { bulkLoadPhotoAlbum };
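
As a possible variation, the same function could be written as an async function on top of fs.promises and path.join. This is only a sketch, not the version used in the rest of the article: the name bulkLoadPhotoAlbumAsync, the temporary directory and the sample records are assumptions made for illustration.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Variant of bulkLoadPhotoAlbum (the "Async" suffix is a made-up name).
// Because it is an async function, a missing argument rejects the returned
// promise instead of throwing synchronously.
async function bulkLoadPhotoAlbumAsync(photoAlbums, outputFilePath, fileName) {
  if (!outputFilePath) {
    throw new Error('Filepath required as second argument');
  }
  if (!fileName) {
    throw new Error('FileName is required as third argument');
  }
  // path.join builds the target path with the platform's separator.
  const target = path.join(outputFilePath, `${fileName}.json`);
  await fs.promises.writeFile(target, JSON.stringify(photoAlbums, null, 2));
  return target;
}

// Usage: write two sample records to a fresh temporary directory.
async function demo() {
  const dir = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'etl-load-'));
  const target = await bulkLoadPhotoAlbumAsync(
    [{ id: 1 }, { id: 2 }],
    dir,
    'album-demo',
  );
  console.log(`wrote ${target}`);
  return target;
}

demo().catch(console.error);
```

With this variant, callers can handle missing arguments and write errors in the same catch block, since both surface as a rejected promise.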

Add bulk load data function to ETL pipeline

We have to add the bulkLoadPhotoAlbum function as the last step of orchestrateEtlPipeline() in index.js.

const { getPhotos } = require('./extract');
const { addTimeStamp, transformPhoto } = require('./transform');
const { bulkLoadPhotoAlbum } = require('./load');

const orchestrateEtlPipeline = async () => {
  try {
    // EXTRACT
    const photoAlbum1 = await getPhotos(1);

    // TRANSFORM
    let transformedPhotoAlbum1 = photoAlbum1.map(photo =>
      transformPhoto(photo),
    );

    transformedPhotoAlbum1 = addTimeStamp(transformedPhotoAlbum1);

    // LOAD
    await bulkLoadPhotoAlbum(
      transformedPhotoAlbum1,
      __dirname,
      'album-1',
    );
  } catch (error) {
    console.error(error);
  }
};

orchestrateEtlPipeline();

Now, run the pipeline with node index.js and check the created album-1.json file, which contains the transformed data.
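
If you prefer to check the result programmatically, a small script along these lines could read the file back and count its entries. The function name inspectJsonFile and the file name check.js are hypothetical; the script expects the path to the JSON file as a command-line argument.

```javascript
const fs = require('fs');

// Reads a JSON file and reports how many top-level entries it holds.
// Works for an array of records as well as for an object wrapping them.
async function inspectJsonFile(filePath) {
  const raw = await fs.promises.readFile(filePath, 'utf8');
  const parsed = JSON.parse(raw); // throws if the file is not valid JSON
  const entries = Array.isArray(parsed)
    ? parsed.length
    : Object.keys(parsed).length;
  return { entries, bytes: Buffer.byteLength(raw) };
}

// Only runs when a file path is passed on the command line.
const fileArgument = process.argv[2];
if (fileArgument) {
  inspectJsonFile(fileArgument)
    .then(console.log)
    .catch(error => console.error(`Check failed: ${error.message}`));
}
```

Saved as check.js in the project folder, it could be run with node check.js album-1.json.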

Load one record at a time

Sometimes the destination system requires you to load records one at a time. That can be the case if the destination system needs to treat each record as an incoming event in order to handle the data appropriately. Loading one record at a time results in many more requests than a single bulk load, which has to be taken into account. To load data records one at a time, you can iterate over all the records and handle them individually.
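
Handling records individually also means that a single failing record does not have to abort the whole load. The following sketch assumes a hypothetical insert function (insertRecordOrFail) that rejects records without an id; loadOneByOne is likewise a made-up helper and not part of the article's project.

```javascript
// Hypothetical insert function: rejects records without an id to simulate
// a destination that refuses invalid rows.
function insertRecordOrFail(record) {
  return record.id
    ? Promise.resolve(record.id)
    : Promise.reject(new Error('record has no id'));
}

// Inserts records one at a time and keeps going when a single insert fails.
async function loadOneByOne(records, insert) {
  const result = { inserted: 0, failed: [] };
  for (const record of records) {
    try {
      await insert(record); // wait before sending the next request
      result.inserted += 1;
    } catch (error) {
      // Remember the failed record instead of stopping the loop.
      result.failed.push({ record, reason: error.message });
    }
  }
  return result;
}

loadOneByOne(
  [{ id: 1 }, { title: 'no id' }, { id: 3 }],
  insertRecordOrFail,
).then(result => console.log(result));
```

Whether to continue after a failed record or to stop at the first error depends on the destination; the sketch only shows the first option.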

In the following example, we are going to mock the insert behaviour of a database table. Since there is no database table present in this example, no file will be written, and there will just be an output when everything is done.

Add the insertRecord() function to load.js, which is going to represent inserting one record at a time.

function insertRecord(photo) {
  // mocked function
  // return a Promise that resolves when photo was inserted
  return Promise.resolve();
}

// export both load functions, so the bulk example keeps working
module.exports = { bulkLoadPhotoAlbum, insertRecord };

In the index.js file, we have to treat each transformed record individually by calling insertRecord() on the photoObject. We are going to iterate over each item in the array with reduce. When using reduce, we pass it a function to run on each iteration and an initial accumulator value, which here is an already resolved promise. Using reduce this way allows us to use async/await to sequentially chain any number of promises without having to hard-code each .then handler. Read more about reduce in the MDN docs.

const { getPhotos } = require('./extract');
const { addTimeStamp, transformPhoto } = require('./transform');
const { insertRecord } = require('./load');

const orchestrateEtlPipeline = async () => {
  try {
    // EXTRACT
    const photoAlbum1 = await getPhotos(1);

    // TRANSFORM
    let transformedPhotoAlbum1 = photoAlbum1.map(photo =>
      transformPhoto(photo),
    );

    transformedPhotoAlbum1 = addTimeStamp(transformedPhotoAlbum1);

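    // LOAD
    // Note: `.data` assumes that addTimeStamp() returns an object which
    // holds the records in a `data` property.
    await transformedPhotoAlbum1.data.reduce(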
      async (previousPromise, photo) => {
        await previousPromise;
        return insertRecord(photo);
      },
      Promise.resolve(),
    );
  } catch (error) {
    console.error(error);
  }
};

orchestrateEtlPipeline();
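
To see that the reduce chain really runs the inserts one after another, the pattern can be tried in isolation. The sketch below uses a hypothetical mock (createMockInsert) whose inserts finish faster for higher ids, and contrasts the reduce chain with Promise.all, which starts all inserts at once. All names in it are assumptions made for illustration.

```javascript
// Hypothetical mock: each insert resolves after a delay and records its id.
// Higher ids finish faster, so parallel execution reverses the order.
function createMockInsert() {
  const order = [];
  const insertRecord = photo =>
    new Promise(resolve => {
      setTimeout(() => {
        order.push(photo.id);
        resolve();
      }, 30 - photo.id * 10);
    });
  return { insertRecord, order };
}

// Same pattern as in the pipeline: each step waits for the previous promise.
async function insertSequentially(photos, insertRecord) {
  await photos.reduce(async (previousPromise, photo) => {
    await previousPromise; // previous insert has finished
    return insertRecord(photo);
  }, Promise.resolve());
}

async function demo() {
  const photos = [{ id: 1 }, { id: 2 }, { id: 3 }];

  const sequential = createMockInsert();
  await insertSequentially(photos, sequential.insertRecord);

  // For contrast: all inserts are started at the same time.
  const parallel = createMockInsert();
  await Promise.all(photos.map(photo => parallel.insertRecord(photo)));

  return { sequential: sequential.order, parallel: parallel.order };
}

demo().then(console.log);
```

The sequential run should record the ids in their original order, while the parallel run records the fastest insert first.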

TL;DR

  • The third phase in an ETL pipeline is to load the transformed data into its destination.
  • The destination can be a database, a flat file, a data warehouse or anything else where data can be stored.
  • The destination system will determine what options for loading are available - bulk or inserting each record individually.
  • Consider the limitations of each loading method and choose whichever method makes the most sense based on those limitations.

Thanks for reading and if you have any questions, use the comment function or send me a message @mariokandut.

If you want to know more about Node, have a look at these Node Tutorials.

References (and Big thanks):

HeyNode, OsioLabs, MDN async/await, MDN reduce
