简介
这是我的建议对第三方物流的文章系列的第二部分。上次我介绍了任务,涵盖了这个理由:螺纹与任务创建任务触发方法/属性异常处理取消任务SynchronizationContext
这一次,我们将如何使用第三方物流的概念称为Continuations的。这是我们如何运行后的任务,从任务的返回值的东西的东西。第二十系列路线图
这是一个可能的6条第2,我希望大家会喜欢。下面显示的是什么,我想盖的大致的轮廓:{A}延续/取消链接的任务(本文)
我知道,一些民间只会阅读这篇文章和状态,这是目前在MSDN上提供的是什么,和我部分同意,但是,有几个原因,我选择上,仍然需要写这些文章,如下任务:只有真正将第一对夫妇的文章,这显示出类似的想法给MSDN之后,我觉得我会不会MSDN上的材料,将代表我的一些第三方物流研究的结果,我将概述在文章中(S),所以你将受益于我的研究,你可以读...赞成,美观大方。将实时输出的截图,这是MSDN上没有说的多,这可能帮助一些读者的文章(S)文本加强。可能有一些读者在这里,甚至从来没有听说过的任务并行库,因此不会通过它在MSDN,你知道老故事,你知道你正在寻找摆在首位的事情。我喜欢线程的文章,所以像他们这样做,所以我没有他们,会做,做了他们,并继续做。
说,如果人们真正认为这是太相似了MSDN(我还是希望它不会被),读这篇文章,让我知道这点,我会尽量调整即将发表的文章弥补。目录表
无论如何,我将在本文中是如下:一些更多的第三方物流的背景
本节是我真的应该有谈到在第一篇文章,但我没有,所以我包括在这里,而不是。我希望解释为什么第三方物流的设计师做事情他们做的方式,以及如何有利于我们所有。默认的任务调度
TPL依赖上的调度,组织和运行任务。 NET 4,默认(你可以交换)的任务调度是紧密集成的ThreadPool。因此,如果您使用默认的任务调度,运行任务的工作线程管理的ThreadPool,其中一般至少有许多工作线程有目标电脑上的核心。当有更多的任务比有工作者线程,某些任务必须排队,直到一个ThreadPool的工人线程成为免费服务的任务。
这是一个类似的概念,由现有的ThreadPool.QueueUserWorkItem部署一个(..).事实上,你能想到的默认的任务调度,作为改进的ThreadPool,工人根本任务。默认调度程序能够更好的性能比标准的ThreadPool单独的内核数量的增加,我们应审查。标准的ThreadPool
ThreadPool的基本上是全球首个在先进先出(FIFO)队列,其中工作项目分配做出列的工作。
{S0}
这是确定",直到内核数量的增加,然后它成为一个瓶颈,队列只能由一个工作线程访问一次。当有只有少数大粗粒度并行处理的商品,确保这一全球性的队列的单一访问的同步成本小,但是当你有许多细粒度并行去,你会与任务,同步成本与这种单一的全球队列,开始成为一个瓶颈。
任务总是可用的核心数量规模,我在某处读到。NET是能够有效地运行以百万计的任务。不同的方法来处理,必须从集中队列。调度下面我会谈谈这个更分散的方式。分散的本地队列
的。NET Framework提供了它自己的本地任务队列中的每个辅助线程的ThreadPool。本地队列分配负载和减轻的需要,使用单一的全球队列。你可以看到下面有许多本地队列有工作线程以及单一的全球队列,所有这些同时运行。
的想法是,一个工作线程可能会在最后一个先出(LIFO)的方法,它可能找到工作,或者它可能有回去(这样做招致较重的同步成本从本地队列),以单一的全局队列。
有一个绝招,TPL设计师设法得到发挥;如果一个工人线程本地队列是空的,他们可以回去,以取得更多的全球队列,但什么是TPL设计师没有得到它偷工作从邻国在本地队列FIFO顺序。
前MVP,现在微软雇员丹尼尔蛾,有一些非常直观的图的优秀后,这一切都说明他的博客上:{A17}。
这是很值得一读这个职位。
{A18}无论如何,我感到轻微的分歧很抱歉,我只是觉得我需要有。好了,现在的延续。的延续,那是什么
简而言之,延续允许任务链接在一起。虽然这并不喜欢,大不了多少,本身无害,是什么使延续的概念,真正闪耀,你可以有选择性的延续,也就是说,你可以有一个延续,只有火灾时,整组任务的完成,或的延续,只有火灾时完成许多任务,或者我们可以有一个的延续,只有火灾当一个任务失败或取消。延续买得起我们的自由程度。使用由TPL提供给我们的这种自由,就可以达到非常精细的粮食超过我们的许多并行代码方面的控制,而不是只有一个线程代码的单片块。
在这篇文章中,我特意设计的任务链相当小,但你真的可以让这些小或大,您认为合适的连锁。简单的延续演示代码项目:SimpleContinuation
{A19}我还没有真正得到过多地谈论这个小的代码片段/演示,除了说这是一个延续,而被称为真理,可能是我必须说,因为这真的是几乎所有有创造和使用的延续。死很容易,真的。// create the task
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
ints.Add(i);
}
return ints;
}, 2000);
try
{
//setup a continuation for task
taskWithFactoryAndState.ContinueWith((ant) =>
{
List<int> result = ant.Result;
foreach (int resultValue in result)
{
Console.WriteLine("Task produced {0}", resultValue);
}
});
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}
Console.ReadLine();
这里是一个小的截图,不是很令人兴奋,我知道,它虽然得到更好的。
{S2}WPF的同步演示代码项目:WPFDispatcherSynchonizationContext
这是我在以前的文章涵盖:{A20}的东西。没有什么,我已经改变了该代码,但我已经包括在这里再次。我们正在试图解决的基础上,你应该阅读该文章。本文包含的代码,告诉你,这是一个非常有效的原因,使用第三方物流的延续,是元帅的线程返回到UI控件的线程拥有。正如我所说,这个代码片断将没有多大意义,除非你去阅读第一篇文章中的有关章节。{C}
,这里是一个正在运行的演示的截图:
继续"WhenAnyquot;演示代码项目:ContinueWhenAny
一个要求并行编程工作时可能会尝试和运行在一个数据集的一组算法,并使用是表现最好的一个。这些算法可以从字面上从一个自定义的实验搜索。我没有时间,所以写一个自定义的搜索算法实验的全回转,而选择的东西更好地了解一点点:quot;排序Algorithmsquot。我从过去的C#比赛的冠军使用的例子:{A21},由Kanasz罗伯特。
这里的基本思路是,我想只能等到第一个算法(即,希望最快的国家之一)实现其目标。
我有squirreled离成小帮手附VS2010的解决方案ContinueWhen.Common DLL的算法,但尽量让许多运行延续的任务之一,只有等待的概念,这个代码应该很容易理解,而不需要看到实际的排序算法。static void Main(string[] args)
{
//create a list of random numbers to sort
Random rand = new Random();
List<int> unsortedList = new List<int>();
int numberOfItemsToSort = 5000;
for (int i = 0; i < numberOfItemsToSort; i++)
{
unsortedList.Add(rand.Next(numberOfItemsToSort));
}
//create 3 tasks to run 3 different sorting algorithms
Task<SortingTaskResult>[] tasks =
new Task<SortingTaskResult>[3];
//Bubble Sort Task
tasks[0] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
//copy
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
//run algorithm
List<int> result = SortingAlgorithms.BubbleSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Bubble Sort");
}, unsortedList);
//Selection Sort Task
tasks[1] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
//copy
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
//run algorithm
List<int> result = SortingAlgorithms.SelectionSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Selection Sort");
}, unsortedList);
//Counting Sort Task
tasks[2] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
//copy
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
//run algorithm
List<int> result = SortingAlgorithms.CountingSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Counting Sort");
}, unsortedList);
//Wait for any of them (assuming nothing goes wrong)
Task.Factory.ContinueWhenAny(
tasks,
(Task<SortingTaskResult> antecedent) =>
{
Console.WriteLine(antecedent.Result.ToString());
});
Console.ReadLine();
}
上面的代码显示了我们创建三个任务,每个排序算法之一,然后任何(首先要完成三项任务)的任务等待一个单一的延续。这里是生产什么时运行上面的清单是:
{S4}的
可以看出,",选择Sortquot;赢得了比赛,即使它是不是要启动的第一个任务,它赢得了,因为它是一个更好的算法比其他算法碰巧有一个可用的CPU使用在当时的核心。我只有在笔记本电脑上的两个CPU核心,我写了这个测试代码,这样的机会,如果我有4个CPU核心,第三算法可能已经结束,赢了,因为在纸面上,它是更好的算法。
其他有趣的事情,需要注意的是,因为我们是等待一组(阵列)来完成的任务只有一个,我们只能使用从单一的任务,我们等待的结果,如这个截图。继续"WhenAllquot;演示代码项目:ContinueWhenAll
{A22} ContinueWhenAll是一个有趣的,我能想到的次数,这将是非常有用的。你有分裂并行工作,但必须等待所有的部分完成,然后再移动到下一个步骤,或再次使用实验算法的思想。我们也可以想像,我们可能会看到如何执行我们的自定义算法的各种特性,所以必须等待他们完成,然后继续相当感兴趣。
我选择,因为它是一个简单的概念,其中的想法是,我们要运行不同的排序算法,对未排序的列表,并等待完成所有不同的算法,我们才可以再次使用的排序算法继续。
下面的代码:static void Main(string[] args)
{
//create a list of random numbers to sort
Random rand = new Random();
List<int> unsortedList = new List<int>();
int numberOfItemsToSort = 5000;
for (int i = 0; i < numberOfItemsToSort; i++)
{
unsortedList.Add(rand.Next(numberOfItemsToSort));
}
//create 3 tasks to run 3 different sorting algorithms
Task<SortingTaskResult>[] tasks =
new Task<SortingTaskResult>[3];
//Bubble Sort Task
tasks[0] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
//copy
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
//run algorithm
List<int> result = SortingAlgorithms.BubbleSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Bubble Sort");
}, unsortedList);
//Selection Sort Task
tasks[1] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
//copy
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
//run algorithm
List<int> result = SortingAlgorithms.SelectionSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Selection Sort");
}, unsortedList);
//Counting Sort Task
tasks[2] = Task.Factory.StartNew((state) =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
List<int> source = (List<int>)state;
List<int> localWorkList = new List<int>();
//copy
for (int i = 0; i < source.Count; i++)
{
localWorkList.Add(source[i]);
}
//run algorithm
List<int> result = SortingAlgorithms.CountingSort(localWorkList);
watch.Stop();
return new SortingTaskResult(
watch.ElapsedMilliseconds, result, "Counting Sort");
}, unsortedList);
//Wait for all of them (assuming nothing goes wrong)
Task.Factory.ContinueWhenAll(
tasks,
(antecedents) =>
{
foreach (Task<SortingTaskResult> task in antecedents)
{
Console.WriteLine(task.Result.ToString());
}
});
Console.ReadLine();
}
可以看出,这一次,延续,只有在所有的三种排序任务完成时,踢。下面是运行这个片段的结果:
{五}
其他有趣的事情要注意的是,因为我们是在一个组(阵列)来完成所有任务等待,我们能够从所有这些任务,我们等待这个截图所示,使用的结果。使用异常处理的延续演示代码项目:UsingContinuationForExceptionHandling
{A23}第一篇文章中,我也提到有另一种技术,我没有时间显示。好了,现在是时间显示,其他的方式。它真的是很简单:我们仅仅是使用延续。的想法是,我们有一个延续,如果先前任务完成跑,和另一个如果先前的任务是进入故障状态运行。
这是很容易实现使用TaskContinuationOptions当我们创建了一个任务延续,我们可以供应。下面是一些示例代码,说明我的意思:// create the task
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
ints.Add(i);
if (i > 100)
{
InvalidOperationException ex =
new InvalidOperationException("oh no its > 100");
ex.Source = "taskWithFactoryAndState";
throw ex;
}
}
return ints;
}, 2000);
//and setup a continuation for it only on when faulted
taskWithFactoryAndState.ContinueWith((ant) =>
{
AggregateException aggEx = ant.Exception;
Console.WriteLine("OOOOPS : The Task exited with Exception(s)");
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}, TaskContinuationOptions.OnlyOnFaulted);
//and setup a continuation for it only on ran to completion
taskWithFactoryAndState.ContinueWith((ant) =>
{
List<int> result = ant.Result;
foreach (int resultValue in result)
{
Console.WriteLine("Task produced {0}", resultValue);
}
}, TaskContinuationOptions.OnlyOnRanToCompletion);
Console.ReadLine();
告诉你会发生什么,当我们运行这个地段,这里是一个演示:
{中六}
参见如何只延续跑了,这是一个应该运行quot; OnlyOnFaultedquot。将继续作为一个管道演示代码项目:UsingContinuationsAsPipelines
{A24}我没有在本文开头提到的地方,你可以使用continuation来链任务一起,使它们作为简单或复杂,因为你选择。我还没有坚果或任何东西,但我有如下所示的一个小例子,这是稍稍比迄今所看到的的例子。它说明虽然是事实,你可以很容易地继续延续。static void Main(string[] args)
{
// create the task
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
ints.Add(i);
}
return ints;
}, 10);
//and setup a continuation for it only on ran
//to completion, where this continuation
//returns a result too, which will be used by yet another continuation
taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
{
List<int> parentResult = ant.Result;
List<int> result = new List<int>();
foreach (int resultValue in parentResult)
{
Console.WriteLine("Parent Task produced {0}, " +
"which will be squared by continuation",
resultValue);
result.Add(resultValue * resultValue);
}
return result;
}, TaskContinuationOptions.OnlyOnRanToCompletion)
//Another continution
.ContinueWith((ant) =>
{
List<int> parentResult = ant.Result;
foreach (int resultValue in parentResult)
{
Console.WriteLine("Parent Continuation Task produced Square of {0}",
resultValue);
}
}, TaskContinuationOptions.OnlyOnRanToCompletion);
Console.ReadLine();
}
这当然不是火箭科学之一。我做的是创建一个初始任务,创建并返回一个数字的列表。这Listlt; intgt产生的第一项任务,然后通过一个延续原来的结果是从原来的任务(先前)打印,并取得原始任务的平方创建一个新的Listlt; intgt;(生产的前因)值。这种延续的结果,然后递过来的又一延续,打印平方继续任务(这延续的前身)的结果。
每个延续假设一个理想的世界,并且将只运行如果继续原来的任务运行完成。
下面是这一个运行中的一个小演示:
{七}延续先前捕获异常演示代码项目:CatchExceptionInAntecedent
{A25}所以我们现在已经看到使用任务/延续的几个例子,我们已经看到,我们可以使用continuation来运行,当事情计划,我们也可以运行任务时,原来的任务未能完成其工作,但有时,我们可能只是想有一个未指定的延续,总是会发生,无论是原任务完成成功与否,并决定继续做什么,如果有什么不正确的原始任务的状态。
在这里,我们将如何检查在延续,我们重新抛出原始的异常,由最初的任务是提供的原始任务异常的一个例子。由于在这个例子中继续重新抛出一个异常,我们需要确保它会抛出的异常会以某种方式(我谈到异常观察最后一次,当我谈到触发方法/属性,如等待()/结果),这样我等待()上的延续。
下面是代码:try
{
// create the task
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
Console.WriteLine("In TaskWithFactoryAndState");
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
Console.WriteLine("taskWithFactoryAndState, creating Item: {0}", i);
ints.Add(i);
if (i == 5)
throw new InvalidOperationException(
"Don't like 5 its vulgar and dirty");
}
return ints;
}, 100);
//Setup a continuation which will not run
taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
{
if (ant.Status == TaskStatus.Faulted)
throw ant.Exception.InnerException;
Console.WriteLine("In Continuation, no problems in Antecedent");
List<int> parentResult = ant.Result;
List<int> result = new List<int>();
foreach (int resultValue in parentResult)
{
Console.WriteLine("Parent Task produced {0}, " +
"which will be squared by continuation",
resultValue);
result.Add(resultValue * resultValue);
}
return result;
});
//wait for the task to complete
taskWithFactoryAndState.Wait();
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message));
}
}
Console.WriteLine("Finished");
这里是一个小本运行演示截图。看我们如何陷入原始异常的原始任务,并rethrew成功(保留的异常信息)的try / catch捕获异常了。
取消延续演示代码项目:CancellingContinuations
其中一个最明显的事情,你可能要与延续是取消它,权吗?幸运的是,你已经看到所有的在{A26}贸易技巧,记得我们在最后一次看着CancellationTokenSource对象?
没有它远不止于此。我们创建一个新的CancellationTokenSource的,并通过CancellationToken从任何第三方物流的任务/延续,我们要受到影响时CancellationToken被取消。
仍然适用相同的规则,你} {A26看到,我们必须良好,并确保任务/延续的期望,并利用一个CancellationToken抛出一个异常,要求取消时(记住,这是至关重要的确保任务过渡到正确的状态)。
无论如何,我可能说话太多,当代码不言自明。代码本身很简单。我们原来的任务,创建一个Listlt; intgt,然后用里面的延续,在原来的任务编号是平方/印刷和返回。然而,5秒后,原来的任务是创造,传递给原来的任务,并继续的CancellationToken被取消。CancellationTokenSource tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
try
{
// create the task
Task<List<int>> taskWithFactoryAndState =
Task.Factory.StartNew<List<int>>((stateObj) =>
{
Console.WriteLine("In TaskWithFactoryAndState");
List<int> ints = new List<int>();
for (int i = 0; i < (int)stateObj; i++)
{
tokenSource.Token.ThrowIfCancellationRequested();
ints.Add(i);
Console.WriteLine("taskWithFactoryAndState, creating Item: {0}", i);
Thread.Sleep(1000); // simulate some work
}
return ints;
}, 10000, tokenSource.Token);
Thread.Sleep(5000); //wait 5 seconds then cancel the runnning Task
tokenSource.Cancel();
//Setup a continuation which will not run
taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
{
Console.WriteLine("In Continuation");
List<int> parentResult = ant.Result;
List<int> result = new List<int>();
foreach (int resultValue in parentResult)
{
Console.WriteLine("Parent Task produced {0}, which will " +
"be squared by continuation",
resultValue);
result.Add(resultValue * resultValue);
}
return result;
}, tokenSource.Token);
taskWithFactoryAndState.Wait();
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message));
}
}
finally
{
tokenSource.Dispose();
}
Console.WriteLine("Finished");
Console.ReadLine();
下面的结果:
{S9}
注意如何继续甚至没有踢在所有,我们不仅创造了5项。这是由于CancellationToken取消踢英寸就是这么简单,取消延续,你要爱这个TPL东西的人。这就是现在
我知道这篇文章没有尽可能多说的第一件事是延续是非常容易获得与交手的,所以有说。接下来的两篇文章很可能是大小相同的排序,因为这,但那些后,将有更多的肉。
这是我想在这篇文章中说。我希望你喜欢它,并希望有更多。如果你不喜欢这篇文章,想了解更多,你能腾出一些时间来发表评论和表决?非常感谢。
,希望看到下一个你,一前一后,后一个,是六个总。我更忙碌起来。