1. Introduction to Concurrency Control

Suppose there are 6 pending tasks and we want to limit how many of them run at the same time, so that at most 2 tasks execute simultaneously. Whenever a task in the executing list completes, the program takes the next task from the to-do list and adds it to the executing list. To make this process more intuitive, Brother Abao drew the following 3 pictures:

1.1 Phase 1

[Figure]

1.2 Phase 2

[Figure]

1.3 Phase 3

[Figure]

With concurrency control introduced, I will use the async-pool library on GitHub to walk through a concrete implementation of concurrency control for asynchronous tasks.

https://github.com/rxaviers/async-pool

Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7.

2. Implementation of Concurrency Control

The async-pool library provides two implementations: ES7 and ES6. Before analyzing them, let's look at how the library is used.

2.1 Use of asyncPool

    const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
    await asyncPool(2, [1000, 5000, 3000, 2000], timeout);

In the above code, we use the asyncPool function provided by the async-pool library to control the concurrency of asynchronous tasks. The signature of the asyncPool function is as follows:

    function asyncPool(poolLimit, array, iteratorFn){ ... }

This function receives 3 parameters:

- poolLimit: the maximum number of tasks that may run at the same time.
- array: the array of task items.
- iteratorFn: the function that processes each item. It is called with the item and the array, and returns a promise (or is an async function).
For the above example, the execution process with asyncPool is as follows:

    const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
    await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
    // Call iterator (i = 1000)
    // Call iterator (i = 5000)
    // Pool limit of 2 reached, wait for the quicker one to complete...
    // 1000 finishes
    // Call iterator (i = 3000)
    // Pool limit of 2 reached, wait for the quicker one to complete...
    // 3000 finishes
    // Call iterator (i = 2000)
    // Iteration is complete, wait until running ones complete...
    // 5000 finishes
    // 2000 finishes
    // Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.

These comments give a rough picture of the control flow inside the asyncPool function. Next, let's analyze its ES7 implementation.

2.2 asyncPool ES7 Implementation

    async function asyncPool(poolLimit, array, iteratorFn) {
      const ret = []; // Store all asynchronous tasks
      const executing = []; // Store the asynchronous tasks being executed
      for (const item of array) {
        // Call iteratorFn to create the asynchronous task
        const p = Promise.resolve().then(() => iteratorFn(item, array));
        ret.push(p); // Save the new asynchronous task

        // Concurrency control is only needed when poolLimit is less than
        // or equal to the total number of tasks
        if (poolLimit <= array.length) {
          // When the task completes, remove it from the array of executing tasks
          const e = p.then(() => executing.splice(executing.indexOf(e), 1));
          executing.push(e); // Save the executing asynchronous task
          if (executing.length >= poolLimit) {
            await Promise.race(executing); // Wait for the fastest task to complete
          }
        }
      }
      return Promise.all(ret);
    }

The code above relies on Promise.all and Promise.race, combined with async/await (standardized in ES2017; the library labels this version ES7), to implement concurrency control.
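As a rough check of the ES7 implementation above, the following sketch repeats that function and wraps it in a measurePeak helper that counts how many tasks are running at once. measurePeak is a hypothetical name introduced here for illustration and is not part of async-pool; the short delays are arbitrary values.

```javascript
// Same ES7 asyncPool as in section 2.2, repeated so the sketch runs on its own.
async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);
    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }
  return Promise.all(ret);
}

// Hypothetical helper (not part of async-pool): runs asyncPool and
// records the highest number of tasks that were running at the same time.
async function measurePeak(poolLimit, delays) {
  let running = 0;
  let peak = 0;
  const task = async (ms) => {
    running += 1;
    peak = Math.max(peak, running);
    await new Promise((resolve) => setTimeout(resolve, ms));
    running -= 1;
    return ms;
  };
  const results = await asyncPool(poolLimit, delays, task);
  return { results, peak };
}

measurePeak(2, [10, 50, 30, 20]).then(({ results, peak }) => {
  console.log(results, peak);
});
```

With a pool limit of 2, results should come back in input order and peak should be 2, which matches the control flow described in the comments of section 2.1.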
In the ES7 implementation, the await Promise.race(executing); statement waits for the fastest task in the executing list to complete before the loop moves on to the next item. The asyncPool ES7 implementation is relatively simple. Next, let's see how to achieve the same functionality without async/await.

2.3 asyncPool ES6 Implementation

    function asyncPool(poolLimit, array, iteratorFn) {
      let i = 0;
      const ret = []; // Store all asynchronous tasks
      const executing = []; // Store the asynchronous tasks being executed
      const enqueue = function () {
        if (i === array.length) {
          return Promise.resolve();
        }
        const item = array[i++]; // Get a new task item
        const p = Promise.resolve().then(() => iteratorFn(item, array));
        ret.push(p);

        let r = Promise.resolve();

        // Concurrency control is only needed when poolLimit is less than
        // or equal to the total number of tasks
        if (poolLimit <= array.length) {
          // When the task completes, remove it from the array of executing tasks
          const e = p.then(() => executing.splice(executing.indexOf(e), 1));
          executing.push(e);
          if (executing.length >= poolLimit) {
            r = Promise.race(executing);
          }
        }

        // Once the fastest executing task completes, take the next task from array
        return r.then(() => enqueue());
      };
      return enqueue().then(() => Promise.all(ret));
    }

In the ES6 implementation, the core control logic lives in the internal enqueue function. When the promise returned by Promise.race(executing) is fulfilled, enqueue is called again to take the next to-do task from array.

3. Brother Abao Has Something to Say

Both the ES7 and ES6 implementations in the async-pool library use the Promise.all and Promise.race functions. Implementing Promise.all by hand is a common interview question, so Brother Abao will take this opportunity to write simple versions of Promise.all and Promise.race with everyone.
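Before writing these two functions by hand, it may help to recall how the native versions behave. The sketch below uses two hypothetical helpers, delay and failAfter, which are introduced here only for illustration, and arbitrary short timings.

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = (ms, value) =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

// Hypothetical helper: rejects with an Error after `ms` milliseconds.
const failAfter = (ms, message) =>
  new Promise((_, reject) => setTimeout(() => reject(new Error(message)), ms));

async function compareAllAndRace() {
  // Promise.all keeps the input order, regardless of which promise settles first.
  const all = await Promise.all([delay(30, "slow"), delay(10, "fast")]);

  // Promise.race settles with whichever input settles first.
  const race = await Promise.race([delay(30, "slow"), delay(10, "fast")]);

  // Promise.all rejects as soon as any input rejects.
  const failure = await Promise.all([delay(30, "slow"), failAfter(10, "boom")])
    .catch((err) => err.message);

  return { all, race, failure };
}

compareAllAndRace().then((summary) => console.log(summary));
```

Here all is ["slow", "fast"], race is "fast", and failure is "boom". These are the behaviours the hand-written versions need to reproduce.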
3.1 Handwritten Promise.all

The Promise.all(iterable) method returns a promise. When all of the input promises are fulfilled, the returned promise is fulfilled with an array of their results, in input order. When any input promise is rejected, the returned promise is rejected with that promise's reason.

    Promise.all = function (iterators) {
      return new Promise((resolve, reject) => {
        if (!iterators || iterators.length === 0) {
          resolve([]);
        } else {
          let count = 0; // Counter, used to determine whether all tasks have completed
          let result = []; // Result array
          for (let i = 0; i < iterators.length; i++) {
            // iterators[i] may be a plain (non-promise) value, so wrap it in a promise
            Promise.resolve(iterators[i]).then(
              (data) => {
                result[i] = data; // Save the results in input order
                // When all tasks have completed, return the results together
                if (++count === iterators.length) {
                  resolve(result);
                }
              },
              (err) => {
                reject(err); // If any promise is rejected, reject the returned promise
                return;
              }
            );
          }
        }
      });
    };

Note that the standard Promise.all accepts any iterable, such as an Array, String or Set, whereas this simplified version relies on length and index access, so it only handles arrays and array-like inputs.

3.2 Handwritten Promise.race

The Promise.race(iterable) method returns a promise. As soon as one of the promises in the iterable is fulfilled or rejected, the returned promise is fulfilled or rejected with the same value or reason.

    Promise.race = function (iterators) {
      return new Promise((resolve, reject) => {
        for (const iter of iterators) {
          Promise.resolve(iter)
            .then((res) => {
              resolve(res);
            })
            .catch((e) => {
              reject(e);
            });
        }
      });
    };

In this article, Brother Abao analyzed in detail how async-pool implements concurrency control for asynchronous tasks, so that everyone can better understand its core code.
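Section 3.1 noted that the standard Promise.all accepts any iterable. One way the hand-written version could be adapted is to copy the input into an array with Array.from first. The following is a minimal sketch under that assumption; promiseAll is a hypothetical name used here so that the native Promise.all is left untouched.

```javascript
// Hypothetical name: promiseAll is a sketch, not a replacement for the native Promise.all.
function promiseAll(iterable) {
  return new Promise((resolve, reject) => {
    // Array.from accepts any iterable (Array, Set, String, generator, ...).
    // If the input is not iterable it throws, which rejects this promise.
    const items = Array.from(iterable);
    if (items.length === 0) {
      resolve([]);
      return;
    }
    const results = new Array(items.length);
    let fulfilled = 0; // Number of inputs that have been fulfilled so far
    items.forEach((item, index) => {
      // Wrap plain values so that they can be handled like promises.
      Promise.resolve(item).then((value) => {
        results[index] = value; // Keep results in input order
        fulfilled += 1;
        if (fulfilled === items.length) {
          resolve(results);
        }
      }, reject); // The first rejection rejects the returned promise
    });
  });
}

promiseAll(new Set([1, Promise.resolve(2), 3])).then((values) => {
  console.log(values);
});
```

The call above passes a Set, which the length-based version in section 3.1 would treat as empty, and resolves with [1, 2, 3].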
Finally, Brother Abao also led everyone through writing simple versions of the Promise.all and Promise.race functions. Besides Promise.all, there is another function, Promise.allSettled, which addresses a limitation of Promise.all: instead of rejecting on the first failure, it waits for every input promise to settle and reports the outcome of each one. Interested friends can study it on their own.

4. Reference Resources

GitHub - async-pool