0%

    为什么编写TaskSchedulerEx类?

    因为.NET默认只有一个全局线程池,如果某个批量任务长时间占用大量线程,甚至耗尽该线程池,就会严重影响应用程序域中其它任务或批量任务的性能。

     特点:

    1、使用独立线程池,线程池中线程分为核心线程和辅助线程,辅助线程会动态增加和释放,且总线程数不大于参数_maxThreadCount

    2、无缝兼容Task,使用上和Task一样,可以用它来实现异步,参见:C# async await 异步执行方法封装 替代 BackgroundWorker

    3、队列中尚未执行的任务可以取消

    4、通过扩展类TaskHelper实现任务分组

    5、和SmartThreadPool对比,优点是无缝兼容Task类,和Task类使用没有区别,因为它本身就是对Task、TaskScheduler的扩展,所以Task类的ContinueWith、WaitAll等方法它都支持,以及兼容async、await异步编程

    6、代码量相当精简,TaskSchedulerEx类只有260多行代码

    7、池中的线程数量会根据负载自动增减(支持,但不如SmartThreadPool智能):为了性能,这里采用了比较简单的实现方式。不知道大家有没有既智能、性能又高的方案?我有一个思路:在定时器中统计每个任务的平均执行耗时,然后用公式 线程数 = CPU核心数 × (本地计算时间 + 等待时间) / 本地计算时间 计算最佳线程数,再按最佳线程数动态创建线程;但这个计算过程本身可能会牺牲性能。

     对比SmartThreadPool:

    TaskSchedulerEx类代码(使用Semaphore实现):

