Introduction
p-map is a library for iterating over promises while controlling how many of them execute concurrently. Its author is sindresorhus, who has also created a number of other promise libraries, collected in promise-fun, for those interested.
The previously covered p-limit is also a library for controlling the number of concurrent requests. Both serve the same purpose of limiting concurrency, but p-map additionally handles iterating over the requests (promises).
p-limit is used as follows; limit accepts a function:
var limit = pLimit(2); // set the maximum concurrency to 2
var input = [
  // the limit function wraps each individual request
  limit(() => fetchSomething('1')),
  limit(() => fetchSomething('4'))
];
// Execute the requests
Promise.all(input).then(res => {
  console.log(res)
})
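To make the idea of "wrapping each request in a limit function" more tangible, here is a minimal sketch of a p-limit-like limiter. It is not the library's implementation; createLimit, limitTwo and fakeFetch are names invented for this illustration.

```javascript
// Sketch of a p-limit-like limiter. createLimit is a hypothetical name,
// not the actual implementation of the p-limit library.
var createLimit = (concurrency) => {
  var queue = []   // wrapped functions waiting for a free slot
  var active = 0   // number of functions currently running
  var runNext = () => {
    if (active >= concurrency || queue.length === 0) return
    active++
    var { fn, resolve, reject } = queue.shift()
    Promise.resolve()
      .then(fn)                 // run the wrapped function
      .then(resolve, reject)    // settle the promise handed to the caller
      .finally(() => {
        active--
        runNext()               // a slot is free, start the next queued function
      })
  }
  return (fn) => new Promise((resolve, reject) => {
    queue.push({ fn, resolve, reject })
    runNext()
  })
}

// Usage with a fake request that resolves after 10 ms.
var limitTwo = createLimit(2)
var fakeFetch = (id) => new Promise((r) => setTimeout(() => r(id), 10))
Promise.all([
  limitTwo(() => fakeFetch('1')),
  limitTwo(() => fakeFetch('4'))
]).then((res) => {
  console.log(res) // ['1', '4']
})
```

The queue is what keeps the extra requests from starting: a function only runs once a slot is free.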
In contrast, p-map takes a collection (to be precise, an iterable object) and processes it with a user-supplied mapper handler function:
import pMap from 'p-map';
import got from 'got';
const sites = [
  getWebsiteFromUsername('sindresorhus'), //=> Promise
  '',
  ''
];
const mapper = async site => {
  const {requestUrl} = await got.head(site);
  return requestUrl;
};
// Takes three arguments: an iterable, a function that processes each element, and an options object
const result = await pMap(sites, mapper, {concurrency: 2});
console.log(result);
//=> ['/', '/', '/']
The built-in iterable objects are String, Array, TypedArray, Map and Set. To be iterable, an object must implement the [Symbol.iterator]() method.
When you traverse an iterable object, you are actually traversing it according to the iterator protocol.
For example, iterating over an array looks like this:
var iterable = [1,2,3,4]
var iterator = iterable[Symbol.iterator]() // iterator is an iterator object
iterator.next() // {value: 1, done: false}
iterator.next() // {value: 2, done: false}
iterator.next() // {value: 3, done: false}
iterator.next() // {value: 4, done: false}
iterator.next() // {value: undefined, done: true}
When the array iteration is complete, next() returns {value: undefined, done: true}.
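The same protocol works for any object, not only the built-in ones. As a small sketch, the object below becomes iterable simply by implementing [Symbol.iterator](); the name range and its from/to fields are made up for illustration.

```javascript
// Hypothetical example: a plain object made iterable by implementing [Symbol.iterator]().
var range = {
  from: 1,
  to: 4,
  [Symbol.iterator]() {
    var current = this.from
    var last = this.to
    // The returned iterator object must expose a next() method.
    return {
      next() {
        return current <= last
          ? { value: current++, done: false }
          : { value: undefined, done: true }
      }
    }
  }
}

// Manual iteration, exactly as with the array above.
var it = range[Symbol.iterator]()
console.log(it.next()) // {value: 1, done: false}

// Anything that follows the protocol also works with for...of and spread.
console.log([...range]) // [1, 2, 3, 4]
```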
How p-map controls concurrent requests: the incoming collection is iterated over, and once the first element of the collection (which may be a promise) has settled, it is handed to the mapper function. Only after mapper has finished processing does the iteration move on to the next element. This keeps the iteration in order, one element at a time, so at this point the concurrency is 1.
To reach a concurrency of n while still performing the iteration above, the author cleverly uses a for loop:
for (let i = 0; i < n; i++) {
  next()
}
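The effect of that loop can be checked with a small sketch: n calls to next() start n independent chains that all pull from one shared iterator, so at most n tasks are in flight at a time. runWithConcurrency, sleepTask and the maxActive counter are assumptions made for this illustration, not p-map's code.

```javascript
// Sketch: n initial next() calls give a concurrency of n.
// runWithConcurrency and its bookkeeping are hypothetical names.
var runWithConcurrency = (items, n, task) => new Promise((resolve) => {
  var iterator = items[Symbol.iterator]()
  var active = 0      // tasks currently running
  var maxActive = 0   // highest number of tasks seen running at once
  var next = () => {
    var item = iterator.next()
    if (item.done) {
      if (active === 0) resolve(maxActive)
      return
    }
    active++
    maxActive = Math.max(maxActive, active)
    task(item.value).then(() => {
      active--
      next() // each finished task pulls the next element
    })
  }
  for (let i = 0; i < n; i++) {
    next()
  }
})

// A fake asynchronous task that takes 10 ms.
var sleepTask = () => new Promise((r) => setTimeout(r, 10))

runWithConcurrency([1, 2, 3, 4, 5, 6], 3, sleepTask).then((max) => {
  console.log(max) // 3
})
```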
Writing p-map by hand
Here is an implementation of p-map along the lines of the author's.
Suppose there is an iterable object arr that needs to be processed:
var fetchSomething = (str,ms) =>{
return new Promise((resolve,reject) =>{
setTimeout(() =>{
resolve(parseFloat(str))
},ms)
})
}
var arr= [
fetchSomething('1a' ,1000), // promise
2,3,
fetchSomething( '4a' , 5000), // promise
5,6,7,8,9,10
]
The first and fourth elements of the collection are promises.
p-map receives three arguments: the iterable to process, the mapper handler function, and custom options. Its return value is a promise, as shown below:
var pMap = (iterable, mapper, options) => new Promise((resolve,reject) => {});
Once you have an iterable object, you iterate over it recursively until the iteration is complete. Here we define an internal recursive function next:
var pMap = (iterable, mapper, options) => new Promise((resolve,reject) => {
  var iterator = iterable[Symbol.iterator]()
  var next = () => {
    var item = iterator.next()
    if (item.done) {
      return
    }
    next()
  }
  next() // start iterating
});
Each element of the iterable is processed in order. If the element is a promise, you first wait for it to be fulfilled, then pass the fulfilled value to the mapper function, and wait until mapper is either fulfilled or rejected before moving on to the next element:
var iterator = iterable[Symbol.iterator]()
var next = () => {
  var item = iterator.next()
  if (item.done) {
    return
  }
  Promise.resolve(item.value)
    .then(res => mapper(res))
    .then(res2 => {
      next()
    })
}
After each iteration finishes, save the fulfilled result:
var iterator = iterable[Symbol.iterator]()
var index = 0 // index, used to save results in the order of the iterable
var ret = [] // saved results
var next = () => {
  var item = iterator.next()
  if (item.done) {
    return
  }
  var currentIndex = index // store the current element's index, used to save its result
  index++ // index of the next element
  Promise.resolve(item.value)
    .then(res => mapper(res))
    .then(res2 => {
      ret[currentIndex] = res2
      next()
    })
}
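The reason for capturing currentIndex becomes visible once several elements are in flight at the same time: results then arrive in completion order, not input order. The sketch below uses timers as stand-ins for requests; collectInOrder is a hypothetical helper, not part of p-map.

```javascript
// Sketch: storing by a captured index keeps results in input order,
// even when later elements finish first. collectInOrder is a hypothetical helper.
var collectInOrder = (delays) => new Promise((resolve) => {
  var ret = []
  var finished = []   // completion order, kept only for comparison
  var pending = delays.length
  if (pending === 0) resolve({ ret, finished })
  delays.forEach((ms, currentIndex) => {
    setTimeout(() => {
      ret[currentIndex] = ms   // the slot was fixed when the task started
      finished.push(ms)        // order in which the tasks actually completed
      if (--pending === 0) resolve({ ret, finished })
    }, ms)
  })
})

collectInOrder([60, 20, 40]).then(({ ret, finished }) => {
  console.log(ret)      // [60, 20, 40]
  console.log(finished) // [20, 40, 60]
})
```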
When the entire iteration is complete and all elements have settled, output the result set:
var pMap = (iterable, mapper, options) => new Promise((resolve,reject) => {
  var iterator = iterable[Symbol.iterator]()
  var index = 0 // index, used to save results in the order of the iterable
  var ret = [] // saved results
  var activeCount = 0 // number of elements currently being processed
  var next = () => {
    var item = iterator.next()
    if (item.done) { // all elements have been iterated over
      if (activeCount == 0) {
        resolve(ret) // all elements have settled, output the result set
      }
      return
    }
    var currentIndex = index // save the current element's index, used to store its result
    index++ // index of the next element
    activeCount++
    Promise.resolve(item.value)
      .then(res => mapper(res))
      .then(res2 => {
        ret[currentIndex] = res2
        activeCount--
        next()
      })
      .catch(err => {
        activeCount--
      })
  }
  next() // start iterating
})
The stopOnError option
One of the options passed to p-map is stopOnError, which indicates whether to terminate the iteration when an error is encountered, so the decision is made inside .catch():
Promise.resolve(item.value)
  .then(res => mapper(res))
  .then(res2 => {
    // ...
  })
  .catch(err => {
    ret[currentIndex] = err // save the error as a result too
    if (stopOnError) {
      hasError = true
      reject(err) // an error occurred, terminate the loop
    } else {
      hasError = false
      activeCount--
      next()
    }
  })
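The two branches can be tried out in isolation. The sketch below processes elements one at a time and mirrors the decision made in .catch() above; mapSequential and failOnTwo are hypothetical helpers written for this illustration and are not p-map's API.

```javascript
// Sketch of the two stopOnError branches, processing elements one at a time.
// mapSequential is a hypothetical helper, not p-map's API.
var mapSequential = async (items, mapper, { stopOnError }) => {
  var ret = []
  for (var [i, item] of items.entries()) {
    try {
      ret[i] = await mapper(await item)
    } catch (err) {
      if (stopOnError) throw err   // reject right away, remaining items are not processed
      ret[i] = err                 // otherwise keep the error in its slot and continue
    }
  }
  return ret
}

// A mapper that fails for the element 2.
var failOnTwo = async (n) => {
  if (n === 2) throw new Error('two failed')
  return n * 10
}

mapSequential([1, 2, 3], failOnTwo, { stopOnError: false }).then((ret) => {
  console.log(ret.map((r) => (r instanceof Error ? r.message : r))) // [10, 'two failed', 30]
})
mapSequential([1, 2, 3], failOnTwo, { stopOnError: true }).catch((err) => {
  console.log(err.message) // two failed
})
```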
Ignoring unwanted results: pMapSkip
The mapper function is user-defined. What if processing an element goes wrong inside mapper and the user wants to drop that result and keep only the good ones? This is where pMapSkip comes in.
The p-map source code provides pMapSkip, which is a Symbol value. Internally, p-map records every element whose result is pMapSkip and removes those elements from the result set once the iteration completes. With the published library, mapper signals this by returning (resolving with) pMapSkip; the hand-written version in this article signals it with reject(pMapSkip) and handles it in .catch().
For example:
import pMap, { pMapSkip } from 'p-map'
var arr = [
  fetchSomething('1a', 1000, true),
  2, 3
]
var mapper = (item, index) => {
  return new Promise((resolve, reject) => {
    // Skip the element 2: the published p-map expects pMapSkip as the resolved value
    return item == 2 ? resolve(pMapSkip) : resolve(parseFloat(item))
  })
}
(async () => {
  const result = await pMap(arr, mapper, { concurrency: 2 });
  console.log(result); //=> [1, 3]
})();
So when mapper signals pMapSkip, the corresponding element has to be marked. Use var skipIndexArr = [] to record the positions of the elements to be removed:
var skipIndexArr = [];
Promise.resolve(item.value)
  .then((res) => mapper(res))
  .then((res2) => {
    // ...
  })
  .catch((err) => {
    if (err === pMapSkip) {
      skipIndexArr.push(currentIndex); // record the position of the element to be removed
      activeCount--;
      next();
    } else {
      ret[currentIndex] = err;
      if (stopOnError) {
        hasError = true;
        reject(err);
      } else {
        hasError = false;
        activeCount--;
        next();
      }
    }
  });
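At the end of the iteration, remove the elements marked with pMapSkip from the result set:
if (item.done) {
  if (activeCount == 0) {
    // remove from the highest index down, so earlier removals do not shift later ones
    for (var k of skipIndexArr.sort((a, b) => b - a)) {
      ret.splice(k, 1);
    }
    resolve(ret);
  }
  return;
}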
With large data sets, frequent use of splice may perform poorly, because after each splice the indexes of all subsequent elements change. So rework this by turning skipIndexArr into a Map:
// var skipIndexArr= []
var skipIndexArr= new Map()
Record the position of the element to be removed:
if (err === pMapSkip) {
  skipIndexArr.set(currentIndex, err);
}
Then, at the end of the iteration, instead of calling splice on the original array, collect the results into a new array; push performs better than splice:
if (item.done) {
  if (activeCount == 0) {
    if (skipIndexArr.size === 0) {
      resolve(ret);
      return;
    }
    const pureRet = [];
    for (const [index, value] of ret.entries()) {
      if (skipIndexArr.get(index) === pMapSkip) {
        continue;
      }
      pureRet.push(value);
    }
    resolve(pureRet);
  }
  return;
}
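The filtering step is easy to try on its own. In the sketch below, SKIP stands in for pMapSkip and removeSkipped is a hypothetical helper; the second half shows what goes wrong when recorded indexes are spliced naively in ascending order.

```javascript
// Sketch: drop skipped slots by building a new array instead of splicing.
// SKIP stands in for pMapSkip; removeSkipped is a hypothetical helper.
var SKIP = Symbol('skip')
var removeSkipped = (ret, skipIndexMap) => {
  if (skipIndexMap.size === 0) return ret
  var pureRet = []
  for (var [index, value] of ret.entries()) {
    if (skipIndexMap.get(index) === SKIP) continue
    pureRet.push(value)
  }
  return pureRet
}

var skipped = new Map([[1, SKIP], [3, SKIP]])
console.log(removeSkipped([10, SKIP, 30, SKIP, 50], skipped)) // [10, 30, 50]

// For comparison: splicing the recorded indexes in ascending order shifts
// the later elements, so the second splice removes 50 instead of SKIP.
var naive = [10, SKIP, 30, SKIP, 50]
for (var k of [1, 3]) naive.splice(k, 1)
console.log(naive.includes(SKIP)) // true
```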
Cancelling p-map from outside: AbortController
There are situations where we no longer need the result p-map returns, or no longer want to use p-map at all. In that case we need to cancel the p-map request (that is, cancel the iteration) from outside, which can be done with AbortController.
First, a brief introduction to AbortController. Suppose there is a request fetchSomething:
var fetchSomething = (str) => {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(str)
}, 1000)
})
}
To cancel the fetchSomething request, a signal has to be passed into it. signal is an instance property of AbortController, and it is what associates the AbortController with the request:
var controller = new AbortController()
var signal = controller.signal
var fetchSomething = (str, signal) => {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(str)
    }, 1000)
  })
}
fetchSomething('fetch', signal).then(res => {
  console.log('res:', res)
}).catch(err => {
  console.log('err:', err)
})
After the association is established, the request is cancelled from outside with the AbortController instance method abort():
controller.abort()
Inside the request, there are two ways to detect that abort() was called from outside:
signal.addEventListener('abort', () => {
}, false)
or
if (signal.aborted) {
}
Full example:
var controller = new AbortController()
var signal = controller.signal
var fetchSomething = (str, signal) => {
  return new Promise((resolve, reject) => {
    signal.addEventListener('abort', () => {
      console.log('addEventListener')
      reject('addEventListener canceled')
    }, false)
    setTimeout(() => {
      if (signal.aborted) {
        console.log('aborted')
        reject('aborted canceled')
        return
      }
      console.log('entered setTimeout')
      resolve(str)
    }, 1000)
  })
}
setTimeout(() => {
  controller.abort()
}, 500)
fetchSomething('fetch', signal).then(res => {
  console.log('res:', res)
}).catch(err => {
  console.log('err:', err)
})
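Output (the first two lines appear after 500 ms, when abort() is called; the last line appears at 1000 ms, when the timer fires and finds the signal already aborted):
addEventListener
err: addEventListener canceled
aborted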
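In the example above the timer keeps running after the abort, which is why aborted is still printed later. A variation that cleans up after itself could look like the sketch below. abortableDelay is a hypothetical helper, and the sketch assumes a runtime where AbortSignal exposes the reason property (recent browsers and Node.js 18 or later).

```javascript
// Sketch: an abortable timer that cleans up after itself.
// abortableDelay is a hypothetical helper name; signal.reason is assumed to be available.
var abortableDelay = (ms, value, signal) => new Promise((resolve, reject) => {
  if (signal.aborted) {
    reject(signal.reason) // already cancelled before we started
    return
  }
  var onAbort = () => {
    clearTimeout(timer)   // stop the pending work
    reject(signal.reason)
  }
  var timer = setTimeout(() => {
    signal.removeEventListener('abort', onAbort) // nothing left to cancel
    resolve(value)
  }, ms)
  signal.addEventListener('abort', onAbort, { once: true })
})

var delayController = new AbortController()
setTimeout(() => delayController.abort(), 50)
abortableDelay(200, 'done', delayController.signal)
  .then((res) => console.log('res:', res))
  .catch((err) => console.log('err:', err.name)) // err: AbortError
```

Clearing the timer and removing the listener means neither callback outlives the request it belongs to.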
Combined with p-map, it is used as follows:
import pMap from 'p-map';
const abortController = new AbortController();
setTimeout(() => {
  abortController.abort();
}, 500);
const mapper = async value => value;
await pMap([fetchSomething(1000), fetchSomething(1000)], mapper, {signal: abortController.signal});
// After 500 ms pMap is aborted and rejects with an error.
In that case, the p-map internals can be written like this:
var pMap = (iterable, mapper, options) => new Promise((resolve, reject) => {
  var { signal } = options
  if (signal) {
    if (signal.aborted) {
      reject(signal.reason);
    }
    signal.addEventListener('abort', () => {
      reject(signal.reason);
    });
  }
  var next = () => {
    //...
  }
})
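Rejecting the outer promise does not by itself stop the recursion, so a flag is needed to keep next() from pulling further elements. The sketch below illustrates this with one element at a time; mapWithSignal, slowDouble and the stopped flag are assumptions for illustration, and signal.reason is assumed to be available (recent browsers, Node.js 18 or later).

```javascript
// Sketch: reject on abort and stop pulling further elements.
// mapWithSignal is a hypothetical helper that processes one element at a time.
var mapWithSignal = (items, mapper, { signal } = {}) => new Promise((resolve, reject) => {
  var iterator = items[Symbol.iterator]()
  var ret = []
  var stopped = false
  if (signal) {
    if (signal.aborted) {
      reject(signal.reason)
      return
    }
    signal.addEventListener('abort', () => {
      stopped = true          // remember that iteration must not continue
      reject(signal.reason)
    }, { once: true })
  }
  var next = () => {
    if (stopped) return       // aborted: do not start another element
    var item = iterator.next()
    if (item.done) {
      resolve(ret)
      return
    }
    Promise.resolve(item.value)
      .then(mapper)
      .then((res) => {
        ret.push(res)
        next()
      }, reject)
  }
  next()
})

// Usage: each element takes 30 ms, the abort arrives after 50 ms.
var slowDouble = (x) => new Promise((r) => setTimeout(() => r(x * 2), 30))
var mapController = new AbortController()
setTimeout(() => mapController.abort(), 50)
mapWithSignal([1, 2, 3, 4, 5], slowDouble, { signal: mapController.signal })
  .catch((err) => console.log('err:', err.name)) // err: AbortError
```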
The complete hand-written source:
The source below also converts the .then() and .catch() style into async/await + try/catch style:
let pMapSkip = Symbol('skip')
var pMap = (iterable, mapper, options = {}) => new Promise((resolve, reject) => {
  var iterator = iterable[Symbol.iterator]()
  var index = 0 // index, used to save results in the order of the iterable
  var ret = [] // saved results
  var activeCount = 0 // number of elements currently being processed
  var isIterableDone = false
  var hasError = false // set once the returned promise has been rejected
  var skipIndexArr = new Map()
  var { signal, stopOnError, concurrency = Infinity } = options
  if (signal) {
    if (signal.aborted) {
      hasError = true
      reject(signal.reason);
    }
    signal.addEventListener('abort', () => {
      hasError = true
      reject(signal.reason);
    });
  }
  var next = async () => {
    if (hasError) {
      return // already rejected, stop pulling new elements
    }
    var item = iterator.next()
    if (item.done) {
      isIterableDone = true
      if (activeCount == 0) {
        if (skipIndexArr.size === 0) {
          resolve(ret);
          return;
        }
        const pureRet = [];
        for (const [index, value] of ret.entries()) {
          if (skipIndexArr.get(index) === pMapSkip) {
            continue;
          }
          pureRet.push(value);
        }
        resolve(pureRet);
      }
      return;
    }
    var currentIndex = index // save the current element's index, used to store its result
    index++ // index of the next element
    activeCount++
    try {
      var res = await item.value
      ret[currentIndex] = await mapper(res, currentIndex)
    } catch (err) {
      ret[currentIndex] = err;
      if (stopOnError) {
        hasError = true;
        reject(err);
        return;
      }
      if (err === pMapSkip) {
        skipIndexArr.set(currentIndex, err);
      }
    }
    activeCount--;
    next();
  }
  for (let k = 0; k < concurrency; k++) {
    if (isIterableDone || hasError) {
      break
    }
    next()
  }
})
Test it.
1. Testing pMapSkip
var arr = [
  fetchSomething('1a', 1000), // promise
  2, 3,
  fetchSomething('4a', 5000), // promise
  5, 6, 7, 8, 9, 10
]
var mapper = (item, index) => {
  return new Promise((resolve, reject) => {
    return item == 3 ? reject(pMapSkip) : resolve(parseFloat(item))
  })
}
pMap(arr, mapper, { concurrency: 2 }).then(res => {
  console.log(res) // [1, 2, 4, 5, 6, 7, 8, 9, 10], 3 has been removed
})
2. Testing cancellation
var controller = new AbortController()
var signal = controller.signal
pMap(arr, mapper, { concurrency: 2, signal: signal }).then(res => {
  console.log(res)
}).catch(err => {
  console.log(err) // printed after 500 ms: AbortError: signal is aborted without reason
})
setTimeout(() => {
  controller.abort()
}, 500)
3. Testing stopOnError
pMap(arr, mapper, { concurrency: 2, signal: signal, stopOnError: true }).then(res => {
  console.log(res)
}).catch(err => {
  console.log(err) // Symbol(skip)
})
At this point the core functionality of p-map is implemented; interested readers can consult the p-map source code for the rest.