Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >一个简单的Parallel.ForEach实现

一个简单的Parallel.ForEach实现

作者头像
全栈程序员站长
发布于 2022-09-09 03:54:35
发布于 2022-09-09 03:54:35
4840
举报

大家好,又见面了,我是你们的朋友全栈君。

在.net的Task Parallel Library中有一个很方便的功能Parallel.ForEach,可以实现多任务的并发执行,另外还带着栅栏功能,非常好用。但是这一功能必须需要clr4.0支持(CTP版的不大好用),对于低版本的.net要实现类似功能只有自己写一个了。

codeproject上面文章Poor Man’s Parallel.ForEach Iterator中就有一种简单而有效的实现。但作者附录的代码有如下几个问题:

  1. 无法对每个并发任务分别制定不同的线程数
  2. 算法本身有点问题,任务执行完会报错
  3. 不能快速响应异常

针对以上几点,我对那段代码做了一点小改进,代码如下:

static class Parallel { public static void ParallelForEach<T>(this IEnumerable<T> enumerable, Action<T> action, int NumberOfParallelTasks) { var syncRoot = new object();

if (enumerable == null) return;

var enumerator = enumerable.GetEnumerator();

InvokeAsync<T> del = InvokeAction;

var seedItemArray = new T[NumberOfParallelTasks]; var resultList = new List<IAsyncResult>(NumberOfParallelTasks); var waitHanles = new List<WaitHandle>(NumberOfParallelTasks);

for (int i = 0; i < NumberOfParallelTasks; i++) {

lock (syncRoot) { if (!enumerator.MoveNext()) break; seedItemArray[i] = enumerator.Current; }

var iAsyncResult = del.BeginInvoke(enumerator, action, seedItemArray[i], syncRoot, i, null, null); resultList.Add(iAsyncResult); waitHanles.Add(iAsyncResult.AsyncWaitHandle); }

var taskCount = waitHanles.Count;

for (int i = 0; i < taskCount; i++) { var index = WaitHandle.WaitAny(waitHanles.ToArray()); del.EndInvoke(resultList[index]); resultList[index].AsyncWaitHandle.Close(); waitHanles.RemoveAt(index); resultList.RemoveAt(index); } }

delegate void InvokeAsync<T>(IEnumerator<T> enumerator, Action<T> achtion, T item, object syncRoot, int i);

static void InvokeAction<T>(IEnumerator<T> enumerator, Action<T> action, T item, object syncRoot, int i) { //if (String.IsNullOrEmpty(Thread.CurrentThread.Name)) // Thread.CurrentThread.Name = //String.Format(“Parallel.ForEach Worker Thread No:{0}”, i);

bool moveNext = true;

while (moveNext) { try { action.Invoke(item); } catch (Exception) { throw; }

lock (syncRoot) { moveNext = enumerator.MoveNext(); if (moveNext) item = enumerator.Current; } } } }

整个算法非常简洁,这里就不多介绍了。如果有错误欢迎指正。

转载于:https://www.cnblogs.com/TianFang/archive/2009/06/28/1512588.html

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/161829.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档