using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Utils
{ ///


/// TaskScheduler扩展 /// 每个实例都是独立线程池 ///

/// <summary>
/// A <see cref="TaskScheduler"/> that runs tasks on its own dedicated threads instead of
/// the shared .NET thread pool. Every instance is an independent pool made of long-lived
/// "core" threads plus "auxiliary" threads that are started on demand and exit after
/// being idle for a while. Queue hand-off is signalled through a <see cref="Semaphore"/>.
/// </summary>
public class TaskSchedulerEx : TaskScheduler, IDisposable
{
    #region Native methods
    /// <summary>
    /// Raw import of kernel32 SetProcessWorkingSetSize, kept for callers that already use it.
    /// NOTE(review): the Win32 size parameters are SIZE_T (pointer sized); this
    /// <c>int</c> signature is retained only for backward compatibility.
    /// </summary>
    [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
    public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize);

    /// <summary>
    /// Pointer-sized import used internally so that (SIZE_T)-1 is passed correctly on 64-bit.
    /// </summary>
    [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
    private static extern bool SetProcessWorkingSetSizeNative(IntPtr process, IntPtr minSize, IntPtr maxSize);
    #endregion

    #region Fields and properties
    private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
    private readonly int _semMaxCount = int.MaxValue;      // maximum number of outstanding semaphore permits
    private readonly int _auxiliaryThreadTimeOut = 20000;  // idle time (ms) after which an auxiliary thread exits
    private readonly Semaphore _sem;
    private readonly int _maxThreadCount;
    private int _coreThreadCount;
    private int _activeThreadCount;
    private int _semCount;                                 // permits released and not yet consumed
    private int _runCount;                                 // tasks that are queued or currently executing
    private volatile bool _run = true;                     // volatile: written by Dispose, polled by worker threads
    private int _disposed;                                 // 0 = live, 1 = Dispose has been entered

    /// <summary>
    /// Number of worker threads currently alive (core + auxiliary).
    /// </summary>
    public int ActiveThreadCount
    {
        get { return Volatile.Read(ref _activeThreadCount); }
    }

    /// <summary>
    /// Number of core threads requested at construction.
    /// </summary>
    public int CoreThreadCount
    {
        get { return _coreThreadCount; }
    }

    /// <summary>
    /// Upper bound applied when auxiliary threads are started.
    /// </summary>
    public int MaxThreadCount
    {
        get { return _maxThreadCount; }
    }
    #endregion

    #region Constructor
    /// <summary>
    /// Creates an independent pool and immediately starts its core threads.
    /// </summary>
    /// <param name="coreThreadCount">Number of core threads (0 or more; 0 suits one-off use).</param>
    /// <param name="maxThreadCount">Maximum number of threads used when starting auxiliary threads.</param>
    public TaskSchedulerEx(int coreThreadCount = 10, int maxThreadCount = 20)
    {
        _sem = new Semaphore(0, _semMaxCount);
        _maxThreadCount = maxThreadCount;
        _coreThreadCount = coreThreadCount;

        for (int i = 0; i < _coreThreadCount; i++)
        {
            Interlocked.Increment(ref _activeThreadCount);
            StartWorker(true);
        }
    }
    #endregion

    #region TaskScheduler overrides
    /// <summary>
    /// Returns the tasks that are queued and not yet picked up by a worker.
    /// </summary>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks;
    }

    /// <summary>
    /// Inline execution is never allowed; tasks only run on this pool's own threads.
    /// </summary>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    /// <summary>
    /// Enqueues a task, signals one permit and, if there is more pending work than
    /// threads, starts an auxiliary thread (never beyond <see cref="MaxThreadCount"/>).
    /// After <see cref="Dispose"/> the task is still enqueued but will never run.
    /// </summary>
    protected override void QueueTask(Task task)
    {
        _tasks.Enqueue(task);
        if (!_run)
        {
            return; // disposed: nothing will ever dequeue, so do not signal or spawn
        }

        // Back off while the permit counter is saturated.
        while (Volatile.Read(ref _semCount) >= _semMaxCount)
        {
            Thread.Sleep(1);
        }

        try
        {
            _sem.Release();
        }
        catch (ObjectDisposedException)
        {
            return; // lost a race with Dispose
        }
        Interlocked.Increment(ref _semCount);
        Interlocked.Increment(ref _runCount);

        // Reserve a thread slot with compare-and-swap so concurrent producers
        // cannot push the thread count past _maxThreadCount.
        while (true)
        {
            int active = Volatile.Read(ref _activeThreadCount);
            if (active >= _maxThreadCount || active >= Volatile.Read(ref _runCount))
            {
                break;
            }
            if (Interlocked.CompareExchange(ref _activeThreadCount, active + 1, active) == active)
            {
                StartWorker(false);
                break;
            }
        }
    }
    #endregion

    #region Dispose
    /// <summary>
    /// Stops all worker threads and blocks until they have exited. Tasks still in the
    /// queue are not executed. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _run = false;

        while (Volatile.Read(ref _activeThreadCount) > 0)
        {
            // Hand out one permit per live worker so every thread blocked in WaitOne
            // wakes up and observes _run == false; then yield instead of spinning hot.
            int wake = Math.Max(1, Volatile.Read(ref _activeThreadCount));
            try
            {
                _sem.Release(wake);
                Interlocked.Add(ref _semCount, wake);
            }
            catch (SemaphoreFullException)
            {
                // Already more permits than could ever be needed.
            }
            Thread.Sleep(1);
        }

        _sem.Dispose();
    }
    #endregion

    #region Worker threads
    /// <summary>
    /// Starts one background worker. The caller must already have counted the thread
    /// in <c>_activeThreadCount</c>; the count is rolled back if the thread cannot start.
    /// </summary>
    private void StartWorker(bool isCore)
    {
        try
        {
            Thread thread = new Thread(() => WorkerLoop(isCore));
            thread.IsBackground = true;
            thread.Start();
        }
        catch
        {
            Interlocked.Decrement(ref _activeThreadCount);
            throw;
        }
    }

    /// <summary>
    /// Body of every worker: drain the queue, otherwise wait for a permit. Core workers
    /// run until disposal; auxiliary workers also exit after staying idle for
    /// <c>_auxiliaryThreadTimeOut</c> milliseconds.
    /// </summary>
    private void WorkerLoop(bool isCore)
    {
        System.Diagnostics.Stopwatch idle = System.Diagnostics.Stopwatch.StartNew();

        while (_run && (isCore || idle.ElapsedMilliseconds < _auxiliaryThreadTimeOut))
        {
            Task task;
            if (_tasks.TryDequeue(out task))
            {
                TryExecuteTask(task);
                Interlocked.Decrement(ref _runCount);
                idle.Restart();
            }
            else
            {
                bool acquired = isCore ? _sem.WaitOne() : _sem.WaitOne(_auxiliaryThreadTimeOut);
                if (acquired)
                {
                    // Only a permit that was really taken is accounted for;
                    // a timed-out wait must not change the counter.
                    Interlocked.Decrement(ref _semCount);
                }
            }
        }

        int remaining = Interlocked.Decrement(ref _activeThreadCount);
        int trimThreshold = isCore ? 0 : _coreThreadCount;
        if (remaining == trimThreshold)
        {
            TrimMemory();
        }
    }

    /// <summary>
    /// Forces a garbage collection and, on Windows NT, asks the OS to trim the working set.
    /// </summary>
    private static void TrimMemory()
    {
        GC.Collect();
        GC.WaitForPendingFinalizers();
        if (Environment.OSVersion.Platform == PlatformID.Win32NT)
        {
            using (System.Diagnostics.Process process = System.Diagnostics.Process.GetCurrentProcess())
            {
                SetProcessWorkingSetSizeNative(process.Handle, new IntPtr(-1), new IntPtr(-1));
            }
        }
    }
    #endregion

    #region CancelAll
    /// <summary>
    /// Removes every task that has not started yet. Removed tasks are simply dropped:
    /// they are never executed and never transition to a completed state, so callers
    /// must not wait on them.
    /// </summary>
    public void CancelAll()
    {
        Task dropped;
        while (_tasks.TryDequeue(out dropped))
        {
            Interlocked.Decrement(ref _runCount);
        }
    }
    #endregion
}

}

