之前做过一个 Node.js实现分片上传 的功能。当时前端采用文件切片后并发上传,大大提高了上传的速度,使用Promise.race()管理并发池,有效避免浏览器内存耗尽。
现在的问题:Node.js服务端合并大文件分片 内存耗尽导致服务重启
服务端代码
const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小
// 2. 不会阻塞EventLoop
bufferList.forEach((hash, index) => {
  fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
       ws.write(data);
  });
});服务器配置:RAM:1024MB 1vCPU (运行了一些其他服务)
测试发现只要上传的文件超过300M,在合并分片时就会导致内存耗尽服务重启,根本无法完成分片合并。
解决方案:能不能在循环中控制读取文件的并发数呢?这样就不会有大量文件同时读取到内存中导致服务崩溃了。
尝试像前端那样使用 Promise.race 控制:
const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小
// Promise.race 并发控制
const pool = [];
let finish = 0; // 已经写入完成的分片数量
// 使用Promise包裹读取文件的异步操作
const PR = (index) => {
  return new Promise((resolve) => {
    fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
      ws.write(data)
      resolve({});
    });
  });
};
(async function easyRead() {
  for (let i = 0; i < bufferList.length; i ++) {
    const task = PR(i).then(val => {
      finish+=1
      const index = pool.findIndex(t => t === task);
      pool.splice(index);
      if (finish === bufferList.length) {
        ws.close();
      }
    });
    pool.push(task);
    if (pool.length === 1) { // 这里并发数量只能是1 否则分片写入是乱序的 格式会被损坏
      await Promise.race(pool);
    }
  }
})()这时神奇的事情就发生了,我们在for循环中使用了Promise.race()控制了同一时间读入到内存中的文件数量。
再次测试,服务已经不会在合并分片时崩溃,即使1个G的文件分片也可以在3秒左右合并完成。
async、await
相信这个是大家更加常用的处理异步的方法,因为它写起来更加优雅。我们上边的Promise.race也可以用它来替换,使代码看起来更加简洁明了:
const ws = fs.createWriteStream(`${target}/${filename}`); // 创建将要写入文件分片的流
const bufferList = fs.readdirSync(`${STATIC_TEMPORARY}/${filename}`); // 读取到分片文件名的列表[1,2,3...] 每个分片1M大小
let finish = 0; // 已经写入完成的分片数量
// 使用Promise包裹读取文件的异步操作
const PR = (index) => {
  return new Promise((resolve) => {
    fs.readFile(`${STATIC_TEMPORARY}/${filename}/${index}`, (err, data) => {
      ws.write(data)
      resolve({});
    });
  });
};
(async function easyRead() {
  for (let i = 0; i < bufferList.length; i ++) {
    await PR(i)
    finish +=1
    if (finish === bufferList.length) {
        ws.close();
    }
  }
})()可以达到相同的效果,看起来是不是清晰了很多?
总结
知道Promise.race()的人很多,这样的面试题也很多,但是能运用到实践中解决实际的问题却不是很多。
希望本文可以帮到你。
文章首发于 IICOOM-技术博客 《使用Promise.race()实现控制并发》
相关文章
暂无评论...
 
                             
                         
                            