How to implement concurrency control in JavaScript

How to implement concurrency control in JavaScript

1. Introduction to Concurrency Control

Suppose there are 6 pending tasks to be executed, and we want to limit the number of tasks that can be executed simultaneously, that is, at most 2 tasks can be executed simultaneously. When any task in the executing task list is completed, the program will automatically obtain a new to-do task from the to-do task list and add the task to the executing task list. In order to make everyone understand the above process more intuitively, Brother Abao specially drew the following 3 pictures:

1.1 Phase 1

1.2 Phase 2

1.3 Phase 3

Okay, after introducing concurrency control, I will use the async-pool library on Github to introduce the specific implementation of asynchronous task concurrency control.

https://github.com/rxaviers/async-pool

Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7.

2. Implementation of Concurrency Control

The async-pool library provides two different versions of implementation: ES7 and ES6. Before analyzing its specific implementation, let's take a look at how to use it.

2.1 Use of asyncPool

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);

In the above code, we use the asyncPool function provided by the async-pool library to implement concurrent control of asynchronous tasks. The signature of the asyncPool function is as follows:

function asyncPool(poolLimit, array, iteratorFn){ ... }

This function receives 3 parameters:

  • poolLimit (number type): indicates the number of concurrent connections to be limited;
  • array (array type): represents a task array;
  • iteratorFn (function type): represents the iteration function, which is used to process each task item. The function returns a Promise object or an asynchronous function.

For the above example, after using the asyncPool function, the corresponding execution process is as follows:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.

By observing the above comments, we can roughly understand the control flow inside the asyncPool function. Next, let's analyze the ES7 implementation of the asyncPool function.

2.2 asyncPool ES7 Implementation

async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = []; // Store all asynchronous tasks const executing = []; // Store the asynchronous tasks being executed for (const item of array) {
    // Call iteratorFn function to create asynchronous task const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p); // Save new asynchronous tasks // When the poolLimit value is less than or equal to the total number of tasks, perform concurrency control if (poolLimit <= array.length) {
      // When the task is completed, remove the completed task from the array of executing tasks const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e); // Save the executing asynchronous task if (executing.length >= poolLimit) {
        await Promise.race(executing); // Wait for the faster task to complete }
    }
  }
  return Promise.all(ret);
}

In the above code, the features of Promise.all and Promise.race functions are fully utilized, combined with the async await feature provided in ES7, and finally the concurrency control function is realized. Using the await Promise.race(executing); statement, we will wait for the faster tasks in the list of executing tasks to complete before continuing to the next loop.

The asyncPool ES7 implementation is relatively simple. Next, let’s see how to achieve the same functionality without using the async await feature.

2.3 asyncPool ES6 Implementation

function asyncPool(poolLimit, array, iteratorFn) {
  let i = 0;
  const ret = []; // Store all asynchronous tasks const executing = []; // Store asynchronous tasks being executed const enqueue = function () {
    if (i === array.length) {
      return Promise.resolve();
    }
    const item = array[i++]; // Get a new task item const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    let r = Promise.resolve();

    // When the poolLimit value is less than or equal to the total number of tasks, perform concurrency control if (poolLimit <= array.length) {
      // When the task is completed, remove the completed task from the array of executing tasks const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        r = Promise.race(executing);
       }
    }

     // After the faster tasks in the executing task list are completed, new tasks will be obtained from the array array return r.then(() => enqueue());
  };
  return enqueue().then(() => Promise.all(ret));
}

In the ES6 implementation, the core control logic is implemented through the internal enqueue function. When the Promise object returned by Promise.race(executing) becomes completed, the enqueue function is called to get the new to-do task from the array.

3. Brother Abao has something to say

In the ES7 and ES6 implementations of the asyncPool library, we used the Promise.all and Promise.race functions. Among them, handwriting Promise.all is a common interview question. Just taking this opportunity, Brother Abao will write a simple version of Promise.all and Promise.race functions with everyone.

3.1 Handwritten Promise.all