View Code

    TaskSchedulerEx类代码(使用AutoResetEvent实现):

using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Utils
{ ///


/// TaskScheduler扩展 /// 每个实例都是独立线程池 ///

/// <summary>
/// A <see cref="TaskScheduler"/> that runs tasks on its own dedicated threads instead of
/// the shared .NET thread pool. Every instance is an independent pool made of long-lived
/// "core" threads plus "auxiliary" threads. Workers are woken through an
/// <see cref="AutoResetEvent"/>; a timer adds auxiliary threads while work is queued.
/// </summary>
public class TaskSchedulerEx : TaskScheduler, IDisposable
{
    #region Native methods
    /// <summary>
    /// Raw import of kernel32 SetProcessWorkingSetSize, kept for callers that already use it.
    /// NOTE(review): the Win32 size parameters are SIZE_T (pointer sized); this
    /// <c>int</c> signature is retained only for backward compatibility.
    /// </summary>
    [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
    public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize);

    /// <summary>
    /// Pointer-sized import used internally so that (SIZE_T)-1 is passed correctly on 64-bit.
    /// </summary>
    [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
    private static extern bool SetProcessWorkingSetSizeNative(IntPtr process, IntPtr minSize, IntPtr maxSize);
    #endregion

    #region Fields and properties
    private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
    private readonly int _auxiliaryThreadTimeOut = 20000;  // idle time (ms) after which an auxiliary thread exits
    private readonly object _lockCreateTimer = new object();
    private readonly AutoResetEvent _evt = new AutoResetEvent(false);
    private readonly int _maxThreadCount;
    private int _coreThreadCount;
    private int _activeThreadCount;
    private volatile System.Timers.Timer _timer;           // volatile: checked outside the lock on the fast path
    private volatile bool _run = true;                     // volatile: written by Dispose, polled by worker threads
    private int _disposed;                                 // 0 = live, 1 = Dispose has been entered

    /// <summary>
    /// Number of worker threads currently alive (core + auxiliary).
    /// </summary>
    public int ActiveThreadCount
    {
        get { return Volatile.Read(ref _activeThreadCount); }
    }

    /// <summary>
    /// Number of core threads requested at construction.
    /// </summary>
    public int CoreThreadCount
    {
        get { return _coreThreadCount; }
    }

    /// <summary>
    /// Upper bound applied when auxiliary threads are started.
    /// </summary>
    public int MaxThreadCount
    {
        get { return _maxThreadCount; }
    }
    #endregion

    #region Constructor
    /// <summary>
    /// Creates an independent pool and immediately starts its core threads.
    /// </summary>
    /// <param name="coreThreadCount">Number of core threads (0 or more; 0 suits one-off use).</param>
    /// <param name="maxThreadCount">Maximum number of threads used when starting auxiliary threads.</param>
    public TaskSchedulerEx(int coreThreadCount = 10, int maxThreadCount = 20)
    {
        _maxThreadCount = maxThreadCount;
        _coreThreadCount = coreThreadCount;

        for (int i = 0; i < _coreThreadCount; i++)
        {
            Interlocked.Increment(ref _activeThreadCount);
            StartWorker(true);
        }
    }
    #endregion

    #region TaskScheduler overrides
    /// <summary>
    /// Returns the tasks that are queued and not yet picked up by a worker.
    /// </summary>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks;
    }

    /// <summary>
    /// Inline execution is never allowed; tasks only run on this pool's own threads.
    /// </summary>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    /// <summary>
    /// Enqueues a task, makes sure the scaling timer exists and wakes one worker.
    /// After <see cref="Dispose"/> the task is still enqueued but will never run.
    /// </summary>
    protected override void QueueTask(Task task)
    {
        _tasks.Enqueue(task);
        if (!_run)
        {
            return; // disposed: nothing will ever dequeue
        }

        CreateTimer();
        try
        {
            _evt.Set();
        }
        catch (ObjectDisposedException)
        {
            // Lost a race with Dispose; the task stays queued and unexecuted.
        }
    }
    #endregion

    #region Dispose
    /// <summary>
    /// Stops the timer and all worker threads and blocks until the threads have exited.
    /// Tasks still in the queue are not executed. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _run = false;

        lock (_lockCreateTimer)
        {
            System.Timers.Timer timer = _timer;
            _timer = null;
            if (timer != null)
            {
                timer.Stop();
                timer.Dispose();
            }
        }

        // Each Set wakes at most one waiter; exiting workers re-signal the event
        // themselves, and this loop keeps nudging with back-off until all are gone.
        SpinWait spinner = new SpinWait();
        while (Volatile.Read(ref _activeThreadCount) > 0)
        {
            _evt.Set();
            spinner.SpinOnce();
        }

        _evt.Dispose();
    }
    #endregion

    #region Worker threads
    /// <summary>
    /// Starts one background worker. The caller must already have counted the thread
    /// in <c>_activeThreadCount</c>; the count is rolled back if the thread cannot start.
    /// </summary>
    private void StartWorker(bool isCore)
    {
        try
        {
            Thread thread = new Thread(() => WorkerLoop(isCore));
            thread.IsBackground = true;
            thread.Start();
        }
        catch
        {
            Interlocked.Decrement(ref _activeThreadCount);
            throw;
        }
    }

    /// <summary>
    /// Reserves a slot below <see cref="MaxThreadCount"/> and starts an auxiliary worker.
    /// Does nothing when the cap has been reached.
    /// </summary>
    private void CreateThread()
    {
        int active;
        do
        {
            active = Volatile.Read(ref _activeThreadCount);
            if (active >= _maxThreadCount)
            {
                return;
            }
        }
        while (Interlocked.CompareExchange(ref _activeThreadCount, active + 1, active) != active);

        StartWorker(false);
    }

    /// <summary>
    /// Body of every worker: drain the queue, otherwise wait for the event. Core workers
    /// run until disposal; auxiliary workers also exit after staying idle for
    /// <c>_auxiliaryThreadTimeOut</c> milliseconds.
    /// </summary>
    private void WorkerLoop(bool isCore)
    {
        System.Diagnostics.Stopwatch idle = System.Diagnostics.Stopwatch.StartNew();

        while (_run && (isCore || idle.ElapsedMilliseconds < _auxiliaryThreadTimeOut))
        {
            Task task;
            if (_tasks.TryDequeue(out task))
            {
                // Several Set() calls made while nobody was waiting collapse into a single
                // signal, so pass the wake-up on when more work is still queued.
                if (!_tasks.IsEmpty)
                {
                    _evt.Set();
                }
                TryExecuteTask(task);
                idle.Restart();
            }
            else if (isCore)
            {
                _evt.WaitOne();
            }
            else
            {
                _evt.WaitOne(_auxiliaryThreadTimeOut);
            }
        }

        if (!_run)
        {
            _evt.Set(); // shutting down: wake the next sleeping worker so it can exit too
        }

        int remaining = Interlocked.Decrement(ref _activeThreadCount);
        int trimThreshold = isCore ? 0 : _coreThreadCount;
        if (remaining == trimThreshold)
        {
            TrimMemory();
        }
    }

    /// <summary>
    /// Forces a garbage collection and, on Windows NT, asks the OS to trim the working set.
    /// </summary>
    private static void TrimMemory()
    {
        GC.Collect();
        GC.WaitForPendingFinalizers();
        if (Environment.OSVersion.Platform == PlatformID.Win32NT)
        {
            using (System.Diagnostics.Process process = System.Diagnostics.Process.GetCurrentProcess())
            {
                SetProcessWorkingSetSizeNative(process.Handle, new IntPtr(-1), new IntPtr(-1));
            }
        }
    }
    #endregion

    #region Scaling timer
    /// <summary>
    /// Lazily creates the timer that adds auxiliary threads while tasks are queued.
    /// The unlocked checks are a fast path; the decision is re-checked under the lock.
    /// </summary>
    private void CreateTimer()
    {
        if (_timer != null || !_run)
        {
            return;
        }

        int active = Volatile.Read(ref _activeThreadCount);
        if (active < _coreThreadCount || active >= _maxThreadCount)
        {
            return;
        }

        lock (_lockCreateTimer)
        {
            if (_timer != null || !_run)
            {
                return;
            }

            System.Timers.Timer timer = new System.Timers.Timer();
            timer.Interval = _coreThreadCount == 0 ? 1 : 500;
            // The handler captures this local instance, never the _timer field,
            // which Dispose or another tick may set to null at any moment.
            timer.Elapsed += (s, e) => OnTimerElapsed(timer);
            _timer = timer;
            timer.Start();
        }
    }

    /// <summary>
    /// Timer tick: while there is room for more threads, add one per tick if work is
    /// queued (fast 20 ms cadence) or idle at 500 ms; otherwise retire the timer.
    /// </summary>
    private void OnTimerElapsed(System.Timers.Timer timer)
    {
        int active = Volatile.Read(ref _activeThreadCount);
        if (_run && active >= _coreThreadCount && active < _maxThreadCount)
        {
            if (!_tasks.IsEmpty)
            {
                if (timer.Interval != 20)
                {
                    timer.Interval = 20;
                }
                CreateThread();
            }
            else if (timer.Interval != 500)
            {
                timer.Interval = 500;
            }
            return;
        }

        lock (_lockCreateTimer)
        {
            if (ReferenceEquals(_timer, timer))
            {
                _timer = null;
            }
            timer.Stop();
            timer.Dispose();
        }
    }
    #endregion

    #region CancelAll
    /// <summary>
    /// Removes every task that has not started yet. Removed tasks are simply dropped:
    /// they are never executed and never transition to a completed state, so callers
    /// must not wait on them.
    /// </summary>
    public void CancelAll()
    {
        Task dropped;
        while (_tasks.TryDequeue(out dropped))
        {
        }
    }
    #endregion
}

}

