当前位置:去回答>百科问答>何时使用 Parallel.ForEach,何时使用 PLINQ

何时使用 Parallel.ForEach,何时使用 PLINQ

2024-08-17 13:13:16 编辑:join 浏览量:541

何时使用 Parallel.ForEach,何时使用 PLINQ

当需要为多核机器进行优化的时候,最好先检查下你的程序是否有处理能够分割开来进行并行处理。(例如,有一个巨大的数据集合,其中的元素需要一个一个进行彼此独立的耗时计算)。.netframework4中提供了Parallel.ForEach和PLINQ来帮助我们进行并行处理,本文探讨这两者的差别及适用的场景。Parallel.ForEachParallel.ForEach是foreach的多线程实现,他们都能对IEnumerable类型对象进行遍历,Parallel.ForEach的特殊之处在于它使用多线程来执行循环体内的代码段。Parallel.ForEach最常用的形式如下:publicstaticParallelLoopResultForEach(IEnumerablesource,Actionbody)PLINQPLINQ也是一种对数据进行并行处理的编程模型,它通过LINQ的语法来实现类似Parallel.ForEach的多线程并行处理。场景一:简单数据之独立操作的并行处理(使用Parallel.ForEach)示例代码:publicstaticvoidIndependentAction(IEnumerablesource,Actionaction){Parallel.ForEach(source,element=>action(element));}理由:1.虽然PLINQ也提供了一个类似的ForAll接口,但它对于简单的独立操作太重量化了。2.使用Parallel.ForEach你还能够设定ParallelOptions.MaxDegreeOfParalelism参数(指定最多需要多少个线程),这样当ThreadPool资源匮乏(甚至当可用线程数Movie){varProcessedMovie=Movie.AsParallel().AsOrdered().Select(frame=>ConvertToGrayscale(frame));foreach(vargrayscaleFrameinProcessedMovie){//Movieframeswillbeevaluatedlazily}}理由:1.Parallel.ForEach实现起来需要绕一些弯路,首先你需要使用以下的重载在方法:publicstaticParallelLoopResultForEach(IEnumerablesource,Actionbody)这个重载的Action多包含了index参数,这样你在输出的时候就能利用这个值来维持原先的序列顺序。请看下面的例子:publicstaticdouble[]PairwiseMultiply(double[]v1,double[]v2){varlength=Math.Min(v1.Length,v2.Lenth);double[]result=newdouble[length];Parallel.ForEach(v1,(element,loopstate,elementIndex)=>result[elementIndex]=element*v2[elementIndex]);returnresult;}你可能已经意识到这里有个明显的问题:我们使用了固定长度的数组。如果传入的是IEnumerable那么你有4个解决方案:(1)调用IEnumerable.Count()来获取数据长度,然后用这个值实例化一个固定长度的数组,然后使用上例的代码。(2)Thesecondoptionwouldbetomaterializetheoriginalcollectionbeforeusingit;intheeventthatyourinputdatasetisprohibitivelylarge,neitherofthefirsttwooptionswillbefeasible.(没看懂贴原文)(3)第三种方式是采用返回一个哈希集合的方式,这种方式下通常需要至少2倍于传入数据的内存,所以处理大数据时请慎用。(4)自己实现排序算法(保证传入数据与传出数据经过排序后次序一致)2.相比之下PLINQ的AsOrdered方法如此简单,而且该方法能处理流式的数据,从而允许传入数据是延迟实现的(lazymaterialized)场景三:流数据之并行处理(使用PLINQ)PLINQ能输出流数据,这个特性在一下场合非常有用:1.结果集不需要是一个完整的处理完毕的数组,即:任何时间点下内存中仅保持数组中的部分信息2.你能够在一个单线程上遍历输出结果(就好像他们已经存在/处理完了)示例:publicstaticvoidAnalyzeStocks(IEnumerableStocks){varStockRiskPortfolio=Stocks.AsParallel().AsOrdered().Select(stock=>new{Stock=stock,Risk=ComputeRisk(stock)}).Where(stockRisk=>ExpensiveRiskAnalysis(stockRisk.Risk));foreach(varstockRiskinStockRiskPortfolio){SomeStockComputation(stockRisk.Risk);//StockRiskPortfoliowillbeastreamofresults}}这里使用一个单线程的foreach来对PLINQ的输出进行后续处理,通常情况下foreach不需要等待PLINQ处理完所有数据就能开始运作。PLINQ也允许指定输出缓存的方式,具体可参照PLINQ的WithMergeOptions方法,及ParallelMergeOptions枚举场景四:处理两个集合(使用PLINQ)PLINQ的Zip方法提供了同时遍历两个集合并进行结合元算的方法,并且它可以与其他查询处理操作结合,实现非常复杂的机能。示例:publicstaticIEnumerableZipping(IEnumerablea,IEnumerableb){returna.AsParallel().AsOrdered().Select(element=>ExpensiveComputation(element)).Zip(b.AsParallel().AsOrdered().Select(element=>DifferentExpensiveComputation(element)),(a_element,b_element)=>Combine(a_element,b_element));}示例中的两个数据源能够并行处理,当双方都有一个可用元素时提供给Zip进行后续处理(Combine)。Parallel.ForEach也能实现类似的Zip处理:publicstaticIEnumerableZipping(IEnumerablea,IEnumerableb){varnumElements=Math.Min(a.Count(),b.Count());varresult=newT[numElements];Parallel.ForEach(a,(element,loopstate,index)=>{vara_element=ExpensiveComputation(element);varb_element=DifferentExpensiveComputation(b.ElementAt(index));result[index]=Combine(a_element,b_element);});returnresult;}当然使用Parallel.ForEach后你就得自己确认是否要维持原始序列,并且要注意数组越界访问的问题。场景五:线程局部变量Parallel.ForEach提供了一个线程局部变量的重载,定义如下:publicstaticParallelLoopResultForEach(IEnumerablesource,FunclocalInit,Funcbody,ActionlocalFinally)使用的示例:publicstaticListFiltering(IEnumerablesource){varresults=newList();using(SemaphoreSlimsem=newSemaphoreSlim(1)){Parallel.ForEach(source,()=>newList(),(element,loopstate,localStorage)=>{boolfilter=filterFunction(element);if(filter)localStorage.Add(element);returnlocalStorage;},(finalStorage)=>{lock(myLock){results.AddRange(finalStorage)};});}returnresults;}线程局部变量有什么优势呢?请看下面的例子(一个网页抓取程序):publicstaticvoidUnsafeDownloadUrls(){WebClientwebclient=newWebClient();Parallel.ForEach(urls,(url,loopstate,index)=>{webclient.DownloadFile(url,filenames[index]+".dat");Console.WriteLine("{0}:{1}",Thread.CurrentThread.ManagedThreadId,url);});}通常第一版代码是这么写的,但是运行时会报错“System.NotSupportedException->WebClientdoesnotsupportconcurrentI/Ooperations.”。这是因为多个线程无法同时访问同一个WebClient对象。所以我们会把WebClient对象定义到线程中来:publicstaticvoidBAD_DownloadUrls(){Parallel.ForEach(urls,(url,loopstate,index)=>{WebClientwebclient=newWebClient();webclient.DownloadFile(url,filenames[index]+".dat");Console.WriteLine("{0}:{1}",Thread.CurrentThread.ManagedThreadId,url);});}修改之后依然有问题,因为你的机器不是服务器,大量实例化的WebClient迅速达到你机器允许的虚拟连接上限数。线程局部变量可以解决这个问题:publicstaticvoiddownloadUrlsSafe(){Parallel.ForEach(urls,()=>newWebClient(),(url,loopstate,index,webclient)=>{webclient.DownloadFile(url,filenames[index]+".dat");Console.WriteLine("{0}:{1}",Thread.CurrentThread.ManagedThreadId,url);returnwebclient;},(webclient)=>{});}这样的写法保证了我们能获得足够的WebClient实例,同时这些WebClient实例彼此隔离仅仅属于各自关联的线程。虽然PLINQ提供了ThreadLocal对象来实现类似的功能:publicstaticvoiddownloadUrl(){varwebclient=newThreadLocal(()=>newWebClient());varres=urls.AsParallel().ForAll(url=>{webclient.Value.DownloadFile(url,host[url]+".dat"));Console.WriteLine("{0}:{1}",Thread.CurrentThread.ManagedThreadId,url);});}但是请注意:ThreadLocal相对而言开销更大!场景五:退出操作(使用Parallel.ForEach)Parallel.ForEach有个重载声明如下,其中包含一个ParallelLoopState对象:publicstaticParallelLoopResultForEach(IEnumerablesource,Actionbody)ParallelLoopState.Stop()提供了退出循环的方法,这种方式要比其他两种方法更快。这个方法通知循环不要再启动执行新的迭代,并尽可能快的推出循环。ParallelLoopState.IsStopped属性可用来判定其他迭代是否调用了Stop方法。示例:publicstaticbooleanFindAny(IEnumerableTSpace,Tmatch)whereT:IEqualityComparer{varmatchFound=false;Parallel.ForEach(TSpace,(curValue,loopstate)=>{if(curValue.Equals(match)){matchFound=true;loopstate.Stop();}});returnmatchFound;}ParallelLoopState.Break()通知循环继续执行本元素前的迭代,但不执行本元素之后的迭代。最前调用Break的起作用,并被记录到ParallelLoopState.LowestBreakIteration属性中。这种处理方式通常被应用在一个有序的查找处理中,比如你有一个排序过的数组,你想在其中查找匹配元素的最小index,那么可以使用以下的代码:publicstaticintFindLowestIndex(IEnumerableTSpace,Tmatch)whereT:IEqualityComparer{varloopResult=Parallel.ForEach(source,(curValue,loopState,curIndex)=>{if(curValue.Equals(match)){loopState.Break();}});varmatchedIndex=loopResult.LowestBreakIteration;returnmatchedIndex.HasValue?matchedIndex:-1;}

标签:何时,Parallel,ForEach

版权声明:文章由 去回答 整理收集,来源于互联网或者用户投稿,如有侵权,请联系我们,我们会立即处理。如转载请保留本文链接:https://www.quhuida.com/answer/217466.html
热门文章