The Promise.all(iterable) method returns a promise object. When the status of all the input promise objects becomes resolved, the returned promise object will return the results of each promise object after resolution in the form of an array. When the status of any input promise object becomes rejected, the returned promise object will reject the corresponding error message.

Promise.all = function (iterators) {
  return new Promise((resolve, reject) => {
    if (!iterators || iterators.length === 0) {
      resolve([]);
    } else {
      let count = 0; // Counter, used to determine whether all tasks have been completed let result = []; // Result array for (let i = 0; i < iterators.length; i++) {
        // Considering that iterators[i] may be a normal object, it is packaged as a Promise object Promise.resolve(iterators[i]).then(
          (data) => {
            result[i] = data; // Save the corresponding results in order // When all tasks are completed, return the results uniformly if (++count === iterators.length) {
              resolve(result);
            }
          },
          (err) => {
            reject(err); // If any Promise object fails to execute, the reject() method is called return;
          }
        );
      }
    }
  });
};

It should be noted that for the standard implementation of Promise.all, its parameter is an iterable object, such as Array, String or Set.

3.2 Handwritten Promise.race

The Promise.race(iterable) method returns a promise object. Once a promise object in the iterator is resolved or rejected, the returned promise object will resolve or reject the corresponding value.

Promise.race = function (iterators) {
  return new Promise((resolve, reject) => {
    for (const iter of iterators) {
      Promise.resolve(iter)
        .then((res) => {
          resolve(res);
        })
        .catch((e) => {
          reject(e);
        });
    }
  });
};

In this article, Abao Ge analyzes in detail the specific implementation of async-pool asynchronous task concurrency control, and at the same time enables everyone to better understand the core code of async-pool. Finally, Brother Abao also led everyone to write a simple version of the Promise.all and Promise.race functions. In fact, in addition to the Promise.all function, there is another function - Promise.allSettled, which is used to solve the problems of Promise.all. Interested friends can study it by themselves.

IV. Reference Resources

Github - async-pool
MDN - Promise.all
MDN - Promise.race
MDN - Promise.allSettled

The above is the details of how to implement concurrency control in JavaScript. For more information about JavaScript concurrency control, please pay attention to other related articles on 123WORDPRESS.COM!

You may also be interested in:
  • How to use Promise in JavaScript to control the number of concurrent requests
  • ByteDance interview: How to use JS to implement Ajax concurrent request control
  • Example code for implementing concurrent request control in JavaScript/TypeScript
  • Example code for implementing concurrency control using JavaScript
  • Example of method for controlling concurrent number of js asynchronous interfaces
  • Nodejs crawler advanced tutorial asynchronous concurrency control
  • Nodejs practical experience: eventproxy module to control concurrency

<<:  MySQL FAQ series: How to avoid a sudden increase in the size of the ibdata1 file

>>:  How to use limit_req_zone in Nginx to limit the access to the same IP

Recommend

Problem record of using vue+echarts chart

Preface echarts is my most commonly used charting...

Detailed explanation of the use of MySQL Online DDL

Table of contents text LOCK parameter ALGORITHM p...

Detailed explanation of tinyMCE usage and experience

Detailed explanation of tinyMCE usage initializat...

Analyzing the MySql CURRENT_TIMESTAMP function by example

When creating a time field DEFAULT CURRENT_TIMEST...

How to use Linux to calculate the disk space occupied by timed files

Open the scheduled task editor. Cent uses vim to ...

Create a screen recording function with JS

OBS studio is cool, but JavaScript is cooler. Now...

Several implementation methods of the tab bar (recommended)

Tabs: Category + Description Tag bar: Category =&...

Swiper.js plugin makes it super easy to implement carousel images

Swiper is a sliding special effects plug-in built...

MySQL index optimization: paging exploration detailed introduction

Table of contents MySQL Index Optimization Paging...

How to use dl(dt,dd), ul(li), ol(li) in HTML

HTML <dl> Tag #Definition and Usage The <...

How to use JS to check if an element is within the viewport

Preface Share two methods to monitor whether an e...

Mysql example of splitting into multiple rows and columns by specific symbols

Some fault code tables use the following design p...