JavaScript Promise 并发控制下载文件的实现


Promise 并发下载

Promise.then

.then 的第一个参数是一个函数,该函数将在 promise resolved 后运行并接收结果。

.then 的第二个参数也是一个函数,该函数将在 promise rejected 后运行并接收 error。

let promise = new Promise(function (resolve, reject) {
  // 当 promise 被构造完成时,自动执行此函数

  // 1 秒后发出工作已经被完成的信号,并带有结果 "done"
  setTimeout(() => resolve("done"), 1000);
  setTimeout(() => reject("fail"), 500);
});

promise.then(dat => console.log(dat + "data"), err => console.log(err + "error"));

不可控并发下载

function img(text) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      console.log(text);
      resolve(text);
    }, 500);
  })
}

(async () => {
  let promises = [];
  for (let i = 0; i < 5; i++) {
    let promise = img(i);
    promises.push(promise);
  }
  let promises2 = [];
  for (let i = 5; i < 10; i++) {
    let promise = img(i);
    promises2.push(promise);
  }
  // promise 在遍历时已经运行,Promise.all 用来收集结果
  Promise.all(promises2).then(data => console.log(data));
  
  Promise.all(promises).then(data => console.log(data));
})()

结果同步的任意并发下载

N个任务并发,同时开始同时结束。

function img(text) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(text);
    }, 1000);
  })
}

function loadImg(...arg) {
  return new Promise((resolve, reject) => {
    let arrData = [];
    arg.forEach((num) => {
      img(num).then(data => {
        arrData.push(data);
        if (arrData.length == arg.length) {
          console.log(arrData);
          resolve(arrData);
        }
      });
    })
  })
}
(async () => {
  let threads = 4;
  let results = [];

  for (let i = 0; i < 10;) {
    let arr = [];
    for (let j = 0; j < threads && i < 10; j++) {
      arr.push(i++);
    }
    let promise = await loadImg(...arr);
    results.push(...promise);
  }

  console.log(results);
})()

结果非同步的任意并发下载

  1. 输入promise数组,使用函数包装promise
  2. 所有promise进入执行函数, 等待返回promise结果,
  3. 小于限制数的直接执行, 大于的进入等待队列, 递归调用运行promise
  4. 监控队列长度和正在运行的promise, 都为0则返回结果, 队列剩余则运行队列头,
  5. promise参数可以作为其他函数参数传递: Promise((resolve,reject)=>{(resolve)=>{return resolve}})

promise 递归调用和传参示例:

function all(text) {
  return new Promise((resolve) => {
    run(text, resolve);
  })
}

function run(text, resolve) {
  setTimeout(() => {
    i++;
    console.log(i);
    if (i == limit) {
      resolve(text);
      return;
    }
    run(text, resolve);
  }, 1000)
}

let i = 0;
let limit = 2;

all("hello").then((d) => {
  console.log(d);
});

实例代码:

/* 
  1.输入promise数组,使用函数包装promise
  2.所有promise进入执行函数, 等待返回promise结果, 
  3.小于限制数的直接执行, 大于的进入等待队列,
  4.监控队列长度和正在运行的promise, 都为0则返回结果, 队列剩余则运行队列头,
  5.promise参数可以作为其他函数参数: Promise((resolve,reject)=>{(resolve)=>{return resolve}})
*/

// 发送 promise
function all(promises) {
  return new Promise((resolve, reject) => {
    promises.forEach(promise => {
      run(promise, resolve, reject);
    })
  })
}

// 递归运行promise
function run(promise, resolve, reject) {
  if (runningNum >= limit) {
    console.log("入队:", promise);
    queue.push(promise);
    return;
  }
  runningNum++;
  promise().then(result => {
    runningNum--;
    results.push(result);
    if (queue.length == 0 && runningNum == 0) {
      return resolve(results);
    }
    if (queue.length) {
      run(queue.shift(), resolve, reject);
    }
  })
}

let promises = []; // 输入
let results = []; // 结果
let limit = 3; // 并发限制
let queue = []; // 等待队列
let runningNum = 0; // 正在运行的

// 输入10个promise
for (let i = 0; i < 10; i++) {
  let promise = function () {
    return new Promise((resolve) => {
      console.log("start", i);
      setTimeout(() => {
        console.log("end", i);
        resolve(i);
      }, 1000);
    })
  };
  promises.push(promise);
}

// 发送10个promise
all(promises).then((data) => {
  console.log(data);
});

参考

简述一个简易实现思路:

  1. 封装一个 ConcurrencyPromisePool 类
  2. 方法有all(),和Promise.prototype.all类似。
  3. 属性有 limitqueue。前者是并发上限,后者存放排队的 promise。

注意:第 2 点中,all 函数传入的是生成 Promise 的方法,而不是 Promise 实例。因为 Promise 一旦生成实例,会直接执行。所以要把这个执行交给 ConcurrencyPromisePool 来控制。

// NodeJS Promise并发控制 
// https://xin-tan.com/2020-09-13-bing-fa-kong-zhi/

class ConcurrencyPromisePool {
  constructor(limit) {
    this.limit = limit;
    this.runningNum = 0;
    this.queue = [];
    this.results = [];
  }

  all(promises = []) {
    return new Promise((resolve, reject) => {
      for (const promise of promises) {
        // 发送所有 promise
        this._run(promise, resolve, reject);
      }
    });
  }

  _run(promise, resolve, reject) {
    // 超出限制的 promise 入队
    if (this.runningNum >= this.limit) {
      console.log(">>> 达到上限,入队:", promise);
      this.queue.push(promise);
      return;
    }
    // 正在运行的 promise
    ++this.runningNum;
    promise()
      .then(res => {
        this.results.push(res);
        --this.runningNum;

        // 运行结束条件:队列长度 && 正在运行的数量
        if (this.queue.length === 0 && this.runningNum === 0) {
          return resolve(this.results);
        }
        // 队列还有则,出队,然后递归调用
        if (this.queue.length) {
          this._run(this.queue.shift(), resolve, reject);
        }
      })
      .catch(reject);
  }
}

const promises = [];
for (let i = 0; i < 15; ++i) {
  promises.push(() => new Promise(resolve => {
    console.log(`${i} start`);
    setTimeout(() => {
      console.log(`${i} end`);
      resolve(i);
    }, 1000);
  }));
}

const pool = new ConcurrencyPromisePool(2);
pool.all(promises).then(res => {
  console.log(res);
});

文章作者: iKnow
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 iKnow !
  目录