Promise 并发下载
Promise.then
.then
的第一个参数是一个函数,该函数将在 promise resolved 后运行并接收结果。
.then
的第二个参数也是一个函数,该函数将在 promise rejected 后运行并接收 error。
let promise = new Promise(function (resolve, reject) {
// 当 promise 被构造完成时,自动执行此函数
// 1 秒后发出工作已经被完成的信号,并带有结果 "done"
setTimeout(() => resolve("done"), 1000);
setTimeout(() => reject("fail"), 500);
});
promise.then(dat => console.log(dat + "data"), err => console.log(err + "error"));
不可控并发下载
function img(text) {
return new Promise((resolve, reject) => {
setTimeout(() => {
console.log(text);
resolve(text);
}, 500);
})
}
(async () => {
let promises = [];
for (let i = 0; i < 5; i++) {
let promise = img(i);
promises.push(promise);
}
let promises2 = [];
for (let i = 5; i < 10; i++) {
let promise = img(i);
promises2.push(promise);
}
// promise 在遍历时已经运行,Promise.all 用来收集结果
Promise.all(promises2).then(data => console.log(data));
Promise.all(promises).then(data => console.log(data));
})()
结果同步的任意并发下载
N个任务并发,同时开始同时结束。
function img(text) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(text);
}, 1000);
})
}
function loadImg(...arg) {
return new Promise((resolve, reject) => {
let arrData = [];
arg.forEach((num) => {
img(num).then(data => {
arrData.push(data);
if (arrData.length == arg.length) {
console.log(arrData);
resolve(arrData);
}
});
})
})
}
(async () => {
let threads = 4;
let results = [];
for (let i = 0; i < 10;) {
let arr = [];
for (let j = 0; j < threads && i < 10; j++) {
arr.push(i++);
}
let promise = await loadImg(...arr);
results.push(...promise);
}
console.log(results);
})()
结果非同步的任意并发下载
- 输入promise数组,使用函数包装promise
- 所有promise进入执行函数, 等待返回promise结果,
- 小于限制数的直接执行, 大于的进入等待队列, 递归调用运行promise
- 监控队列长度和正在运行的promise, 都为0则返回结果, 队列剩余则运行队列头,
- promise参数可以作为其他函数参数传递:
Promise((resolve,reject)=>{(resolve)=>{return resolve}})
promise 递归调用和传参示例:
function all(text) {
return new Promise((resolve) => {
run(text, resolve);
})
}
function run(text, resolve) {
setTimeout(() => {
i++;
console.log(i);
if (i == limit) {
resolve(text);
return;
}
run(text, resolve);
}, 1000)
}
let i = 0;
let limit = 2;
all("hello").then((d) => {
console.log(d);
});
实例代码:
/*
1.输入promise数组,使用函数包装promise
2.所有promise进入执行函数, 等待返回promise结果,
3.小于限制数的直接执行, 大于的进入等待队列,
4.监控队列长度和正在运行的promise, 都为0则返回结果, 队列剩余则运行队列头,
5.promise参数可以作为其他函数参数: Promise((resolve,reject)=>{(resolve)=>{return resolve}})
*/
// 发送 promise
function all(promises) {
return new Promise((resolve, reject) => {
promises.forEach(promise => {
run(promise, resolve, reject);
})
})
}
// 递归运行promise
function run(promise, resolve, reject) {
if (runningNum >= limit) {
console.log("入队:", promise);
queue.push(promise);
return;
}
runningNum++;
promise().then(result => {
runningNum--;
results.push(result);
if (queue.length == 0 && runningNum == 0) {
return resolve(results);
}
if (queue.length) {
run(queue.shift(), resolve, reject);
}
})
}
let promises = []; // 输入
let results = []; // 结果
let limit = 3; // 并发限制
let queue = []; // 等待队列
let runningNum = 0; // 正在运行的
// 输入10个promise
for (let i = 0; i < 10; i++) {
let promise = function () {
return new Promise((resolve) => {
console.log("start", i);
setTimeout(() => {
console.log("end", i);
resolve(i);
}, 1000);
})
};
promises.push(promise);
}
// 发送10个promise
all(promises).then((data) => {
console.log(data);
});
参考
简述一个简易实现思路:
- 封装一个 ConcurrencyPromisePool 类
- 方法有
all()
,和Promise.prototype.all
类似。 - 属性有
limit
、queue
。前者是并发上限,后者存放排队的 promise。
注意:第 2 点中,all 函数传入的是生成 Promise 的方法,而不是 Promise 实例。因为 Promise 一旦生成实例,会直接执行。所以要把这个执行交给 ConcurrencyPromisePool 来控制。
// NodeJS Promise并发控制
// https://xin-tan.com/2020-09-13-bing-fa-kong-zhi/
class ConcurrencyPromisePool {
constructor(limit) {
this.limit = limit;
this.runningNum = 0;
this.queue = [];
this.results = [];
}
all(promises = []) {
return new Promise((resolve, reject) => {
for (const promise of promises) {
// 发送所有 promise
this._run(promise, resolve, reject);
}
});
}
_run(promise, resolve, reject) {
// 超出限制的 promise 入队
if (this.runningNum >= this.limit) {
console.log(">>> 达到上限,入队:", promise);
this.queue.push(promise);
return;
}
// 正在运行的 promise
++this.runningNum;
promise()
.then(res => {
this.results.push(res);
--this.runningNum;
// 运行结束条件:队列长度 && 正在运行的数量
if (this.queue.length === 0 && this.runningNum === 0) {
return resolve(this.results);
}
// 队列还有则,出队,然后递归调用
if (this.queue.length) {
this._run(this.queue.shift(), resolve, reject);
}
})
.catch(reject);
}
}
const promises = [];
for (let i = 0; i < 15; ++i) {
promises.push(() => new Promise(resolve => {
console.log(`${i} start`);
setTimeout(() => {
console.log(`${i} end`);
resolve(i);
}, 1000);
}));
}
const pool = new ConcurrencyPromisePool(2);
pool.all(promises).then(res => {
console.log(res);
});