Introduction

This article explores two different approaches to implementing concurrency in Node.js: (1) asynchronous functions and (2) worker threads. Node.js has strong support for both, but each is best suited to a different kind of workload.

As you read the rest of this article, you may want to keep the following questions in mind:

  • Are we achieving concurrency? That is, are separate sections of our code running in parallel with each other? Or are multiple instances of the same code running in parallel?

  • Are we actually gaining some benefit, such as elapsed time saved?

  • How can we show and measure that benefit, for example, the amount of elapsed time saved by our concurrency?

  • If we have two code samples (for example, copied from somewhere on the Web), how do we determine which is better suited to our particular use case?

See the "Analysis" section at the end of this document for some suggestions related to these questions.

Asynchronous functions

For information on async functions, await, promises, etc., see:

Consider using these when your application makes heavy use of I/O, such as file, network, or socket access. Applications that read files, make HTTP requests, or run database queries are all obvious candidates.

The Node.js standard library contains many asynchronous functions (see the Node.js API documentation). Look for functions that take a callback as a parameter, or that return a promise, such as those in the node:fs/promises module.

Here is a simple example:

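import https from 'node:https';

let count = 0;  // number of responses completed so far

https.get('https://example.com', (res) => {
  let data = '';
  // Collect data chunks as they arrive
  res.on('data', (chunk) => {
    data += chunk;
  });
  // Process the complete response
  res.on('end', () => {
    count++;
    console.log(`count: ${count}\n`, data);
  });
}).on('error', (err) => {
  // Called if the request itself fails (DNS error, refused connection, etc.)
  console.error('Error:', err.message);
});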

Notice the expressions containing =>. Each one is an arrow function, a more compact way of writing an anonymous function. You can read about arrow functions here: Arrow function expressions.

The first one, (res) => {...}, is passed as a callback to https.get. Node.js calls it when the response starts to arrive and passes in the response object res. Inside it, we use res.on to register two more callbacks: one that is called for each chunk of data ('data') and one that is called when the response is complete ('end'). The callback registered with .on('error', ...) is called if the request itself fails.
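The callback style above can be bridged to the promise-based style covered in the next section. Below is a minimal sketch that wraps http.get/https.get in a Promise, using a hypothetical helper named getText (it is not a Node.js API). It is written as CommonJS (require) so that it runs as a standalone .js file. util.promisify does not fit here, because the https.get callback receives the response object rather than the (err, value) pair that promisify expects. Note also that this sketch does not check res.statusCode, so a 404 page comes back as ordinary text.

```javascript
// A minimal sketch: wrap the callback-based get() API in a Promise.
// getText is a hypothetical helper, not part of Node.js.
const http = require('node:http');
const https = require('node:https');

function getText(url) {
  // Choose the client module that matches the URL's protocol.
  const client = url.startsWith('https:') ? https : http;
  return new Promise((resolve, reject) => {
    client.get(url, (res) => {
      let data = '';
      res.setEncoding('utf8');
      // Collect the body one chunk at a time.
      res.on('data', (chunk) => {
        data += chunk;
      });
      // Fulfill the promise once the whole body has arrived.
      res.on('end', () => resolve(data));
      res.on('error', reject);
    }).on('error', reject); // DNS failure, refused connection, etc.
  });
}
```

Inside an async function you could then write const html = await getText('https://example.com');.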

Promises

An example:

const url = 'http://www.example.com';
const text = await (await fetch(url)).text();

Why might you do this? Consider the following scenario:

You want to request a set of Web pages and then apply some process to each of them. Furthermore, processing any one of those pages does not depend on the processing of any other. So you would like to make all the requests at once and, as each Web page is received, begin processing it, rather than waiting for one request to finish before starting the next.

The following example is a bit more complex. It takes a first step in that direction: it starts all of the requests at once and then collects the results.

const log = console.log;
let count = 0;
const separator = '-'.repeat(50);
const urls = [
  'http://www.example.com',
  'https://www.nodejs.org',
  'https://www.npmjs.com',
  'http://www.reifywork.com',
];

function logSeparator() {
  count++;
  console.log(`${count} ${separator}`);
}

async function test02() {
  logSeparator();
  const promises = [];                      // Step 1
  for (const url of urls) {
    promises.push(fetch(url));              // each request starts right away
  }
  logSeparator();
  const responses = [];                     // Step 2
  for (const promise of promises) {
    responses.push(await promise);
  }
  logSeparator();
  const texts = [];                         // Step 3
  for (const response of responses) {
    texts.push(await response.text());
  }
  texts.forEach((text, idx) => {            // Step 4
    log(`${idx}. url: ${urls[idx]}  length: ${text.length}`);
  });
  logSeparator();
}

// Top-level await: run this file as an ES module (.mjs or "type": "module").
await test02();

Let's unroll this so that it makes more sense. Keep in mind what await is:

The await operator is used to wait for a Promise and get its
fulfillment value. It can only be used inside an async function or
at the top level of a module.
[https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/await]
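
  1. Step 1. We take an array of URLs and use the fetch function (a global in Node.js 18 and later) to produce an array of promises. Each call to fetch starts its request immediately, so all of the requests are now in flight at the same time.

  2. Step 2. We take that array of promises and use the await operator to produce an array of responses. We wait for the responses in order. However, all the requests were started in Step 1, so the total wait is roughly that of the slowest request rather than the sum of all of them.

  3. Step 3. We take the array of responses and use the await operator to produce an array of texts (strings).

  4. Step 4. Finally, we print the length and corresponding URL for each of those texts.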
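The Introduction asked how we can measure the time saved. One approach is sketched below. It assumes each request takes about 100 ms and replaces fetch with a hypothetical timer-based stand-in (fakeFetch), so it runs without a network. It then compares a one-at-a-time loop against the start-everything-first pattern of Steps 1 and 2. The names fakeFetch, sequential, concurrent, timeIt, and compare are illustrative only. performance.now() is a global in current Node.js versions.

```javascript
// Hypothetical stand-in for fetch(): fulfills with `value` after `ms` milliseconds.
function fakeFetch(value, ms) {
  return new Promise((resolve) => setTimeout(() => resolve(value), ms));
}

const jobs = [['a', 100], ['b', 100], ['c', 100], ['d', 100]];

// One request at a time: each starts only after the previous one has finished.
async function sequential() {
  const results = [];
  for (const [value, ms] of jobs) {
    results.push(await fakeFetch(value, ms));
  }
  return results;
}

// As in Steps 1 and 2: start every request first, then await them in order.
async function concurrent() {
  const promises = jobs.map(([value, ms]) => fakeFetch(value, ms));
  const results = [];
  for (const promise of promises) {
    results.push(await promise);
  }
  return results;
}

// Run an async function and report how long it took in wall-clock time.
async function timeIt(label, fn) {
  const start = performance.now();
  const results = await fn();
  const elapsed = performance.now() - start;
  console.log(`${label}: ${elapsed.toFixed(0)} ms`, results);
  return { results, elapsed };
}

async function compare() {
  const seq = await timeIt('sequential', sequential);
  const conc = await timeIt('concurrent', concurrent);
  return { seq, conc };
}
```

Calling compare() should show the concurrent version finishing in roughly the time of one request rather than the sum of all four, with both versions returning the results in the same order. With real network requests the gap depends on the servers and the connection, so it is worth measuring with your own URLs.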

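Now let's look at a version of the above that does much the same thing but uses two standard JavaScript idioms (Array.map and Promise.all) that are more concise. It may also run a bit faster, because Step 3 reads all of the response bodies concurrently instead of one after another. It is also more robust. In test02, a later request might fail while we are still awaiting an earlier one. That rejection has no handler yet, so Node.js reports it as an unhandled rejection. Promise.all avoids this by attaching a handler to every promise immediately.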

async function test03() {
  logSeparator();
  // Step 1: start every request at once
  const promises = urls.map((url) => fetch(url));
  logSeparator();
  // Step 2: wait for all of the responses
  const responses = await Promise.all(promises);
  logSeparator();
  // Step 3: read all of the bodies concurrently
  const texts = await Promise.all(responses.map((response) => response.text()));
  logSeparator();
  // Step 4
  texts.forEach((text, idx) => {
    log(`${idx}. url: ${urls[idx]}  length: ${text.length}`);
  });
  logSeparator();
  return texts;
}

Notes:

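  • We use Array.prototype.map to transform each element of an array into the kind of value we need: URLs into fetch promises in Step 1, and responses into text promises in Step 3.

  • We use await Promise.all to wait for a whole array of promises at once. Promise.all fulfills with an array of results in the same order as its input, or rejects as soon as any one of the promises rejects.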
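Neither test02 nor test03 quite matches the scenario described earlier, in which each page is processed as soon as it arrives: both wait for every body before Step 4 begins. Below is a minimal sketch of the per-page approach. It uses a hypothetical timer-based stand-in (fakeFetchText) and a hypothetical processing step (processPage) in place of real requests. Each page gets its own small async function, and Promise.all is used only to wait for the combined results.

```javascript
// Hypothetical stand-in for fetching one page: fulfills with `text` after `ms` ms.
function fakeFetchText(text, ms) {
  return new Promise((resolve) => setTimeout(() => resolve(text), ms));
}

// Hypothetical per-page processing step.
function processPage(text) {
  return text.toUpperCase();
}

// Records the order in which pages finish processing.
const completionOrder = [];

// Each page gets its own async pipeline (fetch, then process), so a fast page
// is processed without waiting for a slow one. Promise.all still returns the
// results in input order.
function fetchAndProcessAll(pages) {
  return Promise.all(pages.map(async ({ text, ms }) => {
    const body = await fakeFetchText(text, ms);
    const result = processPage(body);
    completionOrder.push(result);
    return result;
  }));
}

const pages = [
  { text: 'slow', ms: 150 },
  { text: 'fast', ms: 10 },
  { text: 'medium', ms: 60 },
];
```

Here completionOrder ends up fast, medium, slow, while the array returned by fetchAndProcessAll keeps the input order. With real requests, the mapping function might be async (url) => processPage(await (await fetch(url)).text()).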

worker_threads

Here is an example of an attempt to achieve some concurrency and parallelism through the use of worker threads (using the worker_threads module).

Some help and information about worker_threads is here:

The complete example script, discussed below, is here: test_worker05.js

Here is our test source code.

// File: test_worker05.js

// Note 1
const log = console.log;
import { Command } from 'commander';
import babel from '@babel/parser';
import util from 'node:util';
import fs from 'node:fs';
import { fileURLToPath } from "url";
const __filename = fileURLToPath(import.meta.url);

let count = -100;

// Note 2
import {
  Worker,
  isMainThread,
  parentPort,
  workerData,
} from 'node:worker_threads';

// Note 3
function message(msg) {
  count++;
  log(`${count}. ${msg}`);
}

// Note 4
async function mainThreadFn() {
  count = 0;
  message('starting mainthread');
  const program = new Command();
  program
    .requiredOption('-i, --injsfile <value>', 'input JavaScript file name')
    .option('-w, --workers <number>', 'number of worker threads', 4)
    .option('-r, --repetitions <number>', 'number of repetitions', 1)
    .option('-d, --depth <value>', 'depth for struct from inspect (N or Infinity)', 0)
    .option('-s, --silent', 'do not write messages', false)
  ;
  program.parse();
  const opts = program.opts();
  message(`opts: ${util.inspect(opts)}`);
  // Values given on the command line arrive as strings; the defaults are numbers.
  message(`typeof(workers): ${typeof(opts.workers)}`);
  message(`typeof(repetitions): ${typeof(opts.repetitions)}`);
  const workerThreads = [];
  const jsScript = fs.readFileSync(opts.injsfile, {encoding: 'utf-8'});
  globalThis.parseJSAsync = function (script) {
    return new Promise((resolve, reject) => {
      // Each worker runs this same file; isMainThread is false inside it.
      for (let idx = 0; idx < opts.workers; idx++) {
        workerThreads.push(new Worker(__filename, {
          workerData: {
            idx: idx,
            script: script,
            repetitions: opts.repetitions,
            depth: opts.depth,
          },
        }));
      }
      let running = workerThreads.length;
      workerThreads.forEach((worker, idx) => {
        worker.on('message', (msg) => { message(`msg ${idx}: "${msg}"`); });
        worker.on('error', reject);
        worker.on('exit', (code) => {
          if (code !== 0) {
            reject(new Error(`Worker stopped with exit code ${code}`));
          }
          // Resolve only after every worker has exited.
          running--;
          if (running === 0) {
            resolve();
          }
        });
      }); // end forEach
    });
  };
  // Wait for the workers, so "finished mainthread" really comes last.
  await globalThis.parseJSAsync(jsScript);
  message('finished mainthread');
}

// Note 5
async function workerThreadFn() {
  const idx = workerData.idx;
  let dateTime = new Date();
  // toISOString() gives a full timestamp (UTC, millisecond resolution)
  message(`starting workerthread -- idx: ${idx} -- ` +
    `time: ${dateTime.toISOString()}`);
  const script = workerData.script;
  message(`-- script: --\n${script}\n-- end script --`);
  for (let repIdx = 0; repIdx < workerData.repetitions; repIdx++) {
    const tree = babel.parse(script, {sourceType: 'module'});
    const txtTree = util.inspect(tree, {depth: workerData.depth});
    // Note 6
    const msg = `reply from worker ${idx} --\n${txtTree}\n-- end reply ${idx} --`;
    parentPort.postMessage(msg);
  }
  dateTime = new Date();
  message(`finished workerthread -- idx: ${idx} -- ` +
    `time: ${dateTime.toISOString()}`);
}

// Note 7
async function main() {
  let dateTime = new Date();
  message(`starting function main -- ` +
    `time: ${dateTime.toISOString()}`);
  if (isMainThread) {
    await mainThreadFn();
  } else {
    await workerThreadFn();
  }
  dateTime = new Date();
  message(`finished function main -- ` +
    `time: ${dateTime.toISOString()}`);
}

await main();

Notes and explanation (see the "// Note n" comments in the code above):

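  • Note 1. We import some needed modules. This is an ES module, which has no built-in __filename, so we also compute __filename from import.meta.url. Later we pass this file's path to new Worker.

  • Note 2. We import the needed bits from the Node.js built-in module worker_threads.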

  • Note 3. The function message is a helper function that we use to print status information as the test proceeds.

  • Note 4. The mainThreadFn function is called when we are executing as the main thread. It reads the command-line options and the input script. It then creates an array (workerThreads) of new workers, each of which runs this same file (__filename). Finally, it does a bit of setup for each of those workers, specifying what is to be done when the worker posts a message, reports an error, or exits.

  • Note 5. The workerThreadFn function is called when we are executing as a worker thread. It contains the workload for each worker. In our demo, it uses @babel/parser to parse the JavaScript source passed in through workerData. It then converts the resulting syntax tree to text (using util.inspect) and posts that text to the main thread, once per repetition.

  • Note 6. parentPort.postMessage sends a message from the worker back to the main thread. There, it is delivered to the handler registered with worker.on('message', ...) in mainThreadFn.

  • Note 7. The main function checks isMainThread. It calls mainThreadFn when we are executing as the main thread and workerThreadFn when we are executing as a worker.

Additional notes:

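  • If you need a way to uniquely identify each of your individual worker threads, you can use worker.threadId in the main thread. Inside a worker, the same value is available as threadId, exported by node:worker_threads.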

Analysis

Here are some suggestions (your mileage may vary):

  • Use async functions when your code is I/O intensive or is making requests across the network. The idea is that while your process is waiting for that slow access, it can be doing other work.

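  • Use worker threads when your code is compute intensive (CPU-bound), as in the parsing example above. An async function still runs its JavaScript on the single main thread, so it cannot spread computation across CPU cores; worker threads can.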
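The reasoning behind the second suggestion can be checked directly. In this sketch, a hypothetical busy-wait function (busyWork) is called from an async function while a 10 ms timer is pending. The async keyword does not move work to another thread, so the timer cannot fire until the busy-wait finishes, roughly 200 ms later.

```javascript
// Hypothetical CPU-bound task: keep the CPU busy for `ms` milliseconds.
function busyWork(ms) {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // spin
  }
}

// Declaring a function async does not move its work off the main thread.
async function cpuTask(ms) {
  busyWork(ms);
}

// Schedule a 10 ms timer, start a 200 ms CPU-bound async task, and report
// how late the timer actually fired.
function measureTimerDelay() {
  return new Promise((resolve) => {
    const start = Date.now();
    setTimeout(() => resolve(Date.now() - start), 10);
    cpuTask(200); // runs to completion before the timer can fire
  });
}
```

Calling measureTimerDelay() should fulfill with a delay of at least about 200 ms, not 10 ms. Running the same busy-wait in a worker thread instead would leave the main thread free to handle the timer on time.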

With respect to asynchronous programming, in addition to the links listed near the top, I found hints here:

  • How do I await multiple promises in-parallel without 'fail-fast' behavior? (Stack Overflow) [https://stackoverflow.com/questions/41292316/how-do-i-await-multiple-promises-in-parallel-without-fail-fast-behavior]

  • Node.js Async/Await at W3 Schools [https://www.w3schools.com/nodejs/nodejs_async_await.asp]
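The first link above concerns the fail-fast behavior of Promise.all. The sketch below contrasts it with Promise.allSettled, which is built into current Node.js versions. It uses hypothetical timer-based stand-ins (succeed and failAfter) in place of real requests.

```javascript
// Hypothetical request stand-ins: one fulfills after 50 ms, one rejects after 10 ms.
const succeed = (value, ms) =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));
const failAfter = (message, ms) =>
  new Promise((_, reject) => setTimeout(() => reject(new Error(message)), ms));

async function compareAllAndAllSettled() {
  // Promise.all is "fail-fast": it rejects as soon as any input rejects,
  // so the successful result never reaches the caller.
  let allError = null;
  try {
    await Promise.all([succeed('page A', 50), failAfter('page B failed', 10)]);
  } catch (err) {
    allError = err.message;
  }

  // Promise.allSettled waits for every input and reports each outcome.
  const outcomes = await Promise.allSettled([
    succeed('page A', 50),
    failAfter('page B failed', 10),
  ]);
  const summary = outcomes.map((o) =>
    o.status === 'fulfilled' ? o.value : `error: ${o.reason.message}`);
  return { allError, summary };
}
```

Promise.allSettled is a reasonable choice when, as in the Web-page scenario earlier, one failed request should not prevent you from using the others.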


Category: Nodejs