p-limit
is a library to control the number of concurrent requests, his overall code is not much, the idea is quite good, very valuable to learn;
give an example
When we initiate multiple requests at the same time, this is generally how we do it
([
requestFn1,
requestFn2,
requestFn3
]).then(res =>{})
or
requestFn1()
requestFn2()
requestFn3()
And using p-limit to limit the number of concurrent requests does that:
var limit = pLimit(8); // Set the maximum number of concurrencies to 8
var input = [ // Limitfunction wraps each request
limit(() => fetchSomething('1')),
limit(() => fetchSomething('2')),
limit(() => fetchSomething('3')),
limit(() => fetchSomething('4')),
limit(() => fetchSomething('5')),
limit(() => fetchSomething('6')),
limit(() => fetchSomething('7')),
limit(() => fetchSomething('8')),
];
// execute a request
(input).then(res =>{
(res)
})
on top ofinput
The array contains the8
classifier for individual things or people, general, catch-all classifierlimit
function, eachlimit
function contains the request to be initiated
When setting the maximum number of concurrent8
upper8
The requests will be executed at the same time
To see the effect, let's assume that the execution time for each request is1s
。
var fetchSomething = (str) => {
return new Promise((resolve, reject) => {
setTimeout(() => {
(str)
resolve(str)
}, 1000)
})
}
When setting the number of concurrent requests to2
hour
When setting the number of concurrent requests to3
hour
p-limit Limiting the number of concurrent requests is, essentially, maintaining a request queue internally;
When a request is initiated, the request is first pushed into the queue to determine whether the number of requests currently being executed is less than the configured number of concurrent requests; if so, the current request is executed; otherwise, wait for whoever among the requests that are being initiated to finish requesting, and then take one out of the first part of the queue for execution;
Source code (v2.3.0)
pLimit
The source code is as follows (this source code isv2.3.0 version, since it was introduced earlier in the project. It will be analyzed later to see how much time is lost from the2.3.0
(Go to the latest version of the source code and see what has been added or improved):
'use strict';
const pTry = require('p-try');
const pLimit = concurrency => {
// Restricted to positive integers
if (!(((concurrency) || concurrency === Infinity) && concurrency > 0)) {
return (new TypeError('Expected `concurrency` to be a number from 1 and up'));
}
const queue = []; // request queue
let activeCount = 0; // Current number of concurrency
const next = () => { // A callback that is executed when a request is completed
activeCount--;
if ( > 0) {
()();
}
};
const run = (fn, resolve, ...args) => { // Request for commencement of execution
activeCount++;
const result = pTry(fn, ...args);
resolve(result); // Pass the results to the generator
(next, next); // Callbacks are invoked after the request is executed
};
// Add request to queue
const enqueue = (fn, resolve, ...args) => {
if (activeCount < concurrency) {
run(fn, resolve, ...args);
} else {
((null, fn, resolve, ...args));
}
};
const generator = (fn, ...args) => new Promise(resolve => enqueue(fn, resolve, ...args));
// Exposing internal properties to the outside world
(generator, {
activeCount: {
get: () => activeCount
},
pendingCount: {
get: () =>
},
clearQueue: {
value: () => {
= 0;
}
}
});
return generator;
};
= pLimit;
= pLimit;
Here's a breakdown
1、pLimit
The function as a whole is a closure function that returns a function namedgenerator
The function of thegenerator
handling concurrency logic.generator
The return value must bepromise
It's the only way to be capture
const generator = (fn,...args) => new Promise((resolve,reject)=7enqueue(fn,resolve,...args))
2. Inenqueue
function inside
// Add request to queue
const enqueue = (fn, resolve, ...args) => {
if (activeCount < concurrency) {
run(fn, resolve, ...args);
} else {
((null, fn, resolve, ...args));
}
};
activeCount
indicates the number of requests being executed whenactiveCount
Less than the configured number of concurrencies (concurrency
), then you can execute the currentfn
(Implementation)run
function), otherwise it is pushed into the request queue and waited for.
3、run
The function receives three formal parameters
const run = (fn, resolve, ...args) => { // Request for commencement of execution
activeCount++;
const result = pTry(fn, ...args);
resolve(result);
(next, next);
};
-
fn
indicates a request for execution. -
resolve
leave it (to sb)generator
definition and passes it down the line, tracking it until the request is executed and calling theresolve(result);
in the name ofgenerator
function (math.)fulfilled
-
···args
denotes the rest of the parameters that will end up asfn
The parameters of the
4. Implementationrun
function time
const run = (fn, resolve, ...args) => { // Request for commencement of execution
activeCount++; // Request for commencement of execution,Number of current requests +1
const result = pTry(fn, ...args);
resolve(result);
(next, next);
};
Execute herefn
Using theconst result = pTry(fn,...args)
, pTry
The effect of this is to create apromise
The result of the package, regardless offn
Is it a synchronous or asynchronous function
// pTry source code (computing)
const pTry = (fn,...args) => new Promise((resolve,reject) => resolve(fn(...args)));
present .fn
Implementation (fn(...args)
Completed and cashed in (resolve(fn(...args))
) Afterresult
It will be honored.
result
After cashing out.generator
(used form a nominal expression)promise
And that's how it's done.resolve(result)
), then the current process of requesting fn is complete.
5. After the current request has been executed, the corresponding number of requests currently in progress should also be reduced by one.activeCount--
const next = () => { // a callback that is executed when a request is completed
activeCount--;;
if ( > 0) {
()();
}
};
It then proceeds to take the request from the head of the queue and execute it
6. Finally expose internal properties to the outside world
(generator, {
activeCount: { // the number of requests currently being made
get: () => activeCount
}, pendingCount: { // The number of requests waiting to be executed.
pendingCount: { // the number of requests waiting to be executed
get: () => activeCount
}, clearQueue: { // The number of requests currently in progress.
clearQueue: {
value: () => {
= 0; }, clearQueue: { value: () => {
}
}
});
Source (v2.3.0) = > Source (v6.1.0)
through (a gap)v2.3.0 up-to-datev6.1.0 Some improvements were added in the middle of the version
1、v3.0.0: always asynchronously executes the function passed to limit
exist3.0.0
In it, the authors put the request to join the team in front of theif
The judgment statement and request execution are placed in a microtask to run; as explained in the source code comments: because when therun
When the function is executed, theactiveCount
is updated asynchronously, so here's theif
The judgment statement should also be executed asynchronously in order to get the real-timeactiveCount
The value of the
This starts with a batch execution oflimit(fn)
When the request is executed, all these requests will be put into the queue first, and then it will be judged whether to execute the request according to the conditions;
2、v3.0.2: Fix an error caused by an invalid concurrency count passed in;
commander-in-chief (military)return
Changed to directthrow
an error
3、v3.1.0: RemovepTry
dependence; improved performance;
removepTry
dependency, changed toasync
The package, as mentioned above.pTry
anpromise
wrapper function that returns the result as apromise
; both are essentially the same;
increasedyocto-queue
Dependence.yocto-queue
is a queue data structure, using a queue instead of an array gives better performance; the time complexity of the queue-in and queue-out operations isO(1)
, and the array'sshift()
beO(n)
;
4、v5.0.0: Fix context propagation issues
Having introducedAsyncResource
export const AsyncResource = {
bind(fn, _type, thisArg) {
return (thisArg);
}
}
Here's an example of what you can do with()
parcels(undefined, fn, resolve, args)
I don't really understand why this layer was added. There are three parameters used here (fn,resolve,args
) are passed in through the function, and thethis
It doesn't matter, does it? Can you guys tell me if you know?
(statistics) correlationissue
existhere are
5、6.0.0: Performance optimization, the main optimizations are below
remove()
, changed to use an immediately executedpromise
and willpromise
(used form a nominal expression)resolve
method is inserted into the queue once theresolve
Completion of the fulfillment, invoking the corresponding request; relatedissue
existhere are
6、v6.1.0: Allow concurrency limits to be modified in real time
Immediately after changing the number of concurrencies it is retested to see if the request can be executed;
ultimate
In the first paragraph above4
Points, No.5
The optimization in the point doesn't make much sense, since the execution of the request uses three parameters (fn,resolve,args
) are passed through the function, it looks like thethis
It doesn't matter. Why multiple layers?bind
What about binding? Can you all please enlighten me if you know.