To obtain a batch of independent resources, a common approach is to send all of the requests concurrently with Promise.all:

```javascript
const ids = [1001, 1002, 1003, 1004, 1005];
const urlPrefix = 'http://opensearch.example.com/api/apps';

// The fetch function sends an HTTP request and returns a Promise
// (assumed here to resolve directly to an app object with a `pv` field,
// like the mock used later in this article)
const appPromises = ids.map(id => fetch(`${urlPrefix}/${id}`));

Promise.all(appPromises)
  // Accumulate through reduce
  .then(apps => apps.reduce((initial, current) => initial + current.pv, 0))
  .catch((error) => console.log(error));
```

The code above works fine when there are only a few applications. When the number of applications reaches tens of thousands and the target system does not handle concurrent requests well, this unintended "stress test" can bring down the third-party server, leaving it temporarily unable to respond to requests:

```html
<html>
<head><title>502 Bad Gateway</title></head>
<body bgcolor="white">
<center><h1>502 Bad Gateway</h1></center>
<hr><center>nginx/1.10.1</center>
</body>
</html>
```

How can this be solved?

A natural idea: since the server cannot handle that many concurrent requests, split them into several chunks. The requests inside a chunk are sent concurrently, and the chunks are executed serially, one after another. The difficulty lies in how to execute Promises serially.
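The chunking half of this idea is the easy part. As a minimal sketch, assuming a hypothetical helper named `splitIntoChunks` (it does not appear in the original code), a list of IDs can be cut into groups of at most `size` items like this:

```javascript
// Hypothetical helper: split `items` into consecutive chunks of at most `size` elements.
function splitIntoChunks(items, size) {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks = [];
  for (let i = 0; i < items.length; i += size) {
    // slice copies the range, so the input array is left untouched
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

const chunkedIds = splitIntoChunks([1001, 1002, 1003, 1004, 1005], 2);
console.log(chunkedIds); // [ [ 1001, 1002 ], [ 1003, 1004 ], [ 1005 ] ]
```

Each inner array can then be handed to Promise.all, which leaves only the question of running the chunks in sequence.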
Promise only provides parallel execution (Promise.all); it has no built-in method for serial execution. The example below chains the tasks with reduce instead:

```javascript
// task1, task2, task3 are three factory functions that return a Promise,
// simulating our asynchronous requests
const task1 = () => new Promise((resolve) => {
  setTimeout(() => {
    resolve(1);
    console.log('task1 executed');
  }, 1000);
});

const task2 = () => new Promise((resolve) => {
  setTimeout(() => {
    resolve(2);
    console.log('task2 executed');
  }, 1000);
});

const task3 = () => new Promise((resolve) => {
  setTimeout(() => {
    resolve(3);
    console.log('task3 executed');
  }, 1000);
});

// Aggregation results
let result = 0;

const resultPromise = [task1, task2, task3].reduce((current, next) =>
  current.then((number) => {
    // The initial value 0, then the results of task1 and task2, arrive here
    console.log('resolved with number', number);
    result += number;
    return next();
  }),
  Promise.resolve(0)) // Aggregate initial value
  .then(function (last) {
    // The Promise of task3 is resolved here
    console.log('The last promise resolved with number', last);
    result += last;
    console.log('all executed with result', result);
    return result;
  });
```

The running result is shown in Figure 1.

Code analysis: the effect we want is a chain of then calls in which each task is started only after the Promise of the previous task has resolved. reduce builds exactly that chain: starting from Promise.resolve(0), every step attaches a then callback that records the previous result and returns the Promise of the next task.

With that, the difficulty has been solved.
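To make the analysis concrete, the sketch below writes the chain out by hand and then builds the same chain with reduce. `makeTask` and `runSerially` are hypothetical names introduced only for this illustration, and the delays are shortened so the example finishes quickly.

```javascript
// Hypothetical factory: returns a task that resolves with `value` after `ms` milliseconds.
const makeTask = (value, ms) => () =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

const tasks = [makeTask(1, 30), makeTask(2, 20), makeTask(3, 10)];

// Unrolled by hand: each task is called only after the previous one has resolved.
const order = [];
const unrolled = Promise.resolve()
  .then(() => tasks[0]())
  .then((n) => { order.push(n); return tasks[1](); })
  .then((n) => { order.push(n); return tasks[2](); })
  .then((n) => { order.push(n); return order; });

// The same chain built with reduce, collecting the results in call order.
const runSerially = (factories) =>
  factories.reduce(
    (chain, factory) =>
      chain.then((results) => factory().then((value) => [...results, value])),
    Promise.resolve([])
  );

unrolled.then((values) => console.log('unrolled:', values));
runSerially(tasks).then((values) => console.log('reduce:', values));
```

Both calls resolve to `[1, 2, 3]` even though the first task has the longest delay, because a task is not started until the one before it has finished.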
Let's take a look at the final code:

```javascript
/**
 * Simulate an HTTP request
 * @param {String} url
 * @return {Promise}
 */
function fetch(url) {
  console.log(`Fetching ${url}`);
  return new Promise((resolve) => {
    // The trailing digits of the URL serve as the mocked pv value
    setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)[0]) }), 2000);
  });
}

const urlPrefix = 'http://opensearch.example.com/api/apps';

const aggregator = {
  /**
   * Entry method, start the scheduled task
   *
   * @return {Promise}
   */
  start() {
    return this.fetchAppIds()
      .then(ids => this.fetchAppsSerially(ids, 2))
      .then(apps => this.sumPv(apps))
      .catch(error => console.error(error));
  },

  /**
   * Get all application IDs
   *
   * @private
   *
   * @return {Promise}
   */
  fetchAppIds() {
    return Promise.resolve([1001, 1002, 1003, 1004, 1005]);
  },

  /**
   * Build a factory that, when called, fetches one chunk of apps concurrently
   */
  promiseFactory(ids) {
    return () => Promise.all(ids.map(id => fetch(`${urlPrefix}/${id}`)));
  },

  /**
   * Get details of all apps
   *
   * A group of `concurrency` concurrent requests is called a chunk.
   * The next chunk starts only after the previous chunk has completed,
   * until all applications have been fetched.
   *
   * @private
   *
   * @param {Number[]} ids
   * @param {Number} concurrency The number of concurrent requests at a time
   * @return {Promise<Object[]>} Information about all applications
   */
  fetchAppsSerially(ids, concurrency = 100) {
    // Chunking (work on a copy so the caller's array is not mutated)
    const remaining = [...ids];
    let chunkOfIds = remaining.splice(0, concurrency);
    const tasks = [];
    while (chunkOfIds.length !== 0) {
      tasks.push(this.promiseFactory(chunkOfIds));
      chunkOfIds = remaining.splice(0, concurrency);
    }

    // Execute chunk by chunk
    const result = [];
    return tasks.reduce((current, next) => current.then((chunkOfApps) => {
      console.info('Chunk of', chunkOfApps.length, 'concurrent requests has finished with result:', chunkOfApps, '\n\n');
      result.push(...chunkOfApps); // Flatten the array
      return next();
    }), Promise.resolve([]))
      .then((lastChunkOfApps) => {
        console.info('Chunk of', lastChunkOfApps.length, 'concurrent requests has finished with result:', lastChunkOfApps, '\n\n');
        result.push(...lastChunkOfApps); // Flatten it again
        console.info('All chunks have been executed with result', result);
        return result;
      });
  },

  /**
   * Aggregate PV of all applications
   *
   * @private
   *
   * @param {Object[]} apps
   * @return {{pv: Number}} The summed PV
   */
  sumPv(apps) {
    const initial = { pv: 0 };
    return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial);
  }
};

// Start running
aggregator.start().then(console.log);
```

The running result is shown in Figure 2.

Abstraction and reuse

The goal has been achieved. Because the technique is general, we now abstract it into a reusable pattern.

Serial

First, simulate an HTTP GET request.

```javascript
/**
 * mocked http get.
 * @param {string} url
 * @returns {Promise<{ url: string; delay: number; at: number; }>}
 */
function httpGet(url) {
  const delay = Math.random() * 1000;
  console.info('GET', url);
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve({ url, delay, at: Date.now() });
    }, delay);
  });
}
```

Execute a batch of requests serially.

```javascript
const ids = [1, 2, 3, 4, 5, 6, 7];

// Batch of request functions. Note that each element must be a function that
// defers the request; if httpGet were called directly here, every request
// would be sent immediately and nothing would run serially.
const httpGetters = ids.map(id =>
  () => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`)
);

// Serial execution (no await here: tasks must remain a Promise)
const tasks = httpGetters.reduce((acc, cur) => {
  return acc.then(cur);
  // Shorthand, equivalent to
  // return acc.then(() => cur());
}, Promise.resolve());

tasks.then(() => {
  console.log('done');
});
```

Watch the console output. The following lines should be printed one after another, each only after the previous request has finished:

```
GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
GET https://jsonplaceholder.typicode.com/posts/7
```

Segment serial, segment parallel

Here comes the point.
The request scheduler implemented in this article:

```javascript
/**
 * Schedule promises.
 * @param {Array<(...arg: any[]) => Promise<any>>} factories
 * @param {number} concurrency
 * @returns {Promise<any[]>} all responses, in the order of the factories
 */
function schedulePromises(factories, concurrency) {
  /**
   * chunk
   * @param {any[]} arr
   * @param {number} size
   * @returns {Array<any[]>}
   */
  const chunk = (arr, size = 1) => {
    return arr.reduce((acc, cur, idx) => {
      const modulo = idx % size;
      if (modulo === 0) {
        // Start a new chunk
        acc[acc.length] = [cur];
      } else {
        // Append to the current chunk
        acc[acc.length - 1].push(cur);
      }
      return acc;
    }, []);
  };

  const chunks = chunk(factories, concurrency);
  const resps = [];

  return chunks.reduce(
    (acc, cur) => {
      return acc
        .then(() => {
          console.log('---');
          // Requests inside one chunk run in parallel
          return Promise.all(cur.map(f => f()));
        })
        .then((intermediateResponses) => {
          resps.push(...intermediateResponses);
          return resps;
        });
    },
    // Start from resps so that an empty factory list still resolves to []
    Promise.resolve(resps)
  );
}
```

To test it, run the scheduler:

```javascript
// Segmented serial, segmented parallel
schedulePromises(httpGetters, 3).then((resps) => {
  console.log('resps:', resps);
});
```

Console output:

```
---
GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
---
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
---
GET https://jsonplaceholder.typicode.com/posts/7
resps: [
  { "url": "https://jsonplaceholder.typicode.com/posts/1", "delay": 733.010980640727, "at": 1615131322163 },
  { "url": "https://jsonplaceholder.typicode.com/posts/2", "delay": 594.5056229848931, "at": 1615131322024 },
  { "url": "https://jsonplaceholder.typicode.com/posts/3", "delay": 738.8230109146299, "at": 1615131322168 },
  { "url": "https://jsonplaceholder.typicode.com/posts/4", "delay": 525.4604386109747, "at": 1615131322698 },
  { "url": "https://jsonplaceholder.typicode.com/posts/5", "delay": 29.086379722201183, "at": 1615131322201 },
  { "url": "https://jsonplaceholder.typicode.com/posts/6", "delay": 592.2345027398272, "at": 1615131322765 },
  { "url": "https://jsonplaceholder.typicode.com/posts/7", "delay": 513.0684467560949, "at": 1615131323284 }
]
```

Summary
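To recap the whole pattern in one place, here is a minimal sketch of the same chunked scheduling written with async/await instead of reduce. It is an alternative formulation, not the article's implementation; `scheduleInChunks` and the stand-in tasks are hypothetical names, and no network is involved.

```javascript
// Hypothetical async/await variant of a chunked scheduler.
async function scheduleInChunks(factories, concurrency) {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError('concurrency must be a positive integer');
  }
  const results = [];
  for (let i = 0; i < factories.length; i += concurrency) {
    const batch = factories.slice(i, i + concurrency);
    // Start every task of this chunk, then wait for all of them
    // before the loop moves on to the next chunk.
    const batchResults = await Promise.all(batch.map((factory) => factory()));
    results.push(...batchResults);
  }
  return results;
}

// Stand-in tasks: each resolves with id * 10 after a short delay.
const demoFactories = [1, 2, 3, 4, 5].map((id) => () =>
  new Promise((resolve) => setTimeout(() => resolve(id * 10), 10))
);

scheduleInChunks(demoFactories, 2).then((values) => {
  console.log(values); // [ 10, 20, 30, 40, 50 ]
});
```

The loop form trades the reduce chain for an explicit await per chunk, which some readers may find easier to follow; the scheduling behavior is intended to be the same.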
The above covers the details of implementing a request scheduler in JS: split the request factories into chunks, run the requests inside each chunk in parallel with Promise.all, and chain the chunks serially with reduce.