To fetch a batch of independent resources, a common approach is to issue all the requests concurrently with Promise.all. For example, to sum the pv of a list of applications:

    const ids = [1001, 1002, 1003, 1004, 1005];
    const urlPrefix = 'http://opensearch.example.com/api/apps';

    // fetch stands for a function that sends an HTTP request and returns a
    // Promise of the application object (it is mocked later in this article)
    const appPromises = ids.map(id => fetch(`${urlPrefix}/${id}`));

    Promise.all(appPromises)
      // Accumulate through reduce
      .then(apps => apps.reduce((initial, current) => initial + current.pv, 0))
      .catch((error) => console.log(error));

This code works fine when there are only a few applications. Once the number reaches the tens of thousands, the burst of requests acts as an unintended "stress test": a third-party system that does not handle that much concurrency well may crash and temporarily stop responding:

    <html>
    <head><title>502 Bad Gateway</title></head>
    <body bgcolor="white">
    <center><h1>502 Bad Gateway</h1></center>
    <hr><center>nginx/1.10.1</center>
    </body>
    </html>

How can this be solved? A natural idea: since the server cannot take that many concurrent requests, split them into several chunks. Requests within a chunk are still sent concurrently, and the next chunk starts only after the previous one has completed. The difficulty lies in how to execute Promises serially.
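Before tackling serial execution, the splitting step itself can be sketched as follows. This is only an illustration: the helper name chunkIds is hypothetical, and it uses slice so that the input array is left untouched.

```javascript
// Hypothetical helper: split an array into consecutive chunks of at most `size` items.
function chunkIds(ids, size) {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks = [];
  for (let i = 0; i < ids.length; i += size) {
    // slice copies the range and does not modify `ids`
    chunks.push(ids.slice(i, i + size));
  }
  return chunks;
}

const idChunks = chunkIds([1001, 1002, 1003, 1004, 1005], 2);
console.log(idChunks); // [ [ 1001, 1002 ], [ 1003, 1004 ], [ 1005 ] ]
```

Each inner array can then be requested concurrently, which leaves only the question of running the chunks one after another.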
Promise only provides parallel execution (Promise.all); it has no built-in way to run tasks serially. Consider three simple tasks:

    // task1, task2, task3 are three factory functions that return a Promise,
    // simulating our asynchronous requests
    const task1 = () => new Promise((resolve) => {
      setTimeout(() => {
        resolve(1);
        console.log('task1 executed');
      }, 1000);
    });

    const task2 = () => new Promise((resolve) => {
      setTimeout(() => {
        resolve(2);
        console.log('task2 executed');
      }, 1000);
    });

    const task3 = () => new Promise((resolve) => {
      setTimeout(() => {
        resolve(3);
        console.log('task3 executed');
      }, 1000);
    });

    // Aggregated result
    let result = 0;

    const resultPromise = [task1, task2, task3].reduce((current, next) =>
      current.then((number) => {
        // Receives the initial value 0, then the results of task1 and task2
        console.log('resolved with number', number);
        result += number;
        return next();
      }),
      Promise.resolve(0)) // Initial value of the aggregation
      .then(function(last) {
        // The Promise of task3 is resolved here
        console.log('The last promise resolved with number', last);
        result += last;
        console.log('all executed with result', result);
        return result;
      });

The running result is shown in Figure 1.

Code analysis: the effect we want is for each task to start only after the previous one has resolved, and reduce builds exactly that chain of then calls. With that, the difficulty is solved.
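To make the analysis concrete, here is a small sketch of the same reduce technique packaged as a reusable helper, with the chain it builds written out in a comment. The names runSerially, makeTask and events are hypothetical and the delays are arbitrary; this is an illustration, not the code used elsewhere in this article.

```javascript
// Records the order in which tasks start and finish.
const events = [];

// Hypothetical factory builder: the returned function starts the task only when called.
const makeTask = (name, ms) => () => {
  events.push(`${name} start`);
  return new Promise((resolve) => {
    setTimeout(() => {
      events.push(`${name} end`);
      resolve(name);
    }, ms);
  });
};

// Hypothetical helper: call each factory only after the previous promise has resolved.
function runSerially(factories) {
  const results = [];
  return factories
    .reduce(
      (chain, factory) =>
        chain
          .then(() => factory())
          .then((value) => { results.push(value); }),
      Promise.resolve()
    )
    .then(() => results);
}

// For three factories a, b, c the reduce above builds this chain:
//   Promise.resolve()
//     .then(() => a()).then(save)
//     .then(() => b()).then(save)
//     .then(() => c()).then(save)
const serialRun = runSerially([makeTask('a', 30), makeTask('b', 10), makeTask('c', 20)]);

serialRun.then((results) => {
  console.log(results); // [ 'a', 'b', 'c' ]
  console.log(events);  // each task ends before the next one starts
});
```

Even though task b is faster than task a, it cannot finish first, because its factory is not called until a has resolved.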
Let's take a look at the final code:

    /**
     * Simulate an HTTP request
     * @param {String} url
     * @return {Promise}
     */
    function fetch(url) {
      console.log(`Fetching ${url}`);
      return new Promise((resolve) => {
        // The trailing digits of the URL are used as the mocked pv value
        setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }), 2000);
      });
    }

    const urlPrefix = 'http://opensearch.example.com/api/apps';

    const aggregator = {
      /**
       * Entry method, start the scheduled task
       *
       * @return {Promise}
       */
      start() {
        return this.fetchAppIds()
          .then(ids => this.fetchAppsSerially(ids, 2))
          .then(apps => this.sumPv(apps))
          .catch(error => console.error(error));
      },

      /**
       * Get all application IDs
       *
       * @private
       *
       * @return {Promise}
       */
      fetchAppIds() {
        return Promise.resolve([1001, 1002, 1003, 1004, 1005]);
      },

      /**
       * Build a factory that requests one chunk of IDs concurrently when called
       *
       * @private
       *
       * @param {number[]} ids
       * @return {Function}
       */
      promiseFactory(ids) {
        return () => Promise.all(ids.map(id => fetch(`${urlPrefix}/${id}`)));
      },

      /**
       * Get details of all apps
       *
       * A concurrent request for `concurrency` applications is called a chunk.
       * The next chunk starts only after the previous chunk has completed,
       * until all applications have been fetched.
       *
       * @private
       *
       * @param {number[]} ids
       * @param {number} concurrency The number of concurrent requests at a time
       * @return {Promise<Object[]>} Information about all applications
       */
      fetchAppsSerially(ids, concurrency = 100) {
        // Chunking (slice is used so that the caller's array is not modified)
        const tasks = [];
        for (let i = 0; i < ids.length; i += concurrency) {
          tasks.push(this.promiseFactory(ids.slice(i, i + concurrency)));
        }

        // Execute chunk by chunk, in order
        const result = [];
        return tasks.reduce((current, next) => current.then((chunkOfApps) => {
          console.info('Chunk of', chunkOfApps.length, 'concurrency requests has finished with result:', chunkOfApps, '\n\n');
          result.push(...chunkOfApps); // Flatten the array
          return next();
        }), Promise.resolve([]))
          .then((lastchunkOfApps) => {
            console.info('Chunk of', lastchunkOfApps.length, 'concurrency requests has finished with result:', lastchunkOfApps, '\n\n');
            result.push(...lastchunkOfApps); // Flatten it again
            console.info('All chunks has been executed with result', result);
            return result;
          });
      },

      /**
       * Aggregate the PV of all applications
       *
       * @private
       *
       * @param {Object[]} apps
       * @return {{ pv: number }} The total PV
       */
      sumPv(apps) {
        const initial = { pv: 0 };

        return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial);
      }
    };

    // Start running
    aggregator.start().then(console.log);

The running result is shown in Figure 2.

Abstraction and reuse

The goal has been achieved. Because the approach is general, we can abstract it into a reusable pattern.

Serial

First, simulate an HTTP GET request.

    /**
     * Mocked HTTP GET.
     * @param {string} url
     * @returns {Promise<{ url: string, delay: number, at: number }>}
     */
    function httpGet(url) {
      const delay = Math.random() * 1000;

      console.info('GET', url);

      return new Promise((resolve) => {
        setTimeout(() => {
          resolve({
            url,
            delay,
            at: Date.now()
          })
        }, delay);
      })
    }

Execute a batch of requests serially.

    const ids = [1, 2, 3, 4, 5, 6, 7];

    // Batch of request factories. Note that each entry must be a function that
    // defers the request; otherwise the requests would be sent immediately and
    // would not run serially.
    const httpGetters = ids.map(id =>
      () => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`)
    );

    // Serial execution. There is no await here: tasks must remain a Promise
    // so that .then can be called on it below.
    const tasks = httpGetters.reduce((acc, cur) => {
      return acc.then(cur);

      // Shorthand, equivalent here to
      // return acc.then(() => cur());
    }, Promise.resolve());

    tasks.then(() => {
      console.log('done');
    });

Watch the console output; the following lines should appear one after another:

    GET https://jsonplaceholder.typicode.com/posts/1
    GET https://jsonplaceholder.typicode.com/posts/2
    GET https://jsonplaceholder.typicode.com/posts/3
    GET https://jsonplaceholder.typicode.com/posts/4
    GET https://jsonplaceholder.typicode.com/posts/5
    GET https://jsonplaceholder.typicode.com/posts/6
    GET https://jsonplaceholder.typicode.com/posts/7

Serial across segments, parallel within a segment

Here comes the key part.
The request scheduler implementation in this article:

    /**
     * Schedule promises.
     * @param {Array<(...arg: any[]) => Promise<any>>} factories
     * @param {number} concurrency
     */
    function schedulePromises(factories, concurrency) {
      /**
       * Split an array into chunks of at most `size` items
       * @param {any[]} arr
       * @param {number} size
       * @returns {Array<any[]>}
       */
      const chunk = (arr, size = 1) => {
        return arr.reduce((acc, cur, idx) => {
          const modulo = idx % size;

          if (modulo === 0) {
            // Start a new chunk
            acc[acc.length] = [cur];
          } else {
            // Append to the current chunk
            acc[acc.length - 1].push(cur);
          }

          return acc;
        }, [])
      };

      const chunks = chunk(factories, concurrency);

      const resps = [];

      return chunks.reduce(
        (acc, cur) => {
          return acc
            .then(() => {
              console.log('---');
              // Requests within one chunk run in parallel
              return Promise.all(cur.map(f => f()));
            })
            .then((intermediateResponses) => {
              resps.push(...intermediateResponses);

              return resps;
            })
        },

        // Seeding with resps makes an empty factories array resolve to []
        Promise.resolve(resps)
      );
    }

To test it, run the scheduler:

    // Serial across segments, parallel within each segment
    schedulePromises(httpGetters, 3).then((resps) => {
      console.log('resps:', resps);
    });

Console output:

    ---
    GET https://jsonplaceholder.typicode.com/posts/1
    GET https://jsonplaceholder.typicode.com/posts/2
    GET https://jsonplaceholder.typicode.com/posts/3
    ---
    GET https://jsonplaceholder.typicode.com/posts/4
    GET https://jsonplaceholder.typicode.com/posts/5
    GET https://jsonplaceholder.typicode.com/posts/6
    ---
    GET https://jsonplaceholder.typicode.com/posts/7

    resps: [
      {
        "url": "https://jsonplaceholder.typicode.com/posts/1",
        "delay": 733.010980640727,
        "at": 1615131322163
      },
      {
        "url": "https://jsonplaceholder.typicode.com/posts/2",
        "delay": 594.5056229848931,
        "at": 1615131322024
      },
      {
        "url": "https://jsonplaceholder.typicode.com/posts/3",
        "delay": 738.8230109146299,
        "at": 1615131322168
      },
      {
        "url": "https://jsonplaceholder.typicode.com/posts/4",
        "delay": 525.4604386109747,
        "at": 1615131322698
      },
      {
        "url": "https://jsonplaceholder.typicode.com/posts/5",
        "delay": 29.086379722201183,
        "at": 1615131322201
      },
      {
        "url": "https://jsonplaceholder.typicode.com/posts/6",
        "delay": 592.2345027398272,
        "at": 1615131322765
      },
      {
        "url": "https://jsonplaceholder.typicode.com/posts/7",
        "delay": 513.0684467560949,
        "at": 1615131323284
      }
    ]

Summary
That covers the details of implementing a request scheduler in JavaScript.
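The same chunk-by-chunk schedule can also be expressed with async/await. The following is a minimal sketch under stated assumptions, not the implementation from this article: the names schedulePromisesAsync and fakeRequest are hypothetical, the timers stand in for real HTTP calls, and the environment is assumed to support async functions.

```javascript
// Hypothetical async/await variant: chunks run one after another,
// factories inside a chunk run in parallel.
async function schedulePromisesAsync(factories, concurrency) {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError('concurrency must be a positive integer');
  }
  const responses = [];
  for (let i = 0; i < factories.length; i += concurrency) {
    const chunk = factories.slice(i, i + concurrency);
    // Start every request of this chunk, then wait for all of them
    const chunkResponses = await Promise.all(chunk.map((factory) => factory()));
    responses.push(...chunkResponses);
  }
  return responses;
}

// Stand-in request factories (no real HTTP) that track how many are in flight.
let inFlight = 0;
let maxInFlight = 0;
const fakeRequest = (id) => () => {
  inFlight += 1;
  maxInFlight = Math.max(maxInFlight, inFlight);
  return new Promise((resolve) => {
    setTimeout(() => {
      inFlight -= 1;
      resolve(id);
    }, 10 + (id % 3) * 5);
  });
};

const scheduled = schedulePromisesAsync([1, 2, 3, 4, 5, 6, 7].map(fakeRequest), 3);

scheduled.then((ids) => {
  console.log(ids);         // [ 1, 2, 3, 4, 5, 6, 7 ]
  console.log(maxInFlight); // 3
});
```

Both versions wait for the slowest request of a chunk before starting the next chunk, so one slow request delays everything behind it; that is the price of this simple chunked design.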