View Code

    RunHelper类代码:

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Utils
{ ///


/// 线程工具类 ///

/// <summary>
/// Extension methods that start work on a given <see cref="TaskScheduler"/> and route
/// any exception thrown by the work to an optional callback and to the log, so the
/// returned task completes normally instead of faulting.
/// </summary>
public static class RunHelper
{
    #region Failure reporting
    /// <summary>
    /// Invokes the optional callback first, then writes the exception to the log.
    /// </summary>
    private static void ReportFailure(Exception ex, Action<Exception> errorAction)
    {
        if (errorAction != null)
        {
            errorAction(ex);
        }
        LogUtil.Error(ex, "ThreadUtil.Run错误");
    }
    #endregion

    #region Run (no result)
    /// <summary>
    /// Runs <paramref name="doWork"/> with <paramref name="arg"/> on the scheduler.
    /// </summary>
    public static Task Run(this TaskScheduler scheduler, Action<object> doWork, object arg = null, Action<Exception> errorAction = null)
    {
        Action<object> guarded = state =>
        {
            try
            {
                doWork(state);
            }
            catch (Exception ex)
            {
                ReportFailure(ex, errorAction);
            }
        };
        return Task.Factory.StartNew(guarded, arg, CancellationToken.None, TaskCreationOptions.None, scheduler);
    }

    /// <summary>
    /// Runs <paramref name="doWork"/> on the scheduler.
    /// </summary>
    public static Task Run(this TaskScheduler scheduler, Action doWork, Action<Exception> errorAction = null)
    {
        Action guarded = () =>
        {
            try
            {
                doWork();
            }
            catch (Exception ex)
            {
                ReportFailure(ex, errorAction);
            }
        };
        return Task.Factory.StartNew(guarded, CancellationToken.None, TaskCreationOptions.None, scheduler);
    }
    #endregion

    #region Run (with result)
    /// <summary>
    /// Runs <paramref name="doWork"/> with <paramref name="arg"/> on the scheduler;
    /// the result is <c>default(T)</c> when the work throws.
    /// </summary>
    public static Task<T> Run<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null)
    {
        Func<object, T> guarded = state =>
        {
            try
            {
                return doWork(state);
            }
            catch (Exception ex)
            {
                ReportFailure(ex, errorAction);
                return default(T);
            }
        };
        return Task.Factory.StartNew<T>(guarded, arg, CancellationToken.None, TaskCreationOptions.None, scheduler);
    }

    /// <summary>
    /// Runs <paramref name="doWork"/> on the scheduler; the result is
    /// <c>default(T)</c> when the work throws.
    /// </summary>
    public static Task<T> Run<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null)
    {
        Func<T> guarded = () =>
        {
            try
            {
                return doWork();
            }
            catch (Exception ex)
            {
                ReportFailure(ex, errorAction);
                return default(T);
            }
        };
        return Task.Factory.StartNew<T>(guarded, CancellationToken.None, TaskCreationOptions.None, scheduler);
    }
    #endregion

    #region RunAsync (with result)
    /// <summary>
    /// Awaitable form of the argument-taking overload; the result is
    /// <c>default(T)</c> when the work throws.
    /// </summary>
    public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null)
    {
        Func<object, T> guarded = state =>
        {
            try
            {
                return doWork(state);
            }
            catch (Exception ex)
            {
                ReportFailure(ex, errorAction);
                return default(T);
            }
        };
        Task<T> started = Task.Factory.StartNew<T>(guarded, arg, CancellationToken.None, TaskCreationOptions.None, scheduler);
        return await started;
    }

    /// <summary>
    /// Awaitable form of the parameterless overload; the result is
    /// <c>default(T)</c> when the work throws.
    /// </summary>
    public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null)
    {
        Func<T> guarded = () =>
        {
            try
            {
                return doWork();
            }
            catch (Exception ex)
            {
                ReportFailure(ex, errorAction);
                return default(T);
            }
        };
        Task<T> started = Task.Factory.StartNew<T>(guarded, CancellationToken.None, TaskCreationOptions.None, scheduler);
        return await started;
    }
    #endregion
}

}

View Code

    TaskHelper扩展类:

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Utils
{ ///


/// Task帮助类基类 ///

/// <summary>
/// Provides one shared <see cref="TaskScheduler"/> per category of work, so that a
/// burst in one category cannot starve the others. Each scheduler is created on first
/// use. <see cref="Lazy{T}"/> guarantees that exactly one instance is built even when
/// several threads ask at the same time; an unsynchronized null check could build two
/// and leave the loser's worker threads running forever.
/// </summary>
public class TaskHelper
{
    #region Fields
    /// <summary>
    /// Number of logical processors reported by the runtime.
    /// </summary>
    private static int _processorCount = Environment.ProcessorCount;
    #endregion

    #region UI tasks
    private static readonly Lazy<TaskScheduler> _UITask =
        new Lazy<TaskScheduler>(() => new TaskSchedulerEx(2, 4));

    /// <summary>
    /// Scheduler for UI-related work (2 core threads, at most 4).
    /// </summary>
    public static TaskScheduler UITask
    {
        get { return _UITask.Value; }
    }
    #endregion

    #region Menu tasks
    private static readonly Lazy<TaskScheduler> _MenuTask =
        new Lazy<TaskScheduler>(() => new TaskSchedulerEx(2, 4));

    /// <summary>
    /// Scheduler for menu work (2 core threads, at most 4).
    /// </summary>
    public static TaskScheduler MenuTask
    {
        get { return _MenuTask.Value; }
    }
    #endregion

    #region Computation tasks
    private static readonly Lazy<TaskScheduler> _CalcTask =
        new Lazy<TaskScheduler>(() => new LimitedTaskScheduler(_processorCount * 2));

    /// <summary>
    /// Scheduler for computation work; constructed with twice the processor count
    /// (presumably the thread limit of LimitedTaskScheduler; confirm against that type).
    /// </summary>
    public static TaskScheduler CalcTask
    {
        get { return _CalcTask.Value; }
    }
    #endregion

    #region Network requests
    private static readonly Lazy<TaskScheduler> _RequestTask =
        new Lazy<TaskScheduler>(() => new TaskSchedulerEx(8, 32));

    /// <summary>
    /// Scheduler for network requests (8 core threads, at most 32).
    /// </summary>
    public static TaskScheduler RequestTask
    {
        get { return _RequestTask.Value; }
    }
    #endregion

    #region Database tasks
    private static readonly Lazy<TaskScheduler> _DBTask =
        new Lazy<TaskScheduler>(() => new TaskSchedulerEx(8, 32));

    /// <summary>
    /// Scheduler for database work (8 core threads, at most 32).
    /// </summary>
    public static TaskScheduler DBTask
    {
        get { return _DBTask.Value; }
    }
    #endregion

    #region IO tasks
    private static readonly Lazy<TaskScheduler> _IOTask =
        new Lazy<TaskScheduler>(() => new TaskSchedulerEx(8, 32));

    /// <summary>
    /// Scheduler for I/O work (8 core threads, at most 32).
    /// </summary>
    public static TaskScheduler IOTask
    {
        get { return _IOTask.Value; }
    }
    #endregion

    #region Main page tasks
    private static readonly Lazy<TaskScheduler> _MainPageTask =
        new Lazy<TaskScheduler>(() => new TaskSchedulerEx(8, 32));

    /// <summary>
    /// Scheduler for main-page work (8 core threads, at most 32).
    /// </summary>
    public static TaskScheduler MainPageTask
    {
        get { return _MainPageTask.Value; }
    }
    #endregion

    #region Image loading tasks
    private static readonly Lazy<TaskScheduler> _LoadImageTask =
        new Lazy<TaskScheduler>(() => new TaskSchedulerEx(8, 32));

    /// <summary>
    /// Scheduler for image loading (8 core threads, at most 32).
    /// </summary>
    public static TaskScheduler LoadImageTask
    {
        get { return _LoadImageTask.Value; }
    }
    #endregion

    #region Browser tasks
    private static readonly Lazy<TaskScheduler> _BrowserTask =
        new Lazy<TaskScheduler>(() => new TaskSchedulerEx(2, 4));

    /// <summary>
    /// Scheduler for browser work (2 core threads, at most 4).
    /// </summary>
    public static TaskScheduler BrowserTask
    {
        get { return _BrowserTask.Value; }
    }
    #endregion
}

}

View Code

    Form1.cs测试代码:

using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Management; using System.Reflection; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Windows.Forms; using Utils; namespace test
{
    /// <summary>
    /// Test form that pushes batches of simulated tasks through TaskSchedulerEx pools
    /// of different sizes and logs progress and total elapsed time.
    /// </summary>
    public partial class Form1 : Form
    {
        private TaskSchedulerEx _taskSchedulerEx = null;       // large pool: 50 core / 500 max
        private TaskSchedulerEx _taskSchedulerExSmall = null;  // small pool: 5 core / 50 max
        private TaskSchedulerEx _task = null;                  // runs the driver loop of DoTask off the UI thread

        public Form1()
        {
            InitializeComponent();
            _taskSchedulerEx = new TaskSchedulerEx(50, 500);
            _taskSchedulerExSmall = new TaskSchedulerEx(5, 50);
            _task = new TaskSchedulerEx(2, 10);
        }

        private void Form1_Load(object sender, EventArgs e)
        {

        }

        /// <summary>
        /// Simulates a large batch of network-style requests on the large pool.
        /// </summary>
        private void button1_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerEx, 200000, 1000, 20);
        }

        /// <summary>
        /// Simulates a CPU-bound style batch (1 ms per task) on the large pool.
        /// </summary>
        private void button2_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerEx, 100000, 2000, 1);
        }

        /// <summary>
        /// Simulates network-style requests on the small pool.
        /// </summary>
        private void button3_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerExSmall, 2000, 100, 20);
        }

        /// <summary>
        /// Simulates a CPU-bound style batch (1 ms per task) on the small pool.
        /// </summary>
        private void button4_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerExSmall, 2000, 100, 1);
        }

        /// <summary>
        /// Queues <paramref name="taskCount"/> simulated tasks on
        /// <paramref name="scheduler"/>, waits for all of them and logs the elapsed time.
        /// The driver loop itself runs on the separate <c>_task</c> pool.
        /// </summary>
        /// <param name="scheduler">Pool that executes the simulated tasks.</param>
        /// <param name="taskCount">Number of tasks to queue.</param>
        /// <param name="logCount">A progress line is logged for every task whose number is a multiple of this value.</param>
        /// <param name="delay">Simulated latency or cost per task, in milliseconds.</param>
        private void DoTask(TaskSchedulerEx scheduler, int taskCount, int logCount, int delay)
        {
            _task.Run(() =>
            {
                Log("开始");
                // Stopwatch is monotonic, so the measurement is immune to wall-clock changes.
                System.Diagnostics.Stopwatch watch = System.Diagnostics.Stopwatch.StartNew();
                List<Task> taskList = new List<Task>();
                for (int i = 1; i <= taskCount; i++)
                {
                    Task task = scheduler.Run((obj) =>
                    {
                        var k = (int)obj;
                        Thread.Sleep(delay); // simulated latency / cost
                        if (k % logCount == 0)
                        {
                            Log("最大线程数:" + scheduler.MaxThreadCount + " 核心线程数:" + scheduler.CoreThreadCount + " 活跃线程数:" + scheduler.ActiveThreadCount.ToString().PadLeft(4, ' ') + " 处理数/总数:" + k + " / " + taskCount);
                        }
                    }, i, (ex) =>
                    {
                        Log(ex.Message);
                    });
                    taskList.Add(task);
                }
                Task.WaitAll(taskList.ToArray());
                double d = watch.Elapsed.TotalSeconds;
                Log("完成,耗时:" + d + "秒");
            });
        }

        /// <summary>
        /// Releases the worker pools when the form closes.
        /// </summary>
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            if (_taskSchedulerEx != null)
            {
                _taskSchedulerEx.Dispose(); // release resources
                _taskSchedulerEx = null;
            }
            if (_taskSchedulerExSmall != null)
            {
                _taskSchedulerExSmall.Dispose(); // previously leaked: its worker threads were never stopped
                _taskSchedulerExSmall = null;
            }
            // _task is intentionally not disposed here: Dispose blocks until every worker
            // thread has exited, and a worker may be parked in Task.WaitAll on tasks that
            // will never run once the pools above are disposed.
        }
    }

}

View Code

     测试截图: