How to implement concurrency control in JavaScript

How to implement concurrency control in JavaScript

1. Introduction to Concurrency Control

Suppose there are 6 pending tasks to be executed, and we want to limit the number of tasks that can be executed simultaneously, that is, at most 2 tasks can be executed simultaneously. When any task in the executing task list is completed, the program will automatically obtain a new to-do task from the to-do task list and add the task to the executing task list. In order to make everyone understand the above process more intuitively, Brother Abao specially drew the following 3 pictures:

1.1 Phase 1

1.2 Phase 2

1.3 Phase 3

Okay, after introducing concurrency control, I will use the async-pool library on Github to introduce the specific implementation of asynchronous task concurrency control.

https://github.com/rxaviers/async-pool

Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7.

2. Implementation of Concurrency Control

The async-pool library provides two different versions of implementation: ES7 and ES6. Before analyzing its specific implementation, let's take a look at how to use it.

2.1 Use of asyncPool

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);

In the above code, we use the asyncPool function provided by the async-pool library to implement concurrent control of asynchronous tasks. The signature of the asyncPool function is as follows:

function asyncPool(poolLimit, array, iteratorFn){ ... }

This function receives 3 parameters:

  • poolLimit (number type): indicates the number of concurrent connections to be limited;
  • array (array type): represents a task array;
  • iteratorFn (function type): represents the iteration function, which is used to process each task item. The function returns a Promise object or an asynchronous function.

For the above example, after using the asyncPool function, the corresponding execution process is as follows:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.

By observing the above comments, we can roughly understand the control flow inside the asyncPool function. Next, let's analyze the ES7 implementation of the asyncPool function.

2.2 asyncPool ES7 Implementation

async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = []; // Store all asynchronous tasks const executing = []; // Store the asynchronous tasks being executed for (const item of array) {
    // Call iteratorFn function to create asynchronous task const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p); // Save new asynchronous tasks // When the poolLimit value is less than or equal to the total number of tasks, perform concurrency control if (poolLimit <= array.length) {
      // When the task is completed, remove the completed task from the array of executing tasks const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e); // Save the executing asynchronous task if (executing.length >= poolLimit) {
        await Promise.race(executing); // Wait for the faster task to complete }
    }
  }
  return Promise.all(ret);
}

In the above code, the features of Promise.all and Promise.race functions are fully utilized, combined with the async await feature provided in ES7, and finally the concurrency control function is realized. Using the await Promise.race(executing); statement, we will wait for the faster tasks in the list of executing tasks to complete before continuing to the next loop.

The asyncPool ES7 implementation is relatively simple. Next, let’s see how to achieve the same functionality without using the async await feature.

2.3 asyncPool ES6 Implementation

function asyncPool(poolLimit, array, iteratorFn) {
  let i = 0;
  const ret = []; // Store all asynchronous tasks const executing = []; // Store asynchronous tasks being executed const enqueue = function () {
    if (i === array.length) {
      return Promise.resolve();
    }
    const item = array[i++]; // Get a new task item const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    let r = Promise.resolve();

    // When the poolLimit value is less than or equal to the total number of tasks, perform concurrency control if (poolLimit <= array.length) {
      // When the task is completed, remove the completed task from the array of executing tasks const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        r = Promise.race(executing);
       }
    }

     // After the faster tasks in the executing task list are completed, new tasks will be obtained from the array array return r.then(() => enqueue());
  };
  return enqueue().then(() => Promise.all(ret));
}

In the ES6 implementation, the core control logic is implemented through the internal enqueue function. When the Promise object returned by Promise.race(executing) becomes completed, the enqueue function is called to get the new to-do task from the array.

3. Brother Abao has something to say

In the ES7 and ES6 implementations of the asyncPool library, we used the Promise.all and Promise.race functions. Among them, handwriting Promise.all is a common interview question. Just taking this opportunity, Brother Abao will write a simple version of Promise.all and Promise.race functions with everyone.

3.1 Handwritten Promise.all

The Promise.all(iterable) method returns a promise object. When the status of all the input promise objects becomes resolved, the returned promise object will return the results of each promise object after resolution in the form of an array. When the status of any input promise object becomes rejected, the returned promise object will reject the corresponding error message.

Promise.all = function (iterators) {
  return new Promise((resolve, reject) => {
    if (!iterators || iterators.length === 0) {
      resolve([]);
    } else {
      let count = 0; // Counter, used to determine whether all tasks have been completed let result = []; // Result array for (let i = 0; i < iterators.length; i++) {
        // Considering that iterators[i] may be a normal object, it is packaged as a Promise object Promise.resolve(iterators[i]).then(
          (data) => {
            result[i] = data; // Save the corresponding results in order // When all tasks are completed, return the results uniformly if (++count === iterators.length) {
              resolve(result);
            }
          },
          (err) => {
            reject(err); // If any Promise object fails to execute, the reject() method is called return;
          }
        );
      }
    }
  });
};

It should be noted that for the standard implementation of Promise.all, its parameter is an iterable object, such as Array, String or Set.

3.2 Handwritten Promise.race

The Promise.race(iterable) method returns a promise object. Once a promise object in the iterator is resolved or rejected, the returned promise object will resolve or reject the corresponding value.

Promise.race = function (iterators) {
  return new Promise((resolve, reject) => {
    for (const iter of iterators) {
      Promise.resolve(iter)
        .then((res) => {
          resolve(res);
        })
        .catch((e) => {
          reject(e);
        });
    }
  });
};

In this article, Abao Ge analyzes in detail the specific implementation of async-pool asynchronous task concurrency control, and at the same time enables everyone to better understand the core code of async-pool. Finally, Brother Abao also led everyone to write a simple version of the Promise.all and Promise.race functions. In fact, in addition to the Promise.all function, there is another function - Promise.allSettled, which is used to solve the problems of Promise.all. Interested friends can study it by themselves.

IV. Reference Resources

Github - async-pool
MDN - Promise.all
MDN - Promise.race
MDN - Promise.allSettled

The above is the details of how to implement concurrency control in JavaScript. For more information about JavaScript concurrency control, please pay attention to other related articles on 123WORDPRESS.COM!

You may also be interested in:
  • How to use Promise in JavaScript to control the number of concurrent requests
  • ByteDance interview: How to use JS to implement Ajax concurrent request control
  • Example code for implementing concurrent request control in JavaScript/TypeScript
  • Example code for implementing concurrency control using JavaScript
  • Example of method for controlling concurrent number of js asynchronous interfaces
  • Nodejs crawler advanced tutorial asynchronous concurrency control
  • Nodejs practical experience: eventproxy module to control concurrency

<<:  MySQL FAQ series: How to avoid a sudden increase in the size of the ibdata1 file

>>:  How to use limit_req_zone in Nginx to limit the access to the same IP

Recommend

js realizes the function of clicking to switch cards

This article example shares the specific code of ...

Nginx stream configuration proxy (Nginx TCP/UDP load balancing)

Prelude We all know that nginx is an excellent re...

How to implement two-way binding function in vue.js with pure JS

Table of contents First, let's talk about the...

Summary of Mysql update multi-table joint update method

Next, I will create two tables and execute a seri...

Use Vue3 for data binding and display list data

Table of contents 1. Comparison with Vue2 1. New ...

TimePicker in element disables part of the time (disabled to minutes)

The project requirements are: select date and tim...

Centos7.5 installs mysql5.7.24 binary package deployment

1. Environmental preparation: Operating system: C...

What is the file mysql-bin.000001 in mysql? Can it be deleted?

After installing MySQL using ports, I found that ...

Use and optimization of MySQL COUNT function

Table of contents What does the COUNT function do...

Analysis of Context application scenarios in React

Context definition and purpose Context provides a...

Why are the pictures on mobile web apps not clear and very blurry?

Why? The simplest way to put it is that pixels are...

A brief discussion on whether too many MySQL data queries will cause OOM

Table of contents Impact of full table scan on th...

The benefits of div+css and web standard pages

The div element is used to provide structure and b...

Solution to the problem that input in form cannot be submitted when disabled

I wrote a test program before, in which adding and...

How to create a swap partition file in Linux

Introduction to Swap Swap (i.e. swap partition) i...