并行生产者/使用者是否具有容错能力?

| 我需要使用SqlBulkCopy将大型csv文件分块为几个不同的数据库插入。我打算通过2个单独的任务来执行此操作,其中1个用于批处理CSV文件,另一个用于插入数据库。例如,这就是我要处理的事情:
public class UberTask
{
    private readonly BlockingCollection<Tuple<string,int>> _store = new BlockingCollection<Tuple<string, int>>();

    public void PerformTask()
    {
        var notifier = new UINotifier();
        Task.Factory.StartNew(() =>
                                  {
                                      for (int i =0; i < 10; i++)
                                      {
                                          string description = string.Format(\"Scenario {0}\", i);

                                          notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format(\"Reading \'{0}\' from file\", description)));

                                          // represents reading the CSV file.
                                          Thread.Sleep(500);
                                          notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format(\"Enqueuing \'{0}\'\", description)));
                                          _store.Add(new Tuple<string, int>(description, i));
                                      }
                                      _store.CompleteAdding();
                                  });

        var consumer = Task.Factory.StartNew(() =>
                                                 {
                                                     foreach (var item in _store.GetConsumingEnumerable())
                                                     {
                                                         var poppedItem = item;
                                                         notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format(\"Sending \'{0}\' to the database\", poppedItem.Item1)));
                                                         // represents sending stuff to the database.
                                                         Thread.Sleep(1000);
                                                     }
                                                 });
        consumer.Wait();
        Console.WriteLine(\"complete\");
    }
}
这是将两组相关任务配对的好方法吗?以上代码未处理的内容(需要处理): 如果代表CSV读取的任务出现故障,则其他任务需要停止(即使_store中仍有项目)。 如果代表数据库的任务插入错误,则其他进程可以停止处理。 如果配对任务中的任何一个出现故障,我将需要执行一些操作以回滚数据库更新(我不担心如何回滚),这更多地是关于如何编码“故障”的问题。发生在配对任务之一中,因此我需要进行一些整理。” 以上任何帮助将不胜感激!     
已邀请:
您可以使用异常处理和取消令牌来执行此操作。当管道阶段检测到错误时,它将捕获该错误并设置令牌。这将取消其他阶段。 finally块确保完成对CompleteAdding()的调用。这一点很重要,因为接收管道阶段可能会在等待收集时被阻塞,并且直到取消取消后才会处理取消。 您还希望排除集合中所有未处理的对象,或者在管道阶段完成时(最后)和/或在整个管道关闭时清理数据库连接。 这是执行此操作的管道阶段的示例:
    static void LoadPipelinedImages(IEnumerable<string> fileNames, 
                                    string sourceDir, 
                                    BlockingCollection<ImageInfo> original,
                                    CancellationTokenSource cts)
    {
        // ...
        var token = cts.Token;
        ImageInfo info = null;
        try
        {
            foreach (var fileName in fileNames)
            {
                if (token.IsCancellationRequested)
                    break;
                info = LoadImage(fileName, ...);
                original.Add(info, token);
                info = null;
            }                
        }
        catch (Exception e)
        {
            // in case of exception, signal shutdown to other pipeline tasks
            cts.Cancel();
            if (!(e is OperationCanceledException))
                throw;
        }
        finally
        {
            original.CompleteAdding();
            if (info != null) info.Dispose();
        }
    }
总体管道代码如下所示。它还支持通过设置取消令牌从外部(从UI)取消管道。
    static void RunPipelined(IEnumerable<string> fileNames, 
                             string sourceDir, 
                             int queueLength, 
                             Action<ImageInfo> displayFn,
                             CancellationTokenSource cts)
    {
        // Data pipes 
        var originalImages = new BlockingCollection<ImageInfo>(queueLength);
        var thumbnailImages = new BlockingCollection<ImageInfo>(queueLength);
        var filteredImages = new BlockingCollection<ImageInfo>(queueLength);
        try
        {
            var f = new TaskFactory(TaskCreationOptions.LongRunning,
                                    TaskContinuationOptions.None);
            // ...

            // Start pipelined tasks
            var loadTask = f.StartNew(() =>
                  LoadPipelinedImages(fileNames, sourceDir, 
                                      originalImages, cts));

            var scaleTask = f.StartNew(() =>
                  ScalePipelinedImages(originalImages, 
                                       thumbnailImages, cts));

            var filterTask = f.StartNew(() =>
                  FilterPipelinedImages(thumbnailImages, 
                                        filteredImages, cts));

            var displayTask = f.StartNew(() =>
                  DisplayPipelinedImages(filteredImages.GetConsumingEnumerable(), 
                       ... cts));

            Task.WaitAll(loadTask, scaleTask, filterTask, displayTask);
        }
        finally
        {
            // in case of exception or cancellation, there might be bitmaps
            // that need to be disposed.
            DisposeImagesInQueue(originalImages);
            DisposeImagesInQueue(thumbnailImages);
            DisposeImagesInQueue(filteredImages);                
        }
    }
有关完整示例,请参见此处下载的管道示例: http://parallelpatterns.codeplex.com/releases/view/50473 在这里讨论: http://msdn.microsoft.com/en-us/library/ff963548.aspx     

要回复问题请先登录注册