Implementing a request scheduler in JS

Preface: JavaScript makes it easy to send requests in parallel, but doing so can cause problems such as excessive pressure on the target server. This article introduces a "request scheduler" that limits concurrency.

TL;DR: Jump directly to the "Abstraction and reuse" section.

To fetch a batch of independent resources, we usually run the requests concurrently with Promise.all(arrayOfPromises) for performance. For example, given a list of application IDs (five in the snippet below, but it could just as well be 100), aggregating the PV (page views) of all applications is usually written like this:

const ids = [1001, 1002, 1003, 1004, 1005];
const urlPrefix = 'http://opensearch.example.com/api/apps';

// `fetch` stands for a function that sends an HTTP request and returns a Promise
// resolving to the application object, e.g. { pv: 42 }.
// It is called with the URL only, so that map's extra arguments are not passed along.
const appPromises = ids.map(id => fetch(`${urlPrefix}/${id}`));

Promise.all(appPromises)
  // Accumulate the PVs through reduce
  .then(apps => apps.reduce((initial, current) => initial + current.pv, 0))
  .catch((error) => console.log(error));

The code above works fine when there are only a few applications. When the number of applications reaches tens of thousands and the target system does not handle concurrent requests well, your "stress test" can overwhelm the third-party server so that it temporarily stops responding:

<html>
<head><title>502 Bad Gateway</title></head>
<body bgcolor="white">
<center><h1>502 Bad Gateway</h1></center>
<hr><center>nginx/1.10.1</center>
</body>
</html>

How to solve it?

A natural idea: since the server cannot handle that many concurrent requests, split them into several blocks, each called a chunk. Requests within a chunk still run concurrently, but the chunk size (chunkSize) is capped at the maximum number of concurrent requests the system supports. The next chunk starts only after the previous chunk has finished, so requests within a chunk are concurrent while the chunks themselves run serially. The idea is simple, but it is tricky to write. It boils down to three operations: chunking, serial execution, and aggregation.
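The chunking step on its own can be sketched as follows. The helper name `toChunks` and the variable `sampleIds` are hypothetical and used only for illustration; unlike a splice-based approach, this version leaves the input array untouched.

```javascript
// Hypothetical helper for illustration: split `items` into consecutive chunks
// of at most `chunkSize` elements, without mutating the input array.
function toChunks(items, chunkSize) {
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    throw new RangeError('chunkSize must be a positive integer');
  }

  const chunks = [];
  for (let start = 0; start < items.length; start += chunkSize) {
    chunks.push(items.slice(start, start + chunkSize));
  }
  return chunks;
}

const sampleIds = [1001, 1002, 1003, 1004, 1005];
const sampleChunks = toChunks(sampleIds, 2);

console.log(sampleChunks); // [[1001, 1002], [1003, 1004], [1005]]
console.log(sampleIds.length); // 5 (the input is unchanged)
```

The last chunk may be shorter than chunkSize, which is why the scheduler must not assume that every chunk is full.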

The difficulty lies in executing Promises serially. Promise offers a built-in helper for waiting on parallel work (Promise.all), but none for running tasks one after another. Let's start with three simple requests and work out, step by step, how to run them in sequence.

// task1, task2, task3 are three factory functions that return a Promise,
// simulating our asynchronous requests
const task1 = () => new Promise((resolve) => {
  setTimeout(() => {
    resolve(1);
    console.log('task1 executed');
  }, 1000);
});

const task2 = () => new Promise((resolve) => {
  setTimeout(() => {
    resolve(2);
    console.log('task2 executed');
  }, 1000);
});

const task3 = () => new Promise((resolve) => {
  setTimeout(() => {
    resolve(3);
    console.log('task3 executed');
  }, 1000);
});

// Aggregated result
let result = 0;

const resultPromise = [task1, task2, task3].reduce((current, next) =>
  current.then((number) => {
    // Receives the initial value 0, then the values of task1 and task2
    console.log('resolved with number', number);
    result += number;

    return next();
  }),

  Promise.resolve(0)) // Initial value of the aggregation
  .then(function(last) {
    console.log('The last promise resolved with number', last); // task3's Promise is resolved here

    result += last;

    console.log('all executed with result', result);

    return Promise.resolve(result);
  });

The running result is shown in Figure 1:

Code analysis: The effect we want is essentially fn1().then(() => fn2()).then(() => fn3()). What makes a group of Promises execute sequentially in the code above is reduce, which acts as an "engine" that drives the Promise factory functions one step at a time.
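To make that equivalence concrete, here is a minimal sketch that builds the same kind of chain for an arbitrary list of factories. The names `runSerially`, `makeTask`, and `eventLog` are hypothetical, and the delays are shortened so the snippet finishes quickly. Results are collected in an array rather than summed, which is a small departure from the code above.

```javascript
// Hypothetical helper: run Promise factories one after another and collect
// their results in order. Each reduce step appends one more .then() to the chain.
function runSerially(factories) {
  return factories.reduce(
    (chain, factory) =>
      chain.then((results) => factory().then((value) => [...results, value])),
    Promise.resolve([])
  );
}

// Records when each task starts and ends, to make the ordering visible.
const eventLog = [];

// Hypothetical task factory: resolves with `value` after `ms` milliseconds.
const makeTask = (value, ms) => () => {
  eventLog.push(`start ${value}`);
  return new Promise((resolve) => {
    setTimeout(() => {
      eventLog.push(`end ${value}`);
      resolve(value);
    }, ms);
  });
};

// The first task is the slowest; if the tasks ran in parallel it would end last.
const serialRun = runSerially([makeTask(1, 30), makeTask(2, 20), makeTask(3, 10)]);

serialRun.then((results) => {
  console.log(results); // [1, 2, 3]
  console.log(eventLog.join(', ')); // start 1, end 1, start 2, end 2, start 3, end 3
});
```

Because each factory is only called inside the previous step's then callback, a task cannot start before its predecessor has resolved.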

The difficulty has been solved. Let's take a look at the final code:

/**
 * Simulate an HTTP request
 * @param {String} url
 * @return {Promise}
 */
function fetch(url) {
  console.log(`Fetching ${url}`);
  return new Promise((resolve) => {
    setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }), 2000);
  });
}

const urlPrefix = 'http://opensearch.example.com/api/apps';

const aggregator = {
  /**
   * Entry method: start the scheduled task
   *
   * @return {Promise}
   */
  start() {
    return this.fetchAppIds()
      .then(ids => this.fetchAppsSerially(ids, 2))
      .then(apps => this.sumPv(apps))
      .catch(error => console.error(error));
  },

  /**
   * Get all application IDs
   *
   * @private
   *
   * @return {Promise}
   */
  fetchAppIds() {
    return Promise.resolve([1001, 1002, 1003, 1004, 1005]);
  },

  promiseFactory(ids) {
    return () => Promise.all(ids.map(id => fetch(`${urlPrefix}/${id}`)));
  },

  /**
   * Get details of all apps
   *
   * A concurrent request for `concurrency` applications is called a chunk.
   * The next chunk starts after the previous chunk has completed, until all applications have been fetched
   *
   * @private
   *
   * @param {[Number]} ids
   * @param {Number} concurrency The number of concurrent requests at a time
   * @return {Promise<[Object]>} Information about all applications
   */
  fetchAppsSerially(ids, concurrency = 100) {
    // Chunking (note: splice empties the `ids` array that was passed in)
    let chunkOfIds = ids.splice(0, concurrency);
    const tasks = [];

    while (chunkOfIds.length !== 0) {
      tasks.push(this.promiseFactory(chunkOfIds));
      chunkOfIds = ids.splice(0, concurrency);
    }

    // Execute chunk by chunk, in order
    const result = [];
    return tasks.reduce((current, next) => current.then((chunkOfApps) => {
      console.info('Chunk of', chunkOfApps.length, 'concurrency requests has finished with result:', chunkOfApps, '\n\n');
      result.push(...chunkOfApps); // flatten the array
      return next();
    }), Promise.resolve([]))
      .then((lastchunkOfApps) => {
        console.info('Chunk of', lastchunkOfApps.length, 'concurrency requests has finished with result:', lastchunkOfApps, '\n\n');

        result.push(...lastchunkOfApps); // Flatten it again
        console.info('All chunks has been executed with result', result);
        return result;
      });
  },

  /**
   * Aggregate PV of all applications
   *
   * @private
   *
   * @param {[]} apps
   * @return {{ pv: Number }} Object holding the total PV
   */
  sumPv(apps) {
    const initial = { pv: 0 };

    return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial);
  }
};

// Start running
aggregator.start().then(console.log);

The running result is shown in Figure 2:

Abstraction and reuse

The goal has been achieved. Because the technique is general, let's abstract it into a reusable pattern.

Serial

First, simulate an HTTP GET request.

/**
 * Mocked HTTP GET.
 * @param {string} url
 * @returns {Promise<{ url: string; delay: number; at: number; }>}
 */
function httpGet(url) {
  const delay = Math.random() * 1000;

  console.info('GET', url);

  return new Promise((resolve) => {
    setTimeout(() => {
      resolve({
        url,
        delay,
        at: Date.now()
      });
    }, delay);
  });
}

Execute a batch of requests serially.

const ids = [1, 2, 3, 4, 5, 6, 7];

// Batch of request functions. Note that each entry must be a *function* whose
// execution is deferred; otherwise the request is sent out immediately and
// the calls are no longer serial
const httpGetters = ids.map(id =>
  () => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`)
);

// Serial execution (no await here: `tasks` must stay a Promise for the .then below)
const tasks = httpGetters.reduce((acc, cur) => {
  return acc.then(cur);

  // Shorthand, equivalent to
  // return acc.then(() => cur());
}, Promise.resolve());

tasks.then(() => {
  console.log('done');
});

Watch the console output. The following lines should be printed one at a time, in order:

GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
GET https://jsonplaceholder.typicode.com/posts/7
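The comment above about wrapping each request in a function is worth seeing in action. In the sketch below, `fakeGet` is a hypothetical stand-in for httpGet that records when each request starts. Building the array with direct calls starts every request at once, while building it with factories starts nothing until a factory is invoked.

```javascript
// Hypothetical stand-in for httpGet: records the URL as soon as the request starts.
const startedUrls = [];
function fakeGet(url) {
  startedUrls.push(url);
  return new Promise((resolve) => setTimeout(() => resolve(url), 10));
}

const postIds = [1, 2, 3];

// Eager: fakeGet is called while the array is being built, so all requests start now.
const eagerPromises = postIds.map((id) => fakeGet(`/posts/${id}`));
const startedAfterEager = startedUrls.length;

// Lazy: only functions are created; no request starts until one of them is called.
const lazyFactories = postIds.map((id) => () => fakeGet(`/posts/${id}`));
const startedAfterLazy = startedUrls.length;

console.log(startedAfterEager); // 3
console.log(startedAfterLazy); // 3 (unchanged: creating the factories started nothing)

// Serial execution then calls the factories one at a time.
const lazyRun = lazyFactories.reduce((acc, cur) => acc.then(cur), Promise.resolve());
lazyRun.then(() => console.log(startedUrls.length)); // 6
```

A Promise represents work that has already begun, so only a function that creates the Promise on demand gives the scheduler control over when the request is sent.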

Serial across chunks, parallel within chunks

Now for the main point: the request scheduler implementation in this article.

/**
 * Schedule promises.
 * @param {Array<(...arg: any[]) => Promise<any>>} factories
 * @param {number} concurrency
 * @returns {Promise<any[]>} all responses, in the order of `factories`
 */
function schedulePromises(factories, concurrency) {
  /**
   * Split an array into chunks of at most `size` items.
   * @param {any[]} arr
   * @param {number} size
   * @returns {Array<any[]>}
   */
  const chunk = (arr, size = 1) => {
    return arr.reduce((acc, cur, idx) => {
      const modulo = idx % size;

      if (modulo === 0) {
        acc[acc.length] = [cur];
      } else {
        acc[acc.length - 1].push(cur);
      }

      return acc;
    }, []);
  };

  const chunks = chunk(factories, concurrency);

  const resps = [];

  return chunks.reduce(
    (acc, cur) => {
      return acc
        .then(() => {
          console.log('---');
          return Promise.all(cur.map(f => f()));
        })
        .then((intermediateResponses) => {
          resps.push(...intermediateResponses);

          return resps;
        });
    },

    // Start from the (still empty) result array so that an empty `factories`
    // list resolves to [] rather than undefined
    Promise.resolve(resps)
  );
}

To test it, run the scheduler:

// Serial across chunks, parallel within each chunk
schedulePromises(httpGetters, 3).then((resps) => {
  console.log('resps:', resps);
});

Console output:

---
GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
---
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
---
GET https://jsonplaceholder.typicode.com/posts/7

resps: [
 {
 "url": "https://jsonplaceholder.typicode.com/posts/1",
 "delay": 733.010980640727,
 "at": 1615131322163
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/2",
 "delay": 594.5056229848931,
 "at": 1615131322024
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/3",
 "delay": 738.8230109146299,
 "at": 1615131322168
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/4",
 "delay": 525.4604386109747,
 "at": 1615131322698
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/5",
 "delay": 29.086379722201183,
 "at": 1615131322201
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/6",
 "delay": 592.2345027398272,
 "at": 1615131322765
 },
 {
 "url": "https://jsonplaceholder.typicode.com/posts/7",
 "delay": 513.0684467560949,
 "at": 1615131323284
 }
]
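One way to check that the concurrency cap really holds is to count how many requests are in flight at any moment. The sketch below is self-contained, so it restates the chunked idea with async/await under the hypothetical name `scheduleInChunks` instead of reusing schedulePromises. The instrumented `trackedGet` factory and its delays are likewise assumptions made for illustration.

```javascript
// Async/await restatement of the chunked scheduler (hypothetical name):
// start at most `concurrency` factories at a time, chunk after chunk.
async function scheduleInChunks(factories, concurrency) {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError('concurrency must be a positive integer');
  }

  const responses = [];
  for (let i = 0; i < factories.length; i += concurrency) {
    const chunk = factories.slice(i, i + concurrency);
    // Requests inside a chunk run in parallel; the loop waits before the next chunk.
    responses.push(...(await Promise.all(chunk.map((f) => f()))));
  }
  return responses;
}

// Instrumented fake request (hypothetical): tracks how many are in flight at once.
let inFlight = 0;
let maxInFlight = 0;
const trackedGet = (id) => () => {
  inFlight += 1;
  maxInFlight = Math.max(maxInFlight, inFlight);
  return new Promise((resolve) => {
    setTimeout(() => {
      inFlight -= 1;
      resolve(id);
    }, 5 + (id % 3) * 5);
  });
};

const trackedRun = scheduleInChunks([1, 2, 3, 4, 5, 6, 7].map(trackedGet), 3);

trackedRun.then((responses) => {
  console.log(responses); // [1, 2, 3, 4, 5, 6, 7]
  console.log(maxInFlight); // 3
});
```

Responses come back in the order of the factories even though requests within a chunk finish at different times, because Promise.all preserves the order of its input.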

Summary

  1. If the number of concurrent requests is too large, consider splitting them into chunks that are processed serially, with the requests inside each chunk processed concurrently.
  2. A problem may look complicated, but we can simplify it first, work out the key points step by step, and finally abstract the result into a general solution.
  3. The essence of this article is using reduce as an engine that drives serial execution. Mastering it can offer new ideas for the puzzles we run into in daily development. To learn more about reduce, see the previous article "You finally use Reduce 🎉".
