
Controlling the number of concurrent requests: p-limit source code interpretation

2024-09-22 23:18:56

p-limit is a library for controlling the number of concurrent requests. Its code base is small and its design is neat, so it is well worth studying.

An example

When we initiate multiple requests at the same time, this is generally how we do it:

Promise.all([
    requestFn1(),
    requestFn2(),
    requestFn3()
]).then(res => {})

or

requestFn1()
requestFn2()
requestFn3()

With p-limit, limiting the number of concurrent requests looks like this:

var limit = pLimit(8); // Set the maximum number of concurrencies to 8

var input = [ // the limit function wraps each request
    limit(() => fetchSomething('1')),
    limit(() => fetchSomething('2')),
    limit(() => fetchSomething('3')),
    limit(() => fetchSomething('4')),
    limit(() => fetchSomething('5')),
    limit(() => fetchSomething('6')),
    limit(() => fetchSomething('7')),
    limit(() => fetchSomething('8')),
];

// execute the requests
Promise.all(input).then(res => {
    console.log(res)
})

The input array above contains 8 limit(...) calls, each wrapping a request to be initiated.

With the maximum concurrency set to 8, the 8 requests above execute at the same time.

To see the effect, let's assume that each request takes 1s to execute:

var fetchSomething = (str) => {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            console.log(str)
            resolve(str)
        }, 1000)
    })
}

With the number of concurrent requests set to 2:

image

With the number of concurrent requests set to 3:

image

In essence, p-limit limits the number of concurrent requests by maintaining a request queue internally.

When a request is initiated, p-limit checks whether the number of requests currently executing is less than the configured concurrency. If so, the request is executed right away; otherwise it is pushed into the queue and waits until one of the executing requests finishes, at which point the request at the head of the queue is taken out and executed.
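The queueing idea described above can be sketched in a few lines. This is only a minimal illustration under my own assumptions, not p-limit's actual code; createLimiter, runTask, job, and the counters are hypothetical names introduced for the example.

```javascript
// Minimal sketch of a queue-based concurrency limiter (hypothetical names).
function createLimiter(concurrency) {
    const queue = [];   // tasks waiting for a free slot
    let active = 0;     // tasks currently running

    const runTask = ({ fn, resolve, reject }) => {
        active++;
        Promise.resolve()
            .then(fn)
            .then(resolve, reject)
            .finally(() => {
                // A slot is free again: start the task at the head of the queue.
                active--;
                if (queue.length > 0) {
                    runTask(queue.shift());
                }
            });
    };

    return fn => new Promise((resolve, reject) => {
        const task = { fn, resolve, reject };
        if (active < concurrency) {
            runTask(task);
        } else {
            queue.push(task);
        }
    });
}

// Track the highest number of jobs observed running at once.
let running = 0;
let maxRunning = 0;
const job = id => () => new Promise(resolve => {
    running++;
    maxRunning = Math.max(maxRunning, running);
    setTimeout(() => { running--; resolve(id); }, 20);
});

const limit = createLimiter(2);
const done = Promise.all([1, 2, 3, 4, 5].map(id => limit(job(id))));
done.then(results => console.log(results, 'max concurrent:', maxRunning));
```

With a concurrency of 2, the five jobs still resolve in input order, but no more than two of them are ever in flight at the same time.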

Source code (v2.3.0)

The pLimit source code is as follows. This is the v2.3.0 version, since that is what the project introduced early on; later we will look at what has been added or improved between v2.3.0 and the latest version:

'use strict';
const pTry = require('p-try');

const pLimit = concurrency => {
    // concurrency must be a positive integer or Infinity
    if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
        return Promise.reject(new TypeError('Expected `concurrency` to be a number from 1 and up'));
    }

    const queue = []; // request queue
    let activeCount = 0; // number of requests currently executing

    const next = () => { // callback executed when a request completes
        activeCount--;

        if (queue.length > 0) {
            queue.shift()();
        }
    };

    const run = (fn, resolve, ...args) => { // start executing a request
        activeCount++;

        const result = pTry(fn, ...args);

        resolve(result); // resolve the generator's promise with the result

        result.then(next, next); // call next once the request settles, success or failure
    };

    // Add request to queue
    const enqueue = (fn, resolve, ...args) => {
        if (activeCount < concurrency) {
            run(fn, resolve, ...args);
        } else {
            queue.push(run.bind(null, fn, resolve, ...args));
        }
    };

    const generator = (fn, ...args) => new Promise(resolve => enqueue(fn, resolve, ...args));
    
    // Expose internal properties to the outside world
    Object.defineProperties(generator, {
        activeCount: {
            get: () => activeCount
        },
        pendingCount: {
            get: () => queue.length
        },
        clearQueue: {
            value: () => {
                queue.length = 0;
            }
        }
    });

    return generator;
};

module.exports = pLimit;
module.exports.default = pLimit;

Here is a breakdown:

1. pLimit as a whole is a closure that returns a function named generator, and generator handles the concurrency logic.
The return value of generator must be a promise; only then can callers such as Promise.all capture its result.

const generator = (fn, ...args) => new Promise((resolve, reject) => enqueue(fn, resolve, ...args))

2. Inside the enqueue function:

// Add request to queue
const enqueue = (fn, resolve, ...args) => {
    if (activeCount < concurrency) {
        run(fn, resolve, ...args);
    } else {
        queue.push(run.bind(null, fn, resolve, ...args));
    }
};

activeCount is the number of requests currently executing. When activeCount is less than the configured concurrency, the current fn can be executed (by calling the run function); otherwise a bound call to run is pushed into the request queue to wait.
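The else branch relies on Function.prototype.bind to store a call for later. The sketch below shows that idea in isolation; the simplified run, the calls array, and the add function are stand-ins invented for the example, not the library's code.

```javascript
// bind returns a new function with its arguments pre-filled;
// nothing runs until that new function is called.
const calls = [];
const run = (fn, resolve, ...args) => {
    calls.push(args);
    resolve(fn(...args));
};

const queue = [];
let settled;
const add = (a, b) => a + b;

// Store a deferred call, the way the else branch does.
queue.push(run.bind(null, add, value => { settled = value; }, 2, 3));
const callsBefore = calls.length; // run has not been invoked yet

// Later, when a slot frees up, take the entry out and invoke it.
queue.shift()();
console.log(callsBefore, settled);
```

This is why next can simply call queue.shift()(): each queue entry is already a zero-argument function that carries its own fn, resolve, and args.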

3. The run function receives three parameters:

const run = (fn, resolve, ...args) => { // start executing a request
    activeCount++;
    const result = pTry(fn, ...args);
    resolve(result);
    result.then(next, next);
};

  • fn is the request to execute.

  • resolve is created by generator's promise and passed down the chain; once the request is executed, resolve(result) is called, which settles the promise returned by generator.

  • ...args denotes the remaining parameters, which end up as the arguments of fn.

4. When the run function executes:

const run = (fn, resolve, ...args) => { // start executing a request
    activeCount++; // a request starts executing, so the number of active requests goes up by 1

    const result = pTry(fn, ...args);

    resolve(result);

    result.then(next, next);
};

Here fn is executed via const result = pTry(fn, ...args). The purpose of pTry is to wrap the result in a promise, regardless of whether fn is a synchronous or an asynchronous function.

// pTry source code
const pTry = (fn, ...args) => new Promise((resolve, reject) => resolve(fn(...args)));

Once fn has run (fn(...args)) and its outcome has settled (resolve(fn(...args))), result settles as well.

Once result settles, generator's promise settles too (through resolve(result)), and the request fn is complete.
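To make the behaviour of pTry concrete, here is a small sketch that feeds it a synchronous function, an asynchronous function, and a function that throws. The three sample functions are made up for the illustration; pTry itself is the one-liner quoted above.

```javascript
// Same one-liner as quoted above.
const pTry = (fn, ...args) => new Promise(resolve => resolve(fn(...args)));

const syncFn = x => x * 2;
const asyncFn = x => new Promise(resolve => setTimeout(() => resolve(x * 2), 10));
const throwingFn = () => { throw new Error('boom'); };

const outcomes = Promise.all([
    pTry(syncFn, 21),                       // plain return value becomes a fulfilled promise
    pTry(asyncFn, 21),                      // a returned promise is adopted as-is
    pTry(throwingFn).catch(e => e.message)  // a synchronous throw becomes a rejection
]);
outcomes.then(values => console.log(values));
```

Because every outcome ends up as a promise, run can always call result.then(next, next) without checking what kind of function it was given.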

5. After the current request has finished, the number of requests in progress is reduced by one (activeCount--):

const next = () => { // callback executed when a request completes
    activeCount--;

    if (queue.length > 0) {
        queue.shift()();
    }
};

It then takes the next request from the head of the queue and executes it.

6. Finally, internal properties are exposed to the outside world:

Object.defineProperties(generator, {
    activeCount: { // the number of requests currently executing
        get: () => activeCount
    },
    pendingCount: { // the number of requests waiting to be executed
        get: () => queue.length
    },
    clearQueue: { // discards every request still waiting in the queue
        value: () => {
            queue.length = 0;
        }
    }
});
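The following sketch shows how such getter-backed properties behave when read from outside. The queue contents and the activeCount value are placeholder data for the illustration, and generator here is an empty stand-in function.

```javascript
const queue = ['job-a', 'job-b', 'job-c']; // stand-in for waiting requests
let activeCount = 2;

const generator = () => {};
Object.defineProperties(generator, {
    activeCount: { get: () => activeCount },
    pendingCount: { get: () => queue.length },
    clearQueue: { value: () => { queue.length = 0; } }
});

const pendingBefore = generator.pendingCount; // read through the getter
generator.clearQueue();                       // truncates the array in place
const pendingAfter = generator.pendingCount;

console.log(generator.activeCount, pendingBefore, pendingAfter);
```

Since the getters read the closure variables on each access, the values are always current. Note that in the v2.3.0 code shown above, clearQueue simply drops the queued run functions, so the promises of those discarded requests never settle.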

Source (v2.3.0) => Source (v6.1.0)

From v2.3.0 up to the latest v6.1.0, several improvements were added along the way.

1. v3.0.0: always asynchronously executes the function passed to limit

image

In 3.0.0, the author enqueues the request first, and moves the if check and the request execution into a microtask. As the source code comment explains: activeCount is updated asynchronously when the run function is executed, so the if check must also run asynchronously in order to see the up-to-date value of activeCount.

As a result, when a batch of limit(fn) calls is made, all of these requests are put into the queue first, and only then is it decided, according to the condition, whether each one is executed.
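The effect of deferring the check by one microtask can be sketched as follows. This is a simplified reconstruction based on the description above, not the exact v3.0.0 source; the events array and the empty functions a, b, and c are hypothetical helpers used to record ordering.

```javascript
const events = [];
const queue = [];
let activeCount = 0;
const concurrency = 2;

const run = fn => {
    activeCount++;
    events.push(`start ${fn.name}`);
    fn();
};

const enqueue = fn => {
    queue.push(() => run(fn));      // always enqueue first
    (async () => {
        await Promise.resolve();    // wait one microtask before checking
        if (activeCount < concurrency && queue.length > 0) {
            queue.shift()();
        }
    })();
};

function a() {}
function b() {}
function c() {}
enqueue(a);
enqueue(b);
enqueue(c);
events.push('sync code finished');

setTimeout(() => console.log(events, 'still queued:', queue.length), 0);
```

None of the functions start while the synchronous code is still running; the checks run afterwards in microtasks, so two functions start and the third stays queued because the concurrency is 2.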

2. v3.0.2: fixes an error caused by passing in an invalid concurrency value;

image

The return was changed to throw the error directly.
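The difference between the two styles can be sketched like this. pLimitReturning and pLimitThrowing are hypothetical names, and the limiter they return on success is a placeholder; only the validation branch mirrors the code discussed above.

```javascript
const isValid = concurrency =>
    (Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0;

const message = 'Expected `concurrency` to be a number from 1 and up';

// v2.3.0 style: the caller receives a rejected promise instead of a limit function.
const pLimitReturning = concurrency => {
    if (!isValid(concurrency)) {
        return Promise.reject(new TypeError(message));
    }
    return fn => Promise.resolve().then(fn); // placeholder limiter
};

// v3.0.2 style: the mistake surfaces immediately at the call site.
const pLimitThrowing = concurrency => {
    if (!isValid(concurrency)) {
        throw new TypeError(message);
    }
    return fn => Promise.resolve().then(fn); // placeholder limiter
};

const returned = pLimitReturning(0);
returned.catch(() => {});             // silence the rejection for this demo
const returnedType = typeof returned; // a promise, so returned(fn) is not callable

let thrownMessage;
try {
    pLimitThrowing(0);
} catch (error) {
    thrownMessage = error.message;
}
console.log(returnedType, thrownMessage);
```

With the returning style, a later call such as limit(fn) would fail with a confusing "is not a function" error; throwing reports the real cause right where pLimit is called.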

3. v3.1.0: removes the pTry dependency; improves performance;

image

The pTry dependency was removed and replaced with an async wrapper. As mentioned above, pTry is a promise wrapper function that returns the result as a promise; the two are essentially the same.
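A quick way to convince yourself that the two are equivalent is to run both on the same inputs. viaAsync below is a hypothetical name for the async-wrapper form, written as a sketch of the idea rather than a copy of the library's code.

```javascript
const pTry = (fn, ...args) => new Promise(resolve => resolve(fn(...args)));

// An immediately invoked async function also turns return values into
// fulfilled promises and synchronous throws into rejections.
const viaAsync = (fn, ...args) => (async () => fn(...args))();

const double = x => x * 2;
const fail = () => { throw new Error('boom'); };

const comparison = Promise.all([
    pTry(double, 4),
    viaAsync(double, 4),
    pTry(fail).catch(e => e.message),
    viaAsync(fail).catch(e => e.message)
]);
comparison.then(values => console.log(values));
```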

A yocto-queue dependency was added. yocto-queue is a queue data structure; using a queue instead of an array gives better performance, because enqueue and dequeue are both O(1), whereas an array's shift() is O(n).
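To show why a linked list gives O(1) operations at both ends, here is a minimal sketch of such a queue. It is my own illustration and not yocto-queue's source; LinkedQueue and its field names are assumptions.

```javascript
// Sketch of a singly linked-list queue (hypothetical, for illustration only).
class LinkedQueue {
    #head;
    #tail;
    #size = 0;

    enqueue(value) {
        const node = { value, next: undefined };
        if (this.#head) {
            this.#tail.next = node; // append after the current tail
            this.#tail = node;
        } else {
            this.#head = node;
            this.#tail = node;
        }
        this.#size++;
    }

    dequeue() {
        const current = this.#head;
        if (!current) {
            return undefined;
        }
        this.#head = current.next;  // just move the head pointer, no re-indexing
        if (!this.#head) {
            this.#tail = undefined;
        }
        this.#size--;
        return current.value;
    }

    get size() {
        return this.#size;
    }
}

const q = new LinkedQueue();
q.enqueue('a');
q.enqueue('b');
q.enqueue('c');
const first = q.dequeue();
console.log(first, q.size);
```

Dequeuing only moves a pointer, while Array.prototype.shift has to move every remaining element down by one index.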

4. v5.0.0: fixes context propagation issues

image

AsyncResource was introduced:

export const AsyncResource = {
    bind(fn, _type, thisArg) {
        return fn.bind(thisArg);
    }
}

Here AsyncResource.bind() wraps run.bind(undefined, fn, resolve, args). I don't really understand why this layer was added: the three parameters used here (fn, resolve, args) are all passed in through the function, so this shouldn't matter, should it? If you know, please tell me.

The related issue is here.
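One possible reading, offered as an assumption rather than a confirmed answer: in Node.js, AsyncResource.bind from node:async_hooks binds a function to the current asynchronous execution context (the thing AsyncLocalStorage reads), which is separate from this. The snippet quoted above looks like a fallback for environments without async_hooks. The sketch below, with a made-up 'request-A' store value, shows the difference for a callback that is queued in one context and called from another.

```javascript
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');

const storage = new AsyncLocalStorage();
const queue = [];

// Enqueue two callbacks while inside the 'request-A' context.
storage.run('request-A', () => {
    queue.push(() => storage.getStore());                     // plain function
    queue.push(AsyncResource.bind(() => storage.getStore())); // context captured at bind time
});

// Dequeue later, from outside that context (as a limiter's next() might).
const plainResult = queue.shift()();
const boundResult = queue.shift()();
console.log(plainResult, boundResult);
```

The plain callback sees no store because it runs in the caller's context, while the bound one still sees the store that was active when it was queued. If this reading is right, the extra layer is about which async context a queued request runs in, not about this.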

5. v6.0.0: performance optimization; the main changes are below

image

AsyncResource.bind() was removed in favor of an immediately created promise whose resolve method is inserted into the queue; once that resolve is called and the promise is fulfilled, the corresponding request is invoked. The related issue is here.
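The pattern of queueing a promise's resolve function can be sketched as follows. It is a simplified reconstruction of the idea described above, using a plain array as the queue; greet, started, and the simplified run are hypothetical.

```javascript
const queue = [];
const started = [];

const run = (fn, resolve, args) => {
    started.push(fn.name);
    resolve(fn(...args));
};

const enqueue = (fn, resolve, args) => {
    // Queue the promise's own resolve function; run is chained with then(),
    // so it only fires after that resolve has been called.
    new Promise(internalResolve => {
        queue.push(internalResolve);
    }).then(run.bind(undefined, fn, resolve, args));
};

function greet(name) {
    return `hello ${name}`;
}
const result = new Promise(resolve => enqueue(greet, resolve, ['world']));

const startedBeforeDequeue = started.length; // nothing has run yet
queue.shift()();                             // dequeue: resolves the internal promise
result.then(value => console.log(startedBeforeDequeue, value));
```

Dequeuing now means calling a resolve function, and the request itself runs as the then callback of the promise that was created at enqueue time.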

6. v6.1.0: allows the concurrency limit to be modified at runtime

image

Immediately after the concurrency is changed, the queue is re-checked to see whether more requests can be executed.
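A limiter with an adjustable limit could look roughly like the sketch below. It is an illustration under my own assumptions, not the v6.1.0 source: createAdjustableLimiter and startNext are hypothetical names, and input validation is omitted for brevity.

```javascript
function createAdjustableLimiter(initialConcurrency) {
    let concurrency = initialConcurrency;
    let activeCount = 0;
    const queue = [];

    // Start as many queued requests as the current limit allows.
    const startNext = () => {
        while (activeCount < concurrency && queue.length > 0) {
            activeCount++;
            queue.shift()();
        }
    };

    const limit = fn => new Promise((resolve, reject) => {
        queue.push(() => {
            Promise.resolve().then(fn).then(resolve, reject).finally(() => {
                activeCount--;
                startNext();
            });
        });
        startNext();
    });

    Object.defineProperty(limit, 'concurrency', {
        get: () => concurrency,
        set: value => {
            concurrency = value;
            startNext(); // re-check the queue right after the limit changes
        }
    });
    Object.defineProperty(limit, 'activeCount', { get: () => activeCount });
    return limit;
}

const limit = createAdjustableLimiter(1);
const sleep = () => new Promise(resolve => setTimeout(resolve, 20));
const jobs = [limit(sleep), limit(sleep), limit(sleep)];

const activeBefore = limit.activeCount; // only one slot so far
limit.concurrency = 3;                  // raise the limit while jobs are queued
const activeAfter = limit.activeCount;
Promise.all(jobs).then(() => console.log(activeBefore, activeAfter));
```

Raising the limit lets the two waiting requests start right away instead of waiting for the first one to finish.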


Finally

The optimizations in points 4 and 5 above don't make much sense to me: the request execution uses three parameters (fn, resolve, args) that are all passed in through the function, so this doesn't seem to matter. Why add the extra layer of bind? If you know, please enlighten me.