Implementing a Request Scheduler in JS

Preface: JavaScript makes it easy to fire requests in parallel, but unbounded parallelism brings problems of its own, such as putting excessive pressure on the target server. This article introduces a "request scheduler" to control concurrency.

TL;DR: jump straight to the "Abstraction and reuse" section.

To fetch a batch of independent resources, Promise.all(arrayOfPromises) is the usual choice, since the requests run concurrently and performance is good. For example, if we already have 100 application IDs and need to aggregate the PVs (page views) of all applications, we usually write:

const ids = [1001, 1002, 1003, 1004, 1005];
const urlPrefix = 'http://opensearch.example.com/api/apps';

// The fetch function sends an HTTP request and returns a Promise
const appPromises = ids.map(id => `${urlPrefix}/${id}`).map(url => fetch(url));

Promise.all(appPromises)
  // Accumulate through reduce
  .then(apps => apps.reduce((initial, current) => initial + current.pv, 0))
  .catch((error) => console.log(error));

The code above works fine when there are only a few applications. Once the count reaches tens of thousands, though, a system that does not handle high concurrency well will buckle under your accidental "stress test": the third-party server crashes and temporarily stops responding:

<html>
<head><title>502 Bad Gateway</title></head>
<body bgcolor="white">
<center><h1>502 Bad Gateway</h1></center>
<hr><center>nginx/1.10.1</center>
</body>
</html>

How to solve it?

A natural idea: since the server cannot handle that many concurrent requests, split them into chunks. Requests within a chunk still run concurrently, but the chunk size (chunkSize) is capped at the maximum concurrency the system supports, and the next chunk starts only after the previous chunk finishes. In other words, requests are parallel within a chunk and serial between chunks. The idea is simple, but writing it takes care. It boils down to three operations: chunk, serialize, and aggregate.
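Of the three operations, chunking is the easiest to isolate. As a minimal sketch (the helper name `chunk` is my own; the article builds an equivalent inside the scheduler later):

```javascript
// Split an array into chunks of at most `size` elements.
function chunk(arr, size) {
  const out = [];
  for (let i = 0; i < arr.length; i += size) {
    out.push(arr.slice(i, i + size));
  }
  return out;
}

console.log(chunk([1, 2, 3, 4, 5], 2)); // → [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```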

The difficulty lies in executing Promises serially: Promise provides a parallel primitive (Promise.all) but no serial one. Let's start with three simple requests, see how to implement them, and work toward a solution heuristically.

// task1, task2, task3 are factory functions that return a Promise, simulating our asynchronous requests
const task1 = () => new Promise((resolve) => {
 setTimeout(() => {
 resolve(1);
 console.log('task1 executed');
 }, 1000);
});

const task2 = () => new Promise((resolve) => {
 setTimeout(() => {
 resolve(2);
 console.log('task2 executed');
 }, 1000);
});

const task3 = () => new Promise((resolve) => {
 setTimeout(() => {
 resolve(3);
 console.log('task3 executed');
 }, 1000);
});

// Aggregated result
let result = 0;

const resultPromise = [task1, task2, task3]
  .reduce((current, next) =>
    current.then((number) => {
      console.log('resolved with number', number); // receives the initial 0, then task1's and task2's values
      result += number;

      return next();
    }),
    Promise.resolve(0)) // initial value for the aggregation
  .then((last) => {
    console.log('The last promise resolved with number', last); // task3's Promise is resolved here

    result += last;

    console.log('all executed with result', result);

    return Promise.resolve(result);
  });

The running result is shown in Figure 1: each task logs one second after the previous one, and the final aggregated result is 6.

Code analysis: The effect we want is actually fn1().then(() => fn2()).then(() => fn3()) . The key to the above code that allows a group of Promise to be executed sequentially is that the reduce "engine" is driving the execution of Promise factory function step by step.
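On runtimes with async/await support, the same serial driving can also be written with a for...of loop, which some readers may find easier to follow. This is a sketch of an equivalent, not the article's original code:

```javascript
// Serially run an array of Promise-returning factories and sum their
// resolved values, equivalent to the reduce chain above: each factory
// starts only after the previous one's Promise has resolved.
async function runSerially(factories) {
  let result = 0;
  for (const factory of factories) {
    result += await factory();
  }
  return result;
}

const t1 = () => Promise.resolve(1);
const t2 = () => Promise.resolve(2);
const t3 = () => Promise.resolve(3);

runSerially([t1, t2, t3]).then(sum => console.log(sum)); // → 6
```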

The difficulty has been solved. Let's take a look at the final code:

/**
 * Simulate an HTTP request
 * @param {String} url
 * @return {Promise}
 */
function fetch(url) {
 console.log(`Fetching ${url}`);
 return new Promise((resolve) => {
 setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }), 2000);
 });
}

const urlPrefix = 'http://opensearch.example.com/api/apps';

const aggregator = {
 /**
  * Entry method: start the scheduled task
  *
  * @return {Promise}
  */
 start() {
 return this.fetchAppIds()
 .then(ids => this.fetchAppsSerially(ids, 2))
 .then(apps => this.sumPv(apps))
 .catch(error => console.error(error));
 },
 
 /**
 * Get all application IDs
 *
 * @private
 * 
 * @return {Promise}
 */
 fetchAppIds() {
 return Promise.resolve([1001, 1002, 1003, 1004, 1005]);
 },

 promiseFactory(ids) {
 return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch));
 },
 
 /**
  * Get details of all apps
  *
  * A concurrent batch of `concurrency` requests is called a chunk;
  * the next chunk starts after the previous chunk completes, until all apps are fetched
  *
  * @private
  *
  * @param {[Number]} ids
  * @param {Number} concurrency The number of concurrent requests at a time
  * @return {Promise<[Object]>} Information about all applications
  */
 fetchAppsSerially(ids, concurrency = 100) {
   // Chunking
   let chunkOfIds = ids.splice(0, concurrency);
   const tasks = [];

   while (chunkOfIds.length !== 0) {
     tasks.push(this.promiseFactory(chunkOfIds));
     chunkOfIds = ids.splice(0, concurrency);
   }

   // Execute the chunks in order
   const result = [];
   return tasks.reduce((current, next) => current.then((chunkOfApps) => {
     console.info('Chunk of', chunkOfApps.length, 'concurrent requests has finished with result:', chunkOfApps, '\n\n');
     result.push(...chunkOfApps); // flatten the array

     return next();
   }), Promise.resolve([]))
     .then((lastChunkOfApps) => {
       console.info('Chunk of', lastChunkOfApps.length, 'concurrent requests has finished with result:', lastChunkOfApps, '\n\n');

       result.push(...lastChunkOfApps); // flatten the last chunk

       console.info('All chunks have been executed with result', result);
       return result;
     });
 },
 
 /**
  * Aggregate the PV of all applications
  *
  * @private
  *
  * @param {[Object]} apps
  * @return {Object} The accumulated { pv } total
  */
 sumPv(apps) {
 const initial = { pv: 0 };

 return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial);
 }
};

// Start running
aggregator.start().then(console.log);

The running result is shown in Figure 2: the five apps are fetched two at a time, and the aggregated result is { pv: 5015 }.

Abstraction and reuse

The goal has been achieved. Since the technique is general, let's abstract it into a reusable pattern.

Serial

First simulate an http get request.

/**
 * Mocked HTTP GET.
 * @param {string} url
 * @returns {Promise<{ url: string; delay: number; at: number; }>}
 */
function httpGet(url) {
 const delay = Math.random() * 1000;

 console.info('GET', url);

 return new Promise((resolve) => {
 setTimeout(() => {
 resolve({
 url,
 delay,
 at: Date.now()
 })
 }, delay);
 })
}

Execute a batch of requests serially.

const ids = [1, 2, 3, 4, 5, 6, 7];

// Batch of request factories. Note that mapping to functions that *defer* the
// call is essential; otherwise the requests fire immediately and nothing is serialized.
const httpGetters = ids.map(id =>
  () => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`)
);

// Serial execution
const tasks = httpGetters.reduce((acc, cur) => {
  return acc.then(cur);

  // Shorthand, equivalent to:
  // return acc.then(() => cur());
}, Promise.resolve());

tasks.then(() => {
  console.log('done');
});

Watch the console: the following lines should appear one at a time, in order:

GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
GET https://jsonplaceholder.typicode.com/posts/7
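To see why the factories matter, here is a self-contained, timer-based demonstration (the names `makeTask`, `order`, and `done` are illustrative, not from the article). Mapping the ids straight to httpGet(...) calls would start every request at map time; wrapping each call in a factory defers it until the reduce chain invokes it:

```javascript
const order = [];

// A factory defers the work until it is invoked; a bare Promise would
// have started immediately, defeating the serialization.
const makeTask = (id) => () =>
  new Promise((resolve) => {
    order.push(`start ${id}`);
    setTimeout(() => {
      order.push(`end ${id}`);
      resolve(id);
    }, 10);
  });

const factories = [1, 2, 3].map(makeTask);
// Nothing has started yet: `order` is still empty at this point.

const done = factories.reduce((acc, cur) => acc.then(cur), Promise.resolve());

done.then(() => console.log(order));
// → [ 'start 1', 'end 1', 'start 2', 'end 2', 'start 3', 'end 3' ]
```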

Serial between chunks, parallel within chunks

Now for the main event: this article's request scheduler implementation.

/**
 * Schedule promises.
 * @param {Array<(...arg: any[]) => Promise<any>>} factories 
 * @param {number} concurrency 
 */
function schedulePromises(factories, concurrency) {
 /**
 * chunk
 * @param {any[]} arr 
 * @param {number} size 
 * @returns {Array<any[]>}
 */
 const chunk = (arr, size = 1) => {
 return arr.reduce((acc, cur, idx) => {
 const modulo = idx % size;

 if (modulo === 0) {
 acc[acc.length] = [cur];
 } else {
 acc[acc.length - 1].push(cur);
 }

 return acc;
 }, [])
 };

 const chunks = chunk(factories, concurrency);

 let resps = [];

 return chunks.reduce(
   (acc, cur) => {
     return acc
       .then(() => {
         console.log('---');
         return Promise.all(cur.map(f => f()));
       })
       .then((intermediateResponses) => {
         resps.push(...intermediateResponses);

         return resps;
       });
   },
   Promise.resolve()
 );
}

Let's test it by running the scheduler:

// Segmented serial, segmented parallel schedulePromises(httpGetters, 3).then((resps) => {
 console.log('resps:', resps);
});

Console output:

---
GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
---
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
---
GET https://jsonplaceholder.typicode.com/posts/7

resps: [
 {
 "url": "https://jsonplaceholder.typicode.com/posts/1",
 "delay": 733.010980640727,
 "at": 1615131322163
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/2",
 "delay": 594.5056229848931,
 "at": 1615131322024
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/3",
 "delay": 738.8230109146299,
 "at": 1615131322168
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/4",
 "delay": 525.4604386109747,
 "at": 1615131322698
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/5",
 "delay": 29.086379722201183,
 "at": 1615131322201
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/6",
 "delay": 592.2345027398272,
 "at": 1615131322765
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/7",
 "delay": 513.0684467560949,
 "at": 1615131323284
 }
]

Summary

  1. If the number of concurrent requests is too large, split them into chunks: process the chunks serially and the requests within each chunk concurrently.
  2. A problem may look complicated, but we can simplify it first, work out the key points step by step, and finally abstract a solution.
  3. The essence of this article is using reduce as a serial driving engine; mastering it offers new angles on the puzzles we meet in daily development. For more on reduce, see the previous article You finally use Reduce 🎉.
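One refinement worth noting: the chunked scheduler waits for the slowest request in each chunk before starting the next chunk. A sliding-window "pool" instead keeps exactly `concurrency` requests in flight at all times. The sketch below is my own illustration of that alternative, not code from this article (the names `promisePool` and `delay` are made up):

```javascript
// Keep at most `concurrency` factory-produced Promises in flight at once.
// Results come back in the original order of `factories`.
function promisePool(factories, concurrency) {
  const results = new Array(factories.length);
  let next = 0;

  // Each worker repeatedly claims the next unstarted task until none remain.
  async function worker() {
    while (next < factories.length) {
      const i = next++;
      results[i] = await factories[i]();
    }
  }

  const workers = Array.from(
    { length: Math.min(concurrency, factories.length) },
    () => worker()
  );

  return Promise.all(workers).then(() => results);
}

const delay = (value, ms) => () =>
  new Promise(resolve => setTimeout(() => resolve(value), ms));

const tasks = [delay(1, 30), delay(2, 10), delay(3, 20)];
promisePool(tasks, 2).then(res => console.log(res)); // → [ 1, 2, 3 ]
```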

That concludes the details of implementing a request scheduler in JS.

