文件收集和聚合结果的操作报告与非阻塞IO

我想对任意大的文件集执行一些任意昂贵的工作。我想实时报告进度,然后在处理完所有文件后显示结果。如果没有与我的表达式匹配的文件,我想抛出一个错误。 想象一下,编写一个测试框架,它可以加载所有测试文件,执行它们(没有特定的顺序),实时报告进度,然后在所有测试完成后显示聚合结果。 用阻塞语言(比如Ruby)编写这段代码非常简单。 事实证明,我在节点中执行这个看似简单的任务时遇到了麻烦,同时也真正利用了基于事件的异步IO。 我的第一个设计是连续执行每个步骤。 加载所有文件,创建要处理的文件集合 处理集合中的每个文件 处理完所有文件后报告结果 这种方法确实有效,但对我来说似乎并不合适,因为它会导致程序中计算成本更高的部分等待所有文件IO完成。这不是Node设计要避免的那种等待吗? 我的第二个设计是处理每个文件,因为它是在磁盘上异步找到的。为了论证,让我们想象一个看起来像这样的方法:
eachFileMatching(path, expression, callback) {
  // recursively, asynchronously traverse the file system,
  // calling callback every time a file name matches expression.
}
这个方法的消费者看起来像这样:
eachFileMatching('test/', /_test.js/, function(err, testFile) {
  // read and process the content of testFile
});
虽然这种设计感觉像是一种非常“节点”的IO工作方式,但它遇到了两个主要问题(至少在我的错误实现中): 我不知道何时处理了所有文件,因此我不知道何时汇编和发布结果。 因为文件读取是非阻塞的,并且是递归的,所以我很难知道如何找不到文件。 我希望我只是做错了,并且有一些相当简单的策略,其他人使用它来使第二种方法起作用。 尽管这个例子使用了一个测试框架,但我还有其他各种项目可以解决这个完全相同的问题,我想任何人都可以编写一个相当复杂的应用程序来访问节点中的文件系统。     
已邀请:
“阅读和处理testFile的内容”是什么意思? 我不明白为什么你不知道何时处理所有文件。你不是在使用Streams吗?一个流有几个事件,而不仅仅是
data
。如果您处理
end
事件,那么您将知道每个文件何时完成。 例如,您可能有一个
list
的文件名,为每个文件设置处理,然后当您收到
end
事件时,从列表中删除文件名。当列表为空时,您就完成了。或者创建一个包含名称和完成状态的FileName对象。当您收到
end
事件时,请更改状态并减少文件名计数器。当计数器变为零时,您就完成了,或者如果您不相信可以扫描所有FileName对象以确保其状态已完成。 您可能还有一个定期检查计数器的计时器,如果它在一段时间内没有更改,则报告处理可能卡在状态未完成的FileName对象上。 ...我刚刚在另一个问题中遇到了这个场景,并且接受的答案(加上github链接)解释得很好。检查事件驱动代码的循环?     
事实证明,我能够构建的最小的工作解决方案比我希望的要复杂得多。 以下是适合我的代码。它可以被清理或者在这里和那里稍微更具可读性,我对这样的反馈不感兴趣。 如果有一个明显不同的方法来解决这个问题,那就更简单和/或更有效率,我很有兴趣听到它。令我惊讶的是,这个看似简单的要求的解决方案需要如此大量的代码,但也许这就是为什么有人发明阻止io的原因? 复杂性实际上是满足以下所有要求的愿望: 处理找到的文件 知道搜索何时完成 知道是否找不到文件 这是代码:
/**
 * Call fileHandler with the file name and file Stat for each file found inside
 * of the provided directory.
 *
 * Call the optionally provided completeHandler with an array of files (mingled
 * with directories) and an array of Stat objects (one for each of the found
 * files.
 *
 * Following is an example of a simple usage:
 *
 *   eachFileOrDirectory('test/', function(err, file, stat) {
 *     if (err) throw err;
 *     if (!stat.isDirectory()) {
 *       console.log(">> Found file: " + file);
 *     }
 *   });
 *
 * Following is an example that waits for all files and directories to be 
 * scanned and then uses the entire result to do something:
 *
 *   eachFileOrDirectory('test/', null, function(files, stats) {
 *     if (err) throw err;
 *     var len = files.length;
 *     for (var i = 0; i < len; i++) {
 *       if (!stats[i].isDirectory()) {
 *         console.log(">> Found file: " + files[i]);
 *       }
 *     }
 *   });
 */
var eachFileOrDirectory = function(directory, fileHandler, completeHandler) {
  var filesToCheck = 0;
  var checkedFiles = [];
  var checkedStats = [];

  directory = (directory) ? directory : './';

  var fullFilePath = function(dir, file) {
    return dir.replace(//$/, '') + '/' + file;
  };

  var checkComplete = function() {
    if (filesToCheck == 0 && completeHandler) {
      completeHandler(null, checkedFiles, checkedStats);
    }
  };

  var onFileOrDirectory = function(fileOrDirectory) {
    filesToCheck++;
    fs.stat(fileOrDirectory, function(err, stat) {
      filesToCheck--;
      if (err) return fileHandler(err);
      checkedFiles.push(fileOrDirectory);
      checkedStats.push(stat);
      fileHandler(null, fileOrDirectory, stat);
      if (stat.isDirectory()) {
        onDirectory(fileOrDirectory);
      }
      checkComplete();
    });
  };

  var onDirectory = function(dir) {
    filesToCheck++;
    fs.readdir(dir, function(err, files) {
      filesToCheck--;
      if (err) return fileHandler(err);
      files.forEach(function(file, index) {
        file = fullFilePath(dir, file);
        onFileOrDirectory(file);
      });
      checkComplete();
    });
  }

  onFileOrDirectory(directory);
};
    

bab

这样做的两种方式,首先,可能是连续考虑会有类似的
var files = [];
doFile(files, oncomplete);

function doFile(files, oncomplete) {
  if (files.length === 0) return oncomplete();
  var f = files.pop();
  processFile(f, function(err) {
    // Handle error if any
    doFile(files, oncomplete); // Recurse
  });
};

function processFile(file, callback) {
  // Do whatever you want to do and once 
  // done call the callback
  ...
  callback();
};
第二种方式,让我们调用它并行类似于summin,如:
var files = [];
doFiles(files, oncomplete);

function doFiles(files, oncomplete) {
  var exp = files.length;
  var done = 0;
  for (var i = 0; i < exp; i++) {
    processFile(files[i], function(err) {
      // Handle errors (but still need to increment counter)
      if (++done === exp) return oncomplete();      
    });
  }
};

function processFile(file, callback) {
  // Do whatever you want to do and once 
  // done call the callback
  ...
  callback();
};
现在看起来很明显你应该使用第二种方法但是你会发现,对于IO密集型操作,在并行化时你并没有真正获得任何性能提升。第一种方法的一个缺点是递归会破坏堆栈跟踪。 TNX 圭多     

要回复问题请先登录注册