如何为foreach创建并行预取

| 给出了在C#,TPL,并行扩展,异步CTP,响应式扩展中执行异步操作的众多新方法。我想知道并行化以下内容的提取和处理的最简单方法是:
foreach(string url in urls)
{
   var file = FetchFile(url);
   ProcessFile(file);
}
附带条件是,虽然可以随时提取文件,但是“ 1”一次只能处理一个文件,应该依次调用。 简而言之,使
FetchFile
ProcessFile
以流水线方式表现(即同时发生)的最简单方法是什么?     
已邀请:
        这是RX方式。此扩展会将uri的蒸汽转换为流的流:
    public static IObservable<Stream> RequestToStream(this IObservable<string> source, 
    TimeSpan timeout)
    {
        return
            from wc in source.Select(WebRequest.Create)
            from s in Observable
                .FromAsyncPattern<WebResponse>(wc.BeginGetResponse,
                    wc.EndGetResponse)()
                .Timeout(timeout, Observable.Empty<WebResponse>())
                .Catch(Observable.Empty<WebResponse>())
            select s.GetResponseStream();
    }
用法:
new [] { \"myuri.net\\file1.dat\", \"myuri.net\\file2.dat\" }
   .ToObservable()
   .RequestToStream(TimeSpan.FromSeconds(5))
   .Do(stream = > ProcessStream(stream))
   .Subscribe();
编辑:哎呀,还没有注意到文件写入序列化的要求。这部分可以通过使用.Concat来完成,它本质上是一个RX队列(另一个是.Zip)。 让我们有一个.StreamToFile扩展名:
    public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source)
    {
        return Observable.Defer(() =>
            source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
    }
现在您可以并行处理Web请求,但可以序列化来自它们的文件:
        new[] { \"myuri.net\\file1.dat\", \"myuri.net\\file2.dat\" }
            .ToObservable()
            .RequestToStream(TimeSpan.FromSeconds(5))
            .Select((stream, i) => Tuple.Create(stream, i.ToString() + \".dat\"))
            .Select(x => x.StreamToFile())
            .Concat()
            .Subscribe();
    
        给定“ 1”的约束,我想说您应该使用TPL异步获取数据,然后排队引用预加载数据的令牌。然后,您可以拥有一个后台线程,该线程将项目从队列中拉出并将它们逐个交给ProcessFile。这是生产者/消费者模式。 对于队列,您可以看一下BlockingCollection,它可以提供一个线程安全的队列,并且还具有能够减轻工作量的良好效果。     
        由于我不了解所有花哨的机制,因此我可能会以旧的方式进行操作,尽管我怀疑它会被归类为“简单”:
var q = new Queue<MyFile>();
var ev = new ManualResetEvent(false);

new System.Threading.Thread(() =>
{
    while ( true )
    {
        ev.WaitOne();
        MyFile item;
        lock (q)
        {
            item = q.Dequeue();
            if ( q.Count == 0 )
                ev.Reset();
        }
        if ( item == null )
            break;
        ProcessFile(item);
    }
}).Start();
foreach(string url in urls)
{
    var file = FetchFile(url);
    lock (q)
    {
        q.Enqueue(file);
        ev.Set();
    }
}
lock (q)
{
    q.Enqueue(null);
    ev.Set();
}
    
        异步实际上并不表示并行。这仅表示您不会阻止等待其他操作。但是您可以利用异步I / O来在下载URL时不阻塞线程,即,如果您这样做,则不需要与url一样多的线程来并行下载它们:
var client = new WebClient();
var syncLock = new object();
TaskEx.WhenAll(urls.Select(url => {
  client.DownloadDataTaskAsync(url).ContinueWith((t) => {
    lock(syncLock) {
      ProcessFile(t.Result);
    }
  });
}));
基本上,我们为每个URL创建一个异步下载任务,然后在完成任何任务后,调用一个继续操作,该继续操作使用一个普通对象作为同步锁来确保
ProcessFile
顺序发生。直到最后一个
ProcessFile
延续完成,12ѭ才会返回。 您可以使用RX的ѭ14the避免显式锁定(但当然它将在内部锁定):
var pipeline = new ReplaySubject<byte[]>();
var files = pipeline.ToEnumerable();
var client = new WebClient();
TaskEx.WhenAll(urls
        .Select(download => client.DownloadDataTaskAsync((string) download)
            .ContinueWith(t => pipeline.OnNext(t.Result))
        )
    ).ContinueWith(task => pipeline.OnCompleted(task));
foreach(var file in files) {
    ProcessFile(file);
}
在这里,我们使用ѭ14作为文件下载渠道。每次下载均异步完成,并将结果发布到the17ѭ阻止的管道中(即顺序发生)。当所有任务完成时,我们完成可观察的事物,它退出了
foreach
。     

要回复问题请先登录注册