Handwritten p-map (library for controlling concurrency and iterating over promises)

2024-10-08 22:25:16

Introduction

p-map is a library for iterating over promises while controlling how many of them run concurrently. Its author is sindresorhus, who has also put together a collection of promise libraries, promise-fun, for those interested.

The previously covered p-limit is also a library for controlling the number of concurrent requests. Both serve the same purpose as far as limiting concurrency goes, but p-map additionally handles iterating over the requests (promises).

p-limit is used as follows; limit accepts a function:

var limit = pLimit(8); // set the maximum concurrency to 8
var input = [ // wrap each request with the limit function
    limit(() => fetchSomething('1')),
    limit(() => fetchSomething('2')),
    limit(() => fetchSomething('3')),
    limit(() => fetchSomething('4'))
];
// Execute the requests
Promise.all(input).then(res => {
    console.log(res)
})

By contrast, p-map takes a collection (more precisely, an iterable object) and processes it with a user-supplied mapper function:

import pMap from 'p-map';
import got from 'got';

const sites = [
	getWebsiteFromUsername('sindresorhus'), //=> Promise
	'',
	''
];

const mapper = async site => {
	const {requestUrl} = await got.head(site);
	return requestUrl;
};

// Takes three arguments: an iterable, a function that processes each element, and a configuration object
const result = await pMap(sites, mapper, {concurrency: 2});

console.log(result);
//=> ['/', '/', '/']

The built-in iterable objects are String, Array, TypedArray, Map, and Set. To be iterable, an object must implement the [Symbol.iterator]() method.

When traversing an iterable object, you are actually traversing it according to the iterator protocol;

For example, iterating over an array looks like this

var iterable = [1,2,3,4]
var iterator = iterable[Symbol.iterator]() // iterator is an iterator object
iterator.next() // {value: 1, done: false}
iterator.next() // {value: 2, done: false}
iterator.next() // {value: 3, done: false}
iterator.next() // {value: 4, done: false}
iterator.next() // {value: undefined, done: true}

When the array iteration is complete, next() returns {value: undefined, done: true}.
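As a minimal sketch of the protocol (the countdown helper is hypothetical and not part of p-map), any object becomes iterable once it exposes a [Symbol.iterator]() method that returns an object with a next() method:

```javascript
// Hypothetical helper for illustration: an iterable that counts down from `start` to 1.
function countdown(start) {
    return {
        [Symbol.iterator]() {
            let current = start
            return {
                // Each call returns the next {value, done} pair.
                next() {
                    if (current > 0) {
                        return { value: current--, done: false }
                    }
                    return { value: undefined, done: true }
                }
            }
        }
    }
}

// Drive the iterator by hand, much as a for...of loop does internally.
const iterator = countdown(3)[Symbol.iterator]()
const seen = []
let step = iterator.next()
while (!step.done) {
    seen.push(step.value)
    step = iterator.next()
}
console.log(seen) // [ 3, 2, 1 ]
```

Because p-map only relies on this protocol, such an object could be passed to it just like an array.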

p-map controls concurrency by iterating over the incoming collection. Each element (which may be a promise) is first awaited and then handed to the mapper function; only once mapper has finished does iteration move on to the next element. This keeps the iteration going strictly one element at a time, in which case the concurrency is 1.

To reach a concurrency of n while still iterating as described above, the author cleverly uses a for loop:

for (let i = 0; i < n; i++) {
    next()
}
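To see why this loop yields a concurrency of n, here is a small sketch (runLimited, wait, and the timing values are made up for illustration and are not p-map's source). Each next() call starts one task and, when that task settles, starts the following one, so calling next() n times up front keeps at most n tasks in flight:

```javascript
// Illustrative sketch: run `tasks` (functions returning promises) with at most
// `n` in flight, and resolve with the highest concurrency that was observed.
function runLimited(tasks, n) {
    return new Promise(resolve => {
        let index = 0      // next task to start
        let active = 0     // tasks currently running
        let maxActive = 0  // peak concurrency, tracked for demonstration only

        const next = () => {
            if (index >= tasks.length) {
                if (active === 0) resolve(maxActive)
                return
            }
            const task = tasks[index++]
            active++
            maxActive = Math.max(maxActive, active)
            task().then(() => {
                active--
                next() // a finished task frees its slot for the next one
            })
        }

        // Start n "lanes"; each lane keeps itself going through next().
        for (let i = 0; i < n; i++) {
            next()
        }
    })
}

const wait = ms => () => new Promise(resolve => setTimeout(resolve, ms))
const demo = runLimited([wait(30), wait(10), wait(20), wait(10), wait(10)], 2)
demo.then(peak => console.log('peak concurrency:', peak)) // peak concurrency: 2
```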

Hand-writing p-map

Here is an implementation of p-map along the lines of the author's. Suppose there is an iterable object, arr, that needs to be processed:

var fetchSomething = (str,ms) =>{
    return new Promise((resolve,reject) =>{
        setTimeout(() =>{
            resolve(parseFloat(str))
        },ms)
    })
}

var arr= [
    fetchSomething('1a' ,1000), // promise
    2,3, 
    fetchSomething( '4a' , 5000), // promise
    5,6,7,8,9,10
]

The first and fourth elements of the collection are promises.

p-map receives three arguments: the iterable object, the mapper handler function, and a custom configuration object. Its return value is a promise, as shown below:

var pMap = (iterable, mapper, options) => new Promise((resolve,reject) => {});

Once you have the iterable object, iterate over it recursively until iteration is complete. To do so, define an internal recursive function, next:

var pMap = (iterable, mapper, options) => new Promise((resolve,reject) => {
    var iterator = iterable[Symbol.iterator]()

    var next = () => {
        var item = iterator.next()
        if (item.done) {
            return
        }
        next()
    }

});

The elements of the iterable are processed in order. If an element is a promise, wait for it to be fulfilled first; the fulfilled value is then passed to the mapper function, and only after the promise returned by mapper has been fulfilled or rejected does iteration continue with the next element:

var iterator = iterable[Symbol.iterator]()
var next = () => {
    var item = iterator.next()

    if (item.done) {
        return
    }

    Promise.resolve(item.value)
        .then(res => mapper(res))
        .then(res2 => {
            next()
        })
}

After each iteration completes, save the fulfilled result:

var iterator = iterable[Symbol.iterator]()
var index = 0 // index; results are saved in the order of the iterable
var ret = [] // saved results
var next = () => {
    var item = iterator.next()

    if (item.done) {
        return
    }

    var currentIndex = index // save the current element's index, used to store the result
    index++ // index of the next element
    Promise.resolve(item.value)
        .then(res => mapper(res))
        .then(res2 => {
            ret[currentIndex] = res2
            next()
        })
}
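Put together, the fragments so far give a runnable, strictly sequential version. This is only a sketch under the article's assumptions: the names mapInOrder and later are made up, concurrency is fixed at 1, and rejections are not handled yet.

```javascript
// Sketch only: concurrency is fixed at 1 and errors are not handled.
var mapInOrder = (iterable, mapper) => new Promise(resolve => {
    var iterator = iterable[Symbol.iterator]()
    var index = 0
    var ret = []

    var next = () => {
        var item = iterator.next()
        if (item.done) {
            resolve(ret) // with a concurrency of 1, nothing is in flight here
            return
        }
        var currentIndex = index
        index++
        Promise.resolve(item.value) // plain values and promises are treated alike
            .then(res => mapper(res, currentIndex))
            .then(res2 => {
                ret[currentIndex] = res2 // store by index, not by completion order
                next()
            })
    }

    next()
})

// Hypothetical helper: a promise that fulfills with `value` after `ms` milliseconds.
var later = (value, ms) => new Promise(resolve => setTimeout(() => resolve(value), ms))

var doubled = mapInOrder([later(1, 20), 2, later(3, 5)], x => x * 2)
doubled.then(res => console.log(res)) // [ 2, 4, 6 ]
```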

When the entire iteration is complete and every element has finished executing (settled), output the result set:

var pMap = (iterable, mapper, options) => new Promise((resolve,reject) => {
    var iterator = iterable[Symbol.iterator]()
    var index = 0
    var ret = []
    var activeCount = 0 // number of elements being executed
    var next = () => {
        var item = iterator.next()
        if (item.done) { // all elements have been iterated over
            if (activeCount == 0) {
                resolve(ret) // all elements have finished executing (settled); output the result set
            }
            return
        }
        var currentIndex = index // save the current element's index, used to store the result
        index++ // index of the next element
        activeCount++
        Promise.resolve(item.value)
            .then(res => mapper(res))
            .then(res2 => {
                ret[currentIndex] = res2
                activeCount--
                next()
            })
            .catch(err => {
                activeCount--
            })
    }
})

The stopOnError option

One of the options passed to p-map is stopOnError, which indicates whether to stop iterating when an error occurs during execution, so the decision is made inside .catch():

Promise.resolve(item.value)
    .then(res => mapper(res))
    .then(res2 => {
        // ...
    })
    .catch(err => {
        ret[currentIndex] = err // save the error as a result too

        if (stopOnError) {
            hasError = true
            reject(err) // an error occurred: stop the loop
        } else {
            hasError = false
            activeCount--
            next()
        }
    })

Ignoring failed results: pMapSkip

The mapper function is user-defined. What should happen when mapper fails and the user wants to ignore the failed result and keep only the successful ones? This is where pMapSkip comes in.

The p-map source code exports pMapSkip, which is a Symbol value. Internally, p-map handles it like this: when the result set receives pMapSkip, the elements whose value is pMapSkip are removed once iteration completes. In other words, when an error occurs while mapper is processing an element and the user does not want that value, mapper can call reject(pMapSkip). Note that the published p-map expects mapper to return pMapSkip rather than reject with it; the handwritten version in this article uses the reject(pMapSkip) convention. For example:

import pMap, { pMapSkip } from 'p-map'

var arr = [
    fetchSomething('1a', 1000, true),
    2, 3
]

var mapper = (item, index) => {
    return new Promise((resolve, reject) => {
        return item == 2 ? reject(pMapSkip) : resolve(parseFloat(item)) // when the element is 2, reject with pMapSkip
    })
}

(async () => {
    const result = await pMap(arr, mapper, { concurrency: 2 });
    console.log(result); //=> [1, 3]
})();

So when mapper yields pMapSkip, the corresponding element needs to be marked. An array, skipIndexArr, records the positions of the elements to be removed:

var skipIndexArr = [];

Promise.resolve(item.value)
  .then((res) => mapper(res))
  .then((res2) => {
    // ...
  })
  .catch((err) => {
    if (err === pMapSkip) {
      skipIndexArr.push(currentIndex); // record the position of the element to be removed
      activeCount--;
      next(); // a skipped element must not stall the iteration
    } else {
      ret[currentIndex] = err;

      if (stopOnError) {
        hasError = true;
        reject(err);
      } else {
        hasError = false;
        activeCount--;
        next();
      }
    }
  });

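At the end of the iteration, remove the elements marked with pMapSkip from the result set:

if (item.done) {
  if (activeCount == 0) {
    // splice from the highest index down so that earlier removals do not shift later ones
    for (var k of skipIndexArr.sort((a, b) => b - a)) {
      ret.splice(k, 1);
    }
    resolve(ret);
  }
  return;
}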

With large data sets, frequent use of splice may perform poorly, because each splice shifts the indexes of all subsequent elements. So rework the code by changing skipIndexArr into a Map:

// var skipIndexArr= []
var skipIndexArr= new Map()

Record the position of the element to be removed:

if (err === pMapSkip) {
  skipIndexArr.set(currentIndex, err);
}

Then, at the end of the iteration, stop splicing the original array and collect the results in a new array instead; push performs better than splice:

if (item.done) {
  if (activeCount == 0) {
    if (skipIndexArr.size === 0) {
      resolve(ret);
      return;
    }

    const pureRet = [];

    for (const [index, value] of ret.entries()) {
      if (skipIndexArr.get(index) === pMapSkip) {
        continue;
      }

      pureRet.push(value);
    }

    resolve(pureRet);
  }
  return;
}
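The two clean-up strategies can be compared in isolation. In this sketch (SKIP, removeWithSplice, and removeWithMap are illustrative names, not p-map exports), splicing the recorded indexes in ascending order removes the wrong element once an earlier removal has shifted the array, whereas building a new array is unaffected:

```javascript
var SKIP = Symbol('skip') // stand-in for pMapSkip

// Naive approach: splice each recorded index in ascending order.
function removeWithSplice(ret, skipIndexes) {
    var copy = ret.slice()
    for (var k of skipIndexes) {
        copy.splice(k, 1) // every removal shifts the elements after k
    }
    return copy
}

// Map-based approach: look up skipped indexes and push everything else.
function removeWithMap(ret, skipMap) {
    var pureRet = []
    for (var [index, value] of ret.entries()) {
        if (skipMap.get(index) === SKIP) {
            continue
        }
        pureRet.push(value)
    }
    return pureRet
}

var results = ['a', SKIP, 'c', SKIP, 'e']
var viaSplice = removeWithSplice(results, [1, 3])
var viaMap = removeWithMap(results, new Map([[1, SKIP], [3, SKIP]]))

console.log(viaSplice.length, viaSplice.includes(SKIP)) // 3 true
console.log(viaMap) // [ 'a', 'c', 'e' ]
```

In the splice version the second call removes 'e' instead of the second marker, which is a correctness problem on top of the performance concern.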

Cancelling a p-map request or its iteration from outside: AbortController

There are situations where the result of p-map is no longer needed, or where p-map should no longer run. In that case the p-map request, or its iteration, has to be cancelled from outside, which can be done with AbortController.

Here is a brief introduction to AbortController usage. Suppose there is a request, fetchSomething:

var fetchSomething = (str) => {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            resolve(str)
        }, 1000)
    })
}

To cancel the fetchSomething request, a signal has to be passed into it. signal is an instance property of AbortController, and it is through signal that the AbortController and the request are associated:

var controller = new AbortController()
var signal = controller.signal

var fetchSomething = (str,signal) => {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            resolve(str)
        }, 1000)
    })
}

fetchSomething('fetch',signal).then(res => {
    console.log('res:', res)
}).catch(err => {
    console.log('err:', err)
})

Once the association is established, the request is cancelled from outside with the AbortController instance method abort():

controller.abort()

Inside the request, there are two ways to detect that abort() has been called from outside:

signal.addEventListener('abort', () => {

}, false)

or

if (signal.aborted) {

}

Full Example:

var controller = new AbortController()
var signal = controller.signal

var fetchSomething = (str,signal) => {
    return new Promise((resolve, reject) => {
        signal.addEventListener('abort', () => {
            console.log('addEventListener')
            reject('addEventListener canceled')
        }, false)
        setTimeout(() => {
            if (signal.aborted) {
                console.log('aborted')
                reject('aborted canceled')
                return
            }
            console.log('entered setTimeout')
            resolve(str)
        }, 1000)
    })
}

setTimeout(() => {
    controller.abort()
}, 500)

fetchSomething('fetch',signal).then(res => {
    console.log('res:', res)
}).catch(err => {
    console.log('err:', err)
})

Output (the first two lines are printed after 500 ms; the last one is printed when the timer fires at 1000 ms):

addEventListener
err: addEventListener canceled
aborted
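A slightly tidier variant, offered as a sketch rather than as the article's code: the hypothetical helper abortableDelay clears its timer when aborted, so nothing runs after cancellation, and it rejects with signal.reason. This assumes a runtime in which AbortSignal exposes the reason property.

```javascript
// Hypothetical helper: fulfill with `value` after `ms`, unless `signal` aborts first.
function abortableDelay(value, ms, signal) {
    return new Promise((resolve, reject) => {
        if (signal.aborted) {
            reject(signal.reason) // already cancelled before starting
            return
        }

        const onAbort = () => {
            clearTimeout(timer) // stop the pending work
            reject(signal.reason)
        }

        const timer = setTimeout(() => {
            signal.removeEventListener('abort', onAbort) // listener no longer needed
            resolve(value)
        }, ms)

        signal.addEventListener('abort', onAbort, { once: true })
    })
}

const controller = new AbortController()
setTimeout(() => controller.abort(), 50)

const outcome = abortableDelay('fetch', 1000, controller.signal)
    .then(res => 'res: ' + res)
    .catch(err => 'err: ' + err.name)

outcome.then(text => console.log(text)) // err: AbortError
```

When abort() is called without an argument, signal.reason is an AbortError DOMException, which is why err.name is used in the log line.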

Combined with p-map, it is used as follows:

import pMap from 'p-map';

const abortController = new AbortController();

setTimeout(() => {
	abortController.abort();
}, 500);

const mapper = async value => value;

await pMap([fetchSomething(1000), fetchSomething(1000)], mapper, {signal: abortController.signal});
// After 500 ms, pMap is aborted and throws an error.

In that case, the internal implementation of p-map can be written like this:

var pMap = (iterable, mapper, options) => new Promise((resolve, reject) => {
    var { signal } = options

    if (signal) {
        if (signal.aborted) {
            reject(signal.reason);
        }

        signal.addEventListener('abort', () => {
            reject(signal.reason);
        });
    }

    var next = () =>{
        //...
    }
})

Handwritten full source code:

The full source code follows. The .then() and .catch() chains in it can also be rewritten in the async/await + try/catch style.

let pMapSkip = Symbol('skip')

var pMap = (iterable, mapper, options) => new Promise((resolve, reject) => {
    var iterator = iterable[Symbol.iterator]()
    var index = 0 // index; results are saved in the order of the iterable
    var ret = [] // saved results
    var activeCount = 0 // number of elements being executed
    var isIterableDone = false
    var hasError = false
    var skipIndexArr = new Map()
    var { signal, stopOnError, concurrency } = options

    if (signal) {
        if (signal.aborted) {
            reject(signal.reason);
        }

        signal.addEventListener('abort', () => {
            reject(signal.reason);
        });
    }

    var next = () => {
        var item = iterator.next()
        if (item.done) {
            isIterableDone = true
            if (activeCount == 0) {
                if (skipIndexArr.size === 0) {
                    resolve(ret);
                    return;
                }

                const pureRet = [];

                for (const [index, value] of ret.entries()) {
                    if (skipIndexArr.get(index) === pMapSkip) {
                        continue;
                    }

                    pureRet.push(value);
                }

                resolve(pureRet);
            }
            return;
        }
        var currentIndex = index // save the current element's index, used to store the result
        index++ // index of the next element
        activeCount++
        Promise.resolve(item.value)
            .then(res => mapper(res))
            .then(res2 => {
                ret[currentIndex] = res2
                activeCount--
                next()
            })
            .catch(err => {
                ret[currentIndex] = err;
                if (stopOnError) {
                    hasError = true;
                    reject(err);
                } else {
                    if (err === pMapSkip) {
                        skipIndexArr.set(currentIndex, err);
                    }
                    hasError = false;
                    activeCount--;
                    next();
                }

            })
    }

    for (let k = 0; k < concurrency; k++) {
        if (isIterableDone) {
            break
        }
        next()
    }
})
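For comparison, here is how the same logic might look in the async/await + try/catch style. It is a sketch, not the author's or the library's source: the default option values, the Set used for skipped indexes, and the stopped flag are assumptions added for illustration, and skipping still follows this article's reject-with-pMapSkip convention.

```javascript
const pMapSkip = Symbol('skip')

const pMap = (iterable, mapper, options = {}) => new Promise((resolve, reject) => {
    // Defaults are an assumption of this sketch.
    const { signal, stopOnError = false, concurrency = Infinity } = options
    const iterator = iterable[Symbol.iterator]()
    const ret = []
    const skipped = new Set() // indexes whose mapper call ended with pMapSkip
    let index = 0
    let activeCount = 0
    let isIterableDone = false
    let stopped = false // set once the outer promise has been rejected

    if (signal) {
        if (signal.aborted) {
            reject(signal.reason)
            return
        }
        signal.addEventListener('abort', () => {
            stopped = true
            reject(signal.reason)
        }, { once: true })
    }

    const next = async () => {
        if (stopped) return
        const item = iterator.next()
        if (item.done) {
            isIterableDone = true
            if (activeCount === 0) {
                resolve(ret.filter((_, i) => !skipped.has(i)))
            }
            return
        }
        const currentIndex = index++
        activeCount++
        try {
            const value = await item.value
            ret[currentIndex] = await mapper(value, currentIndex)
        } catch (err) {
            if (stopOnError) {
                stopped = true
                reject(err)
                return
            }
            if (err === pMapSkip) {
                skipped.add(currentIndex)
            } else {
                ret[currentIndex] = err // keep the error in place, as the article does
            }
        }
        activeCount--
        next()
    }

    for (let k = 0; k < concurrency; k++) {
        if (isIterableDone) break
        next()
    }
})

// Usage with made-up data: element 3 is skipped by throwing pMapSkip.
const later = (value, ms) => new Promise(resolve => setTimeout(() => resolve(value), ms))
const mapper = async item => {
    if (item === 3) throw pMapSkip
    return item * 10
}

const skippedRun = pMap([later(1, 30), 2, 3, later(4, 10), 5], mapper, { concurrency: 2 })
skippedRun.then(res => console.log(res)) // [ 10, 20, 40, 50 ]

const strictRun = pMap([1, 2, 3], mapper, { concurrency: 2, stopOnError: true })
strictRun.catch(err => console.log(err === pMapSkip)) // true
```

The stopped flag keeps in-flight lanes from pulling further elements after the promise has been rejected, which the .then()/.catch() version leaves to the caller.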

Test it

1. Testing pMapSkip

var arr= [
    fetchSomething('1a' ,1000), // promise
    2,3,
    fetchSomething( '4a' , 5000), // promise
    5,6,7,8,9,10
]
var mapper = (item, index) => {
    return new Promise((resolve, reject) => {
        return item == 3 ? reject(pMapSkip): resolve(parseFloat(item))
    })
}

pMap(arr, mapper, { concurrency: 2 }).then(res => {
    console.log(res) // [1, 2, 4, 5, 6, 7, 8, 9, 10]; 3 has been removed
})

2. Testing cancellation

var controller = new AbortController()
var signal = controller.signal

pMap(arr, mapper, { concurrency: 2,signal:signal }).then(res => {
    console.log(res)
}).catch(err =>{
    console.log(err) // printed after 500 ms: AbortError: signal is aborted without reason
})

setTimeout(() =>{
    controller.abort()
},500)

3. Testing stopOnError

pMap(arr, mapper, { concurrency: 2,signal:signal,stopOnError:true }).then(res => {
    console.log(res)
}).catch(err =>{
    console.log(err) // Symbol(skip)
})

At this point the core functionality of p-map has been implemented; interested readers can take a closer look.