为什么编写TaskSchedulerEx类?
因为.NET默认只有一个全局线程池,如果某个批量任务长时间占用大量线程,甚至耗尽该线程池,就会严重影响应用程序域中其它任务或批量任务的性能。
特点:
1、使用独立线程池,线程池中的线程分为核心线程和辅助线程;辅助线程会按需动态创建,空闲超时后自动释放,且总线程数不大于构造函数参数maxThreadCount
2、无缝兼容Task,使用上和Task一样,可以用它来实现异步,参见:C# async await 异步执行方法封装 替代 BackgroundWorker
3、队列中尚未执行的任务可以取消
4、通过扩展类TaskHelper实现任务分组
5、和SmartThreadPool对比,优点是无缝兼容Task类,和Task类使用没有区别,因为它本身就是对Task、TaskScheduler的扩展,所以Task类的ContinueWith、WaitAll等方法它都支持,以及兼容async、await异步编程
6、代码量相当精简,TaskSchedulerEx类只有260多行代码
7、池中的线程数量会根据负载自动增减:支持,但不如SmartThreadPool智能。为了性能,这里使用了比较简单的方式实现。不知道大家有没有既智能、性能又高的方案?我有一个思路:在定时器中统计每个任务的平均执行耗时,然后使用公式"线程数 = CPU核心数 × (本地计算时间 + 等待时间) / 本地计算时间"计算最佳线程数,再按最佳线程数动态创建线程;但这个计算过程本身可能会牺牲性能。
对比SmartThreadPool:
TaskSchedulerEx类代码(使用Semaphore实现):
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// A <see cref="TaskScheduler"/> that runs tasks on its own dedicated threads
    /// instead of the shared .NET thread pool (Semaphore-based variant).
    /// Core threads live until <see cref="Dispose"/>; auxiliary threads are started
    /// on demand and exit after being idle for the auxiliary time-out.
    /// </summary>
    public class TaskSchedulerEx : TaskScheduler, IDisposable
    {
        #region Native methods
        /// <summary>
        /// P/Invoke declaration for kernel32 SetProcessWorkingSetSize.
        /// Called with (-1, -1) after a worker group drains.
        /// </summary>
        [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
        public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize);
        #endregion

        #region Fields and properties
        private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
        private int _coreThreadCount = 0;
        private int _maxThreadCount = 0;
        private int _auxiliaryThreadTimeOut = 20000; // idle time (ms) after which an auxiliary thread exits
        private int _activeThreadCount = 0;
        private volatile bool _run = true;           // volatile: written by Dispose, polled by every worker
        private int _disposed = 0;                   // 0 = live, 1 = Dispose already ran
        private Semaphore _sem = null;
        private int _semMaxCount = int.MaxValue;     // maximum number of permits the semaphore can hold
        private int _semCount = 0;                   // bookkeeping of permits currently released
        private int _runCount = 0;                   // tasks that are queued or currently executing

        /// <summary>
        /// Number of worker threads currently alive (core + auxiliary).
        /// </summary>
        public int ActiveThreadCount
        {
            get { return _activeThreadCount; }
        }

        /// <summary>
        /// Number of core threads requested at construction.
        /// </summary>
        public int CoreThreadCount
        {
            get { return _coreThreadCount; }
        }

        /// <summary>
        /// Upper bound used when deciding whether to start another auxiliary thread.
        /// </summary>
        public int MaxThreadCount
        {
            get { return _maxThreadCount; }
        }
        #endregion

        #region Constructor
        /// <summary>
        /// Creates the scheduler and immediately starts its core threads.
        /// </summary>
        /// <param name="coreThreadCount">Number of core threads (>= 0; use 0 for one-shot usage).</param>
        /// <param name="maxThreadCount">Maximum number of threads (>= 0).</param>
        /// <exception cref="ArgumentOutOfRangeException">Either argument is negative.</exception>
        public TaskSchedulerEx(int coreThreadCount = 10, int maxThreadCount = 20)
        {
            if (coreThreadCount < 0) throw new ArgumentOutOfRangeException("coreThreadCount");
            if (maxThreadCount < 0) throw new ArgumentOutOfRangeException("maxThreadCount");

            _sem = new Semaphore(0, _semMaxCount);
            _maxThreadCount = maxThreadCount;
            CreateCoreThreads(coreThreadCount);
        }
        #endregion

        #region override GetScheduledTasks
        /// <summary>
        /// Returns a snapshot of the tasks still waiting in the queue (debugger support).
        /// </summary>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }
        #endregion

        #region override TryExecuteTaskInline
        /// <summary>
        /// Inline execution is never allowed; tasks only run on this scheduler's own threads.
        /// </summary>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
        #endregion

        #region override QueueTask
        /// <summary>
        /// Enqueues a task, releases one semaphore permit to wake a worker and,
        /// if there is more pending work than threads, starts an auxiliary thread.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
        protected override void QueueTask(Task task)
        {
            if (!_run) throw new ObjectDisposedException(GetType().Name);

            _tasks.Enqueue(task);
            while (Volatile.Read(ref _semCount) >= _semMaxCount) // semaphore is full, wait for workers to drain it
            {
                Thread.Sleep(1);
            }
            _sem.Release();
            Interlocked.Increment(ref _semCount);
            Interlocked.Increment(ref _runCount);
            CreateThread();
        }
        #endregion

        #region Dispose
        /// <summary>
        /// Stops all workers and releases the semaphore. Tasks still waiting in the
        /// queue are not executed. Blocks until every worker thread has exited, i.e.
        /// until tasks that are currently executing have finished. Safe to call twice.
        /// </summary>
        public void Dispose()
        {
            if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

            _run = false;

            // After _run is false a worker performs at most one more WaitOne before it
            // re-checks the flag, so one permit per live thread is enough to wake them all.
            int toWake = Volatile.Read(ref _activeThreadCount);
            if (toWake > 0)
            {
                try
                {
                    _sem.Release(toWake);
                    Interlocked.Add(ref _semCount, toWake);
                }
                catch (SemaphoreFullException)
                {
                    // The semaphore already holds more than enough permits to wake everyone.
                }
            }

            SpinWait spinner = new SpinWait();
            while (Volatile.Read(ref _activeThreadCount) > 0)
            {
                spinner.SpinOnce(); // yields/sleeps progressively instead of burning a core
            }

            _sem.Dispose();
        }
        #endregion

        #region Core threads
        /// <summary>
        /// Starts the core worker threads.
        /// </summary>
        /// <param name="coreThreadCount">When not null, overrides the stored core thread count.</param>
        private void CreateCoreThreads(int? coreThreadCount = null)
        {
            if (coreThreadCount != null) _coreThreadCount = coreThreadCount.Value;

            for (int i = 0; i < _coreThreadCount; i++)
            {
                Interlocked.Increment(ref _activeThreadCount);
                StartWorker(true);
            }
        }
        #endregion

        #region Auxiliary threads
        /// <summary>
        /// Starts one auxiliary thread if the thread count is below both the maximum
        /// and the number of pending tasks. The slot is reserved with a compare-and-swap
        /// so concurrent callers cannot push the count above the maximum.
        /// </summary>
        private void CreateThread()
        {
            while (true)
            {
                int active = Volatile.Read(ref _activeThreadCount);
                if (active >= _maxThreadCount || active >= Volatile.Read(ref _runCount)) return;
                if (Interlocked.CompareExchange(ref _activeThreadCount, active + 1, active) == active) break;
            }
            StartWorker(false);
        }
        #endregion

        #region Worker implementation
        /// <summary>
        /// Starts a background worker thread. The caller must already have counted the
        /// thread in _activeThreadCount; the count is rolled back if the start fails.
        /// </summary>
        private void StartWorker(bool isCore)
        {
            try
            {
                Thread thread = new Thread(new ThreadStart(() => WorkerLoop(isCore)));
                thread.IsBackground = true;
                thread.Start();
            }
            catch
            {
                Interlocked.Decrement(ref _activeThreadCount);
                throw;
            }
        }

        /// <summary>
        /// Worker body shared by core and auxiliary threads. Core threads wait
        /// indefinitely for work; auxiliary threads exit once they have not executed
        /// a task for _auxiliaryThreadTimeOut milliseconds.
        /// </summary>
        private void WorkerLoop(bool isCore)
        {
            // Stopwatch is monotonic, so the idle time-out is immune to wall-clock changes.
            System.Diagnostics.Stopwatch idle = System.Diagnostics.Stopwatch.StartNew();

            while (_run && (isCore || idle.ElapsedMilliseconds < _auxiliaryThreadTimeOut))
            {
                Task task;
                if (_tasks.TryDequeue(out task))
                {
                    try
                    {
                        TryExecuteTask(task);
                    }
                    finally
                    {
                        Interlocked.Decrement(ref _runCount);
                    }
                    idle.Restart();
                }
                else
                {
                    bool acquired = isCore ? _sem.WaitOne() : _sem.WaitOne(_auxiliaryThreadTimeOut);
                    // Only count the permit when one was actually taken; a timed-out wait consumes nothing.
                    if (acquired) Interlocked.Decrement(ref _semCount);
                }
            }

            int remaining = Interlocked.Decrement(ref _activeThreadCount);
            // Core workers trim when the last thread exits; auxiliary workers when only core threads remain.
            if (remaining == (isCore ? 0 : _coreThreadCount))
            {
                TrimMemory();
            }
        }

        /// <summary>
        /// Forces a garbage collection and, on Windows NT platforms, calls
        /// SetProcessWorkingSetSize(-1, -1) for the current process.
        /// </summary>
        private static void TrimMemory()
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
            if (Environment.OSVersion.Platform == PlatformID.Win32NT)
            {
                using (System.Diagnostics.Process current = System.Diagnostics.Process.GetCurrentProcess())
                {
                    SetProcessWorkingSetSize(current.Handle, -1, -1);
                }
            }
        }
        #endregion

        #region Cancel all
        /// <summary>
        /// Removes every task that has not started yet from the queue.
        /// NOTE: the removed tasks are simply dropped; they never complete, so code
        /// waiting on them (Wait/WaitAll/await) will keep waiting.
        /// </summary>
        public void CancelAll()
        {
            Task dropped;
            while (_tasks.TryDequeue(out dropped))
            {
                Interlocked.Decrement(ref _runCount);
            }
        }
        #endregion
    }
}
View Code
TaskSchedulerEx类代码(使用AutoResetEvent实现):
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// A <see cref="TaskScheduler"/> that runs tasks on its own dedicated threads
    /// instead of the shared .NET thread pool (AutoResetEvent-based variant).
    /// Core threads live until <see cref="Dispose"/>; a timer starts auxiliary threads
    /// while work is pending, and those exit after being idle for the auxiliary time-out.
    /// </summary>
    public class TaskSchedulerEx : TaskScheduler, IDisposable
    {
        #region Native methods
        /// <summary>
        /// P/Invoke declaration for kernel32 SetProcessWorkingSetSize.
        /// Called with (-1, -1) after a worker group drains.
        /// </summary>
        [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")]
        public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize);
        #endregion

        #region Fields and properties
        private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
        private int _coreThreadCount = 0;
        private int _maxThreadCount = 0;
        private int _auxiliaryThreadTimeOut = 20000; // idle time (ms) after which an auxiliary thread exits
        private int _activeThreadCount = 0;
        private volatile System.Timers.Timer _timer; // volatile: read outside the lock as a fast-path check
        private readonly object _lockCreateTimer = new object();
        private volatile bool _run = true;           // volatile: written by Dispose, polled by every worker
        private int _disposed = 0;                   // 0 = live, 1 = Dispose already ran
        private readonly AutoResetEvent _evt = new AutoResetEvent(false);

        /// <summary>
        /// Number of worker threads currently alive (core + auxiliary).
        /// </summary>
        public int ActiveThreadCount
        {
            get { return _activeThreadCount; }
        }

        /// <summary>
        /// Number of core threads requested at construction.
        /// </summary>
        public int CoreThreadCount
        {
            get { return _coreThreadCount; }
        }

        /// <summary>
        /// Upper bound used when deciding whether to start another auxiliary thread.
        /// </summary>
        public int MaxThreadCount
        {
            get { return _maxThreadCount; }
        }
        #endregion

        #region Constructor
        /// <summary>
        /// Creates the scheduler and immediately starts its core threads.
        /// </summary>
        /// <param name="coreThreadCount">Number of core threads (>= 0; use 0 for one-shot usage).</param>
        /// <param name="maxThreadCount">Maximum number of threads (>= 0).</param>
        /// <exception cref="ArgumentOutOfRangeException">Either argument is negative.</exception>
        public TaskSchedulerEx(int coreThreadCount = 10, int maxThreadCount = 20)
        {
            if (coreThreadCount < 0) throw new ArgumentOutOfRangeException("coreThreadCount");
            if (maxThreadCount < 0) throw new ArgumentOutOfRangeException("maxThreadCount");

            _maxThreadCount = maxThreadCount;
            CreateCoreThreads(coreThreadCount);
        }
        #endregion

        #region override GetScheduledTasks
        /// <summary>
        /// Returns a snapshot of the tasks still waiting in the queue (debugger support).
        /// </summary>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }
        #endregion

        #region override TryExecuteTaskInline
        /// <summary>
        /// Inline execution is never allowed; tasks only run on this scheduler's own threads.
        /// </summary>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
        #endregion

        #region override QueueTask
        /// <summary>
        /// Makes sure the auxiliary-thread timer exists, enqueues the task and signals a worker.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
        protected override void QueueTask(Task task)
        {
            if (!_run) throw new ObjectDisposedException(GetType().Name);

            CreateTimer();
            _tasks.Enqueue(task);
            _evt.Set();
        }
        #endregion

        #region Dispose
        /// <summary>
        /// Stops the timer and all workers and releases the event. Tasks still waiting
        /// in the queue are not executed. Blocks until every worker thread has exited,
        /// i.e. until tasks that are currently executing have finished. Safe to call twice.
        /// </summary>
        public void Dispose()
        {
            if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

            _run = false;

            lock (_lockCreateTimer)
            {
                System.Timers.Timer timer = _timer;
                if (timer != null)
                {
                    timer.Stop();
                    timer.Dispose();
                    _timer = null;
                }
            }

            // Each Set wakes at most one blocked worker; exiting workers pass the signal on
            // (see WorkerLoop), and this loop re-signals as a backstop without burning a core.
            SpinWait spinner = new SpinWait();
            while (Volatile.Read(ref _activeThreadCount) > 0)
            {
                _evt.Set();
                spinner.SpinOnce();
            }

            _evt.Dispose();
        }
        #endregion

        #region Core threads
        /// <summary>
        /// Starts the core worker threads.
        /// </summary>
        /// <param name="coreThreadCount">When not null, overrides the stored core thread count.</param>
        private void CreateCoreThreads(int? coreThreadCount = null)
        {
            if (coreThreadCount != null) _coreThreadCount = coreThreadCount.Value;

            for (int i = 0; i < _coreThreadCount; i++)
            {
                Interlocked.Increment(ref _activeThreadCount);
                StartWorker(true);
            }
        }
        #endregion

        #region Auxiliary threads
        /// <summary>
        /// Starts one auxiliary thread. Only called from the timer handler while it holds
        /// _lockCreateTimer and has verified the thread count is below the maximum.
        /// </summary>
        private void CreateThread()
        {
            Interlocked.Increment(ref _activeThreadCount);
            StartWorker(false);
        }
        #endregion

        #region Worker implementation
        /// <summary>
        /// Starts a background worker thread. The caller must already have counted the
        /// thread in _activeThreadCount; the count is rolled back if the start fails.
        /// </summary>
        private void StartWorker(bool isCore)
        {
            try
            {
                Thread thread = new Thread(new ThreadStart(() => WorkerLoop(isCore)));
                thread.IsBackground = true;
                thread.Start();
            }
            catch
            {
                Interlocked.Decrement(ref _activeThreadCount);
                throw;
            }
        }

        /// <summary>
        /// Worker body shared by core and auxiliary threads. Core threads wait
        /// indefinitely for work; auxiliary threads exit once they have not executed
        /// a task for _auxiliaryThreadTimeOut milliseconds.
        /// </summary>
        private void WorkerLoop(bool isCore)
        {
            // Stopwatch is monotonic, so the idle time-out is immune to wall-clock changes.
            System.Diagnostics.Stopwatch idle = System.Diagnostics.Stopwatch.StartNew();

            while (_run && (isCore || idle.ElapsedMilliseconds < _auxiliaryThreadTimeOut))
            {
                Task task;
                if (_tasks.TryDequeue(out task))
                {
                    // Several Set() calls collapse into a single signal on an AutoResetEvent,
                    // so pass the wake-up on while there is still a backlog.
                    if (!_tasks.IsEmpty) _evt.Set();

                    TryExecuteTask(task);
                    idle.Restart();
                }
                else if (isCore)
                {
                    _evt.WaitOne();
                }
                else
                {
                    _evt.WaitOne(_auxiliaryThreadTimeOut);
                }
            }

            // During shutdown hand the signal to the next blocked worker before leaving.
            // Dispose only releases the event after the active count reaches zero,
            // so the event is still alive here.
            if (!_run) _evt.Set();

            int remaining = Interlocked.Decrement(ref _activeThreadCount);
            // Core workers trim when the last thread exits; auxiliary workers when only core threads remain.
            if (remaining == (isCore ? 0 : _coreThreadCount))
            {
                TrimMemory();
            }
        }

        /// <summary>
        /// Forces a garbage collection and, on Windows NT platforms, calls
        /// SetProcessWorkingSetSize(-1, -1) for the current process.
        /// </summary>
        private static void TrimMemory()
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
            if (Environment.OSVersion.Platform == PlatformID.Win32NT)
            {
                using (System.Diagnostics.Process current = System.Diagnostics.Process.GetCurrentProcess())
                {
                    SetProcessWorkingSetSize(current.Handle, -1, -1);
                }
            }
        }
        #endregion

        #region Timer
        /// <summary>
        /// Lazily creates the timer that starts auxiliary threads while tasks are pending.
        /// The unlocked checks are a fast path so that QueueTask normally avoids the lock.
        /// </summary>
        private void CreateTimer()
        {
            if (_timer != null) return;

            int active = Volatile.Read(ref _activeThreadCount);
            if (active < _coreThreadCount || active >= _maxThreadCount) return;

            lock (_lockCreateTimer)
            {
                if (_timer != null || !_run) return;

                System.Timers.Timer timer = new System.Timers.Timer();
                timer.Interval = _coreThreadCount == 0 ? 1 : 500;
                // Capture the local instance: the field may be replaced or cleared later.
                timer.Elapsed += (s, e) => OnTimerElapsed(timer);
                _timer = timer;
                timer.Start();
            }
        }

        /// <summary>
        /// Timer tick: starts one auxiliary thread when there is a backlog and room for
        /// another thread, or retires the timer once the thread count left the
        /// [core, max) range. Ticks are serialized by the lock so overlapping callbacks
        /// cannot overshoot the maximum or touch a retired timer.
        /// </summary>
        private void OnTimerElapsed(System.Timers.Timer timer)
        {
            lock (_lockCreateTimer)
            {
                if (!ReferenceEquals(_timer, timer)) return; // already retired by Dispose or an earlier tick

                int active = Volatile.Read(ref _activeThreadCount);
                if (_run && active >= _coreThreadCount && active < _maxThreadCount)
                {
                    if (!_tasks.IsEmpty)
                    {
                        if (timer.Interval != 20) timer.Interval = 20;   // backlog: ramp up quickly
                        CreateThread();
                    }
                    else
                    {
                        if (timer.Interval != 500) timer.Interval = 500; // idle: poll slowly
                    }
                }
                else
                {
                    timer.Stop();
                    timer.Dispose();
                    _timer = null;
                }
            }
        }
        #endregion

        #region Cancel all
        /// <summary>
        /// Removes every task that has not started yet from the queue.
        /// NOTE: the removed tasks are simply dropped; they never complete, so code
        /// waiting on them (Wait/WaitAll/await) will keep waiting.
        /// </summary>
        public void CancelAll()
        {
            Task dropped;
            while (_tasks.TryDequeue(out dropped)) { }
        }
        #endregion
    }
}
View Code
RunHelper类代码:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Extension methods that start work on a given <see cref="TaskScheduler"/>.
    /// Exceptions thrown by the work delegate are caught inside the task, passed to the
    /// optional error callback and logged through LogUtil; the task itself then completes
    /// normally (value-returning overloads yield default(T)).
    /// </summary>
    public static class RunHelper
    {
        #region Run (no result)
        /// <summary>
        /// Runs <paramref name="doWork"/> on the scheduler, passing <paramref name="arg"/> as its state.
        /// </summary>
        /// <param name="scheduler">Scheduler that executes the task.</param>
        /// <param name="doWork">Work to execute.</param>
        /// <param name="arg">State object handed to <paramref name="doWork"/>.</param>
        /// <param name="errorAction">Optional callback invoked with the exception thrown by the work.</param>
        /// <returns>The started task.</returns>
        public static Task Run(this TaskScheduler scheduler, Action<object> doWork, object arg = null, Action<Exception> errorAction = null)
        {
            return Task.Factory.StartNew((obj) =>
            {
                try
                {
                    doWork(obj);
                }
                catch (Exception ex)
                {
                    ReportError(ex, errorAction);
                }
            }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }

        /// <summary>
        /// Runs <paramref name="doWork"/> on the scheduler.
        /// </summary>
        /// <param name="scheduler">Scheduler that executes the task.</param>
        /// <param name="doWork">Work to execute.</param>
        /// <param name="errorAction">Optional callback invoked with the exception thrown by the work.</param>
        /// <returns>The started task.</returns>
        public static Task Run(this TaskScheduler scheduler, Action doWork, Action<Exception> errorAction = null)
        {
            return Task.Factory.StartNew(() =>
            {
                try
                {
                    doWork();
                }
                catch (Exception ex)
                {
                    ReportError(ex, errorAction);
                }
            }, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region Run (with result)
        /// <summary>
        /// Runs <paramref name="doWork"/> on the scheduler, passing <paramref name="arg"/> as its state.
        /// </summary>
        /// <returns>A task producing the work's result, or default(T) when the work threw.</returns>
        public static Task<T> Run<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null)
        {
            return Task.Factory.StartNew<T>((obj) =>
            {
                try
                {
                    return doWork(obj);
                }
                catch (Exception ex)
                {
                    ReportError(ex, errorAction);
                    return default(T);
                }
            }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }

        /// <summary>
        /// Runs <paramref name="doWork"/> on the scheduler.
        /// </summary>
        /// <returns>A task producing the work's result, or default(T) when the work threw.</returns>
        public static Task<T> Run<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null)
        {
            return Task.Factory.StartNew<T>(() =>
            {
                try
                {
                    return doWork();
                }
                catch (Exception ex)
                {
                    ReportError(ex, errorAction);
                    return default(T);
                }
            }, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }
        #endregion

        #region RunAsync
        /// <summary>
        /// Awaitable variant of <see cref="Run{T}(TaskScheduler, Func{object, T}, object, Action{Exception})"/>.
        /// </summary>
        /// <returns>A task producing the work's result, or default(T) when the work threw.</returns>
        public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null)
        {
            // ConfigureAwait(false): this wrapper has nothing to do on the caller's context, and
            // resuming there would deadlock a caller that blocks on the returned task from a UI thread.
            return await Run<T>(scheduler, doWork, arg, errorAction).ConfigureAwait(false);
        }

        /// <summary>
        /// Awaitable variant of <see cref="Run{T}(TaskScheduler, Func{T}, Action{Exception})"/>.
        /// </summary>
        /// <returns>A task producing the work's result, or default(T) when the work threw.</returns>
        public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null)
        {
            // See the other RunAsync overload for why the context is not captured.
            return await Run<T>(scheduler, doWork, errorAction).ConfigureAwait(false);
        }
        #endregion

        #region Helpers
        /// <summary>
        /// Invokes the optional error callback and then logs the exception. The log call sits
        /// in a finally block so it still happens when the callback itself throws.
        /// </summary>
        private static void ReportError(Exception ex, Action<Exception> errorAction)
        {
            try
            {
                if (errorAction != null) errorAction(ex);
            }
            finally
            {
                LogUtil.Error(ex, "ThreadUtil.Run错误");
            }
        }
        #endregion
    }
}
View Code
TaskHelper扩展类:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Base helper exposing one shared, lazily created scheduler per workload category,
    /// so that different kinds of work do not compete for the same threads.
    /// Each scheduler is created through <see cref="Lazy{T}"/>, which guarantees that
    /// concurrent first accesses create exactly one instance (the previous null-check
    /// initialisation could create several schedulers and leak their threads).
    /// </summary>
    public class TaskHelper
    {
        #region Fields
        /// <summary>
        /// Number of processors reported by the environment.
        /// </summary>
        private static int _processorCount = Environment.ProcessorCount;
        #endregion

        #region UI tasks
        private static readonly Lazy<TaskScheduler> _UITask =
            new Lazy<TaskScheduler>(() => new TaskSchedulerEx(2, 4));

        /// <summary>
        /// Scheduler for UI tasks (2-4 threads).
        /// </summary>
        public static TaskScheduler UITask
        {
            get { return _UITask.Value; }
        }
        #endregion

        #region Menu tasks
        private static readonly Lazy<TaskScheduler> _MenuTask =
            new Lazy<TaskScheduler>(() => new TaskSchedulerEx(2, 4));

        /// <summary>
        /// Scheduler for menu tasks (2-4 threads).
        /// </summary>
        public static TaskScheduler MenuTask
        {
            get { return _MenuTask.Value; }
        }
        #endregion

        #region Calculation tasks
        private static readonly Lazy<TaskScheduler> _CalcTask =
            new Lazy<TaskScheduler>(() => new LimitedTaskScheduler(_processorCount * 2));

        /// <summary>
        /// Scheduler for calculation tasks (thread count: processor count * 2).
        /// </summary>
        public static TaskScheduler CalcTask
        {
            get { return _CalcTask.Value; }
        }
        #endregion

        #region Network requests
        private static readonly Lazy<TaskScheduler> _RequestTask =
            new Lazy<TaskScheduler>(() => new TaskSchedulerEx(8, 32));

        /// <summary>
        /// Scheduler for network requests (8-32 threads).
        /// </summary>
        public static TaskScheduler RequestTask
        {
            get { return _RequestTask.Value; }
        }
        #endregion

        #region Database tasks
        private static readonly Lazy<TaskScheduler> _DBTask =
            new Lazy<TaskScheduler>(() => new TaskSchedulerEx(8, 32));

        /// <summary>
        /// Scheduler for database tasks (8-32 threads).
        /// </summary>
        public static TaskScheduler DBTask
        {
            get { return _DBTask.Value; }
        }
        #endregion

        #region IO tasks
        private static readonly Lazy<TaskScheduler> _IOTask =
            new Lazy<TaskScheduler>(() => new TaskSchedulerEx(8, 32));

        /// <summary>
        /// Scheduler for IO tasks (8-32 threads).
        /// </summary>
        public static TaskScheduler IOTask
        {
            get { return _IOTask.Value; }
        }
        #endregion

        #region Main page tasks
        private static readonly Lazy<TaskScheduler> _MainPageTask =
            new Lazy<TaskScheduler>(() => new TaskSchedulerEx(8, 32));

        /// <summary>
        /// Scheduler for main page tasks (8-32 threads).
        /// </summary>
        public static TaskScheduler MainPageTask
        {
            get { return _MainPageTask.Value; }
        }
        #endregion

        #region Image loading tasks
        private static readonly Lazy<TaskScheduler> _LoadImageTask =
            new Lazy<TaskScheduler>(() => new TaskSchedulerEx(8, 32));

        /// <summary>
        /// Scheduler for image loading tasks (8-32 threads).
        /// </summary>
        public static TaskScheduler LoadImageTask
        {
            get { return _LoadImageTask.Value; }
        }
        #endregion

        #region Browser tasks
        private static readonly Lazy<TaskScheduler> _BrowserTask =
            new Lazy<TaskScheduler>(() => new TaskSchedulerEx(2, 4));

        /// <summary>
        /// Scheduler for browser tasks (2-4 threads).
        /// </summary>
        public static TaskScheduler BrowserTask
        {
            get { return _BrowserTask.Value; }
        }
        #endregion
    }
}
View Code
Form1.cs测试代码:
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Management;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using Utils;

namespace test
{
    /// <summary>
    /// Test form that floods TaskSchedulerEx instances with simulated work.
    /// </summary>
    public partial class Form1 : Form
    {
        private TaskSchedulerEx _taskSchedulerEx = null;      // large pool: 50 core / 500 max threads
        private TaskSchedulerEx _taskSchedulerExSmall = null; // small pool: 5 core / 50 max threads
        private TaskSchedulerEx _task = null;                 // runs the coordinating job of each test

        public Form1()
        {
            InitializeComponent();
            _taskSchedulerEx = new TaskSchedulerEx(50, 500);
            _taskSchedulerExSmall = new TaskSchedulerEx(5, 50);
            _task = new TaskSchedulerEx(2, 10);
        }

        private void Form1_Load(object sender, EventArgs e)
        {
        }

        /// <summary>
        /// Simulates a large number of network-request-like tasks on the large pool.
        /// </summary>
        private void button1_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerEx, 200000, 1000, 20);
        }

        /// <summary>
        /// Simulates CPU-bound-like tasks on the large pool.
        /// </summary>
        private void button2_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerEx, 100000, 2000, 1);
        }

        /// <summary>
        /// Simulates a large number of network-request-like tasks on the small pool.
        /// </summary>
        private void button3_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerExSmall, 2000, 100, 20);
        }

        /// <summary>
        /// Simulates CPU-bound-like tasks on the small pool.
        /// </summary>
        private void button4_Click(object sender, EventArgs e)
        {
            DoTask(_taskSchedulerExSmall, 2000, 100, 1);
        }

        /// <summary>
        /// Queues <paramref name="taskCount"/> simulated tasks on <paramref name="scheduler"/>,
        /// waits for all of them on a coordinator task and logs the elapsed time.
        /// </summary>
        /// <param name="scheduler">Scheduler under test.</param>
        /// <param name="taskCount">Number of tasks to queue.</param>
        /// <param name="logCount">A progress line is logged every this many tasks.</param>
        /// <param name="delay">Simulated latency or work per task, in milliseconds.</param>
        private void DoTask(TaskSchedulerEx scheduler, int taskCount, int logCount, int delay)
        {
            _task.Run(() =>
            {
                Log("开始");
                // Stopwatch is monotonic; DateTime.Now differences are skewed by clock adjustments.
                System.Diagnostics.Stopwatch watch = System.Diagnostics.Stopwatch.StartNew();
                List<Task> taskList = new List<Task>(taskCount);
                for (int i = 1; i <= taskCount; i++)
                {
                    Task task = scheduler.Run((obj) =>
                    {
                        var k = (int)obj;
                        Thread.Sleep(delay); // simulated latency or work
                        if (k % logCount == 0)
                        {
                            Log("最大线程数:" + scheduler.MaxThreadCount + " 核心线程数:" + scheduler.CoreThreadCount + " 活跃线程数:" + scheduler.ActiveThreadCount.ToString().PadLeft(4, ' ') + " 处理数/总数:" + k + " / " + taskCount);
                        }
                    }, i, (ex) =>
                    {
                        Log(ex.Message);
                    });
                    taskList.Add(task);
                }
                Task.WaitAll(taskList.ToArray());
                double d = watch.Elapsed.TotalSeconds;
                Log("完成,耗时:" + d + "秒");
            });
        }

        /// <summary>
        /// Releases the worker schedulers when the form closes.
        /// </summary>
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            if (_taskSchedulerEx != null)
            {
                _taskSchedulerEx.Dispose(); // release resources
                _taskSchedulerEx = null;
            }
            if (_taskSchedulerExSmall != null)
            {
                _taskSchedulerExSmall.Dispose(); // previously leaked: only the large pool was disposed
                _taskSchedulerExSmall = null;
            }
            // _task is intentionally not disposed here: Dispose blocks until its workers exit, and a
            // coordinator still inside Task.WaitAll would wait on queued tasks that the disposed
            // schedulers above no longer execute. Its workers are background threads, so they do not
            // keep the process alive.
        }
    }
}
View Code
测试截图: