0%

    有些情况下需要一个调度器专门来处理一些工作,如在网络处理程序中,当接收的数据后把信息存放到队列中,尽可能的更快地处理下一接收操作.而接收的数据信息则由调试器来处理,如数据分析,数据包处理等等工作.既然调度器负责处理工作,那最好给需要处理的工作制定一个规则,方便以后灵活扩展处理.

制定规则接口

1

2

3

4

5

public interface IWorkItem:IDisposable

{

void Execute();

}

其实接口制定很简单就是一个Execute方法,表示对该工作执行;那为什么还要实现IDisposable呢,目的就是为这工作提供一个释放操作描述,当此项工作完成会有一个释放行为处理相关事情.

调度器实现

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

public class Despatch:IDisposable

{

public Despatch()

{

System.Threading.Thread thread = new System.Threading.Thread(

new System.Threading.ThreadStart(Run));

thread.Start();

}

private Queue<IWorkItem> mQueues = new Queue<IWorkItem>(1024);

public void Add(IWorkItem item)

{

lock (mQueues)

{

mQueues.Enqueue(item);

}

}

public int Count

{

get

{

return mQueues.Count;

}

}

protected IWorkItem GetItem()

{

lock (mQueues)

{

if (mQueues.Count > 0)

return mQueues.Dequeue();

return null``;

}

}

protected virtual void OnRun()

{

IWorkItem item = GetItem();

if (item != null``)

{

using (item)

{

item.Execute();

}

}

else

System.Threading.Thread.Sleep(2);

}

public virtual void Run()

{

while (!mDispose)

{

OnRun();

}

}

private bool mDispose = false``;

public void Dispose()

{

lock (``this``)

{

if (!mDispose)

{

mDispose = true``;

}

}

}

}

调度器工作原来就是在构造的时候直接启动一个线程来运行Run方法;该方法的工作就是不停地从一个队列中获取一个工作项目并执行,当没有工作项的情况下把线程Sleep 2毫秒;为什么要Sleep呢,其目的就是不要让while在处理空队列的时候把cpu资源给吃光了.

下面实现一个简单的工作项

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

class ReceiveWorkItem : WorkItem

{

public override void Execute()

{

TcpChannel channel = Channel;

DataBuffer db = channel.ReceiveBuffer;

try

{

db.SetBuffer(SocketAsyncEventArgs.Buffer, SocketAsyncEventArgs.BytesTransferred);

channel.CallDataReceive(``new DataReceiveEventArgs() { Buffer = db, Channel = channel });

}

catch (Exception e_)

{

channel.CallChannelError(``new ChannelErrorEventArgs() { Channel = channel, Exception = e_ });

}

}

public override void Dispose()

{

SocketAsyncEventArgs.Enter();

}

}

以上的工作内容就是把接收的数据放到一个Buffer中,并调用CallDataReceive方法触发数据接收方法;工作完成后调用了SocketAsyncEventArgs.Enter()方法把该对象回收到池中.