0%

public class RabbitMQEventBus : BaseEventBus

{

private readonly IConnectionFactory connectionFactory;

private readonly IConnection connection;

private readonly IModel channel;

private readonly string exchangeName;

private readonly string exchangeType;

private readonly string queueName;

private readonly bool autoAck;

private readonly ILogger logger;

private bool disposed;

public RabbitMQEventBus(IConnectionFactory connectionFactory,

ILogger<RabbitMQEventBus> logger,

IEventHandlerExecutionContext context,

string exchangeName,

string exchangeType = ExchangeType.Fanout,

string queueName = null``,

bool autoAck = false``)

: base``(context)

{

this``.connectionFactory = connectionFactory;

this``.logger = logger;

this``.connection = this``.connectionFactory.CreateConnection();

this``.channel = this``.connection.CreateModel();

this``.exchangeType = exchangeType;

this``.exchangeName = exchangeName;

this``.autoAck = autoAck;

this``.channel.ExchangeDeclare(``this``.exchangeName, this``.exchangeType);

this``.queueName = this``.InitializeEventConsumer(queueName);

logger.LogInformation($``"RabbitMQEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}."``);

}

public override Task PublishAsync<TEvent>(TEvent @``event``, CancellationToken cancellationToken = default``(CancellationToken))

{

var json = JsonConvert.SerializeObject(@``event``, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });

var eventBody = Encoding.UTF8.GetBytes(json);

channel.BasicPublish(``this``.exchangeName,

@``event``.GetType().FullName,

null``,

eventBody);

return Task.CompletedTask;

}

public override void Subscribe<TEvent, TEventHandler>()

{

if (!``this``.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>())

{

this``.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>();

this``.channel.QueueBind(``this``.queueName, this``.exchangeName, typeof``(TEvent).FullName);

}

}

protected override void Dispose(``bool disposing)

{

if (!disposed)

{

if (disposing)

{

this``.channel.Dispose();

this``.connection.Dispose();

logger.LogInformation($``"RabbitMQEventBus已经被Dispose。Hash Code:{this.GetHashCode()}."``);

}

disposed = true``;

base``.Dispose(disposing);

}

}

private string InitializeEventConsumer(``string queue)

{

var localQueueName = queue;

if (``string``.IsNullOrEmpty(localQueueName))

{

localQueueName = this``.channel.QueueDeclare().QueueName;

}

else

{

this``.channel.QueueDeclare(localQueueName, true``, false``, false``, null``);

}

var consumer = new EventingBasicConsumer(``this``.channel);

consumer.Received += async (model, eventArgument) =>

{

var eventBody = eventArgument.Body;

var json = Encoding.UTF8.GetString(eventBody);

var @``event = (IEvent)JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });

await this``.eventHandlerExecutionContext.HandleEventAsync(@``event``);

if (!autoAck)

{

channel.BasicAck(eventArgument.DeliveryTag, false``);

}

};

this``.channel.BasicConsume(localQueueName, autoAck: this``.autoAck, consumer: consumer);

return localQueueName;

}

}

上文中,我介绍了事件驱动型架构的一种简单的实现,并演示了一个完整的事件派发、订阅和处理的流程。这种实现太简单了,百十行代码就展示了一个基本工作原理。然而,要将这样的解决方案运用到实际生产环境,还有很长的路要走。今天,我们就研究一下在事件处理器中,对象生命周期的管理问题。

事实上,不仅仅是在事件处理器中,我们需要关心对象的生命周期,在整个ASP.NET Core Web API的应用程序里,我们需要理解并仔细推敲被注册到IoC容器中的服务,它们的生命周期应该是个怎样的情形,这也是服务端应用程序设计必须认真考虑的内容。因为如果生命周期管理不合理,程序申请的资源无法合理释放,最后便会带来内存泄漏、程序崩溃等各种问题,然而这样的问题对于服务端应用程序来说,是非常严重的。

记得在上一篇文章的结束部分,我给大家留下一个练习,就是让大家在CustomerCreatedEventHandler事件处理器的HandleAsync方法中,填入自己的代码,以便对获得的事件消息做进一步的处理。作为本文的引子,我们首先将这部分工作做完,然后再进一步分析生命周期的问题。

Event Store是CQRS体系结构模式中最为重要的一个组成部分,它的主要职责就是保存发生于领域模型中的领域事件,并对事件数据进行归档。当仓储需要获取领域模型对象时,Event Store也会配合快照数据库一起,根据领域事件的发生顺序,逐步回放并重塑领域模型对象。事实上,Event Store的实现是非常复杂的,虽然从它的职责上来看并不算太复杂,然而它所需要解决的事件同步、快照、性能、消息派发等问题,使得CQRS体系结构的实现变得非常复杂。在实际应用中,已经有一些比较成熟的框架和工具集,能够帮助我们在CQRS中很方便地实现Event Store,比如GetEventStore就是一个很好的开源Event Store框架,它是基于.NET开发的,在微软官方的eShopOnContainers说明文档中,也提到了这个框架,推荐大家上他们的官网(https://eventstore.org/)了解一下。在这里我们就先不深入研究Event Store应该如何实现,我们先做一个简单的Event Store,以便展示我们需要讨论的问题。

延续着上一版的代码库(https://github.com/daxnet/edasample/tree/chapter_1),我们首先在EdaSample.Common.Events命名空间下,定义一个IEventStore的接口,这个接口非常简单,仅仅包含一个保存事件的方法,代码如下:

1

2

3

4

5

public interface IEventStore : IDisposable

{

Task SaveEventAsync<TEvent>(TEvent @``event``)

where TEvent : IEvent;

}

SaveEventAsync方法仅有一个参数:由泛型类型TEvent绑定的@event对象。泛型约束表示SaveEventAsync方法仅能接受IEvent接口及其实现类型的对象作为参数传入。接口定义好了,下一步就是实现这个接口,对传入的事件对象进行保存。为了实现过程的简单,我们使用Dapper,将事件数据保存到SQL Server数据库中,来模拟Event Store对事件的保存操作。

Note:为什么IEventStore接口的SaveEventAsync方法签名中,没有CancellationToken参数?严格来说,支持async/await异步编程模型的方法定义上,是需要带上CancellationToken参数的,以便调用方请求取消操作的时候,方法内部可以根据情况对操作进行取消。然而有些情况下取消操作并不是那么合理,或者方法内部所使用的API并没有提供更深层的取消支持,因此也就没有必要在方法定义上增加CancellationToken参数。在此处,为了保证接口的简单,没有引入CancellationToken的参数。

接下来,我们实现这个接口,并用Dapper将事件数据保存到SQL Server中。出于框架设计的考虑,我们新建一个Net Standard Class Library项目,在这个新的项目中实现IEventStore接口,这么做的原因已经在上文中介绍过了。代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

public class DapperEventStore : IEventStore

{

private readonly string connectionString;

public DapperEventStore(``string connectionString)

{

this``.connectionString = connectionString;

}

public async Task SaveEventAsync<TEvent>(TEvent @``event``) where TEvent : IEvent

{

const string sql = @"INSERT INTO [dbo].[Events]

([EventId], [EventPayload], [EventTimestamp])

VALUES

(@eventId, @eventPayload, @eventTimestamp)"``;

using (``var connection = new SqlConnection(``this``.connectionString))

{

await connection.ExecuteAsync(sql, new

{

eventId = @``event``.Id,

eventPayload = JsonConvert.SerializeObject(@``event``),

eventTimestamp = @``event``.Timestamp

});

}

}

#region IDisposable Support

#endregion

}

IDisposable接口的实现部分暂且省略,可以看到,实现还是非常简单的:通过构造函数传入数据库的连接字符串,在SaveEventAsyc方法中,基于SqlConnection对象执行Dapper的扩展方法来完成事件数据的保存。

Note: 此处使用了JsonConvert.SerializeObject方法来序列化事件对象,也就意味着DapperEventStore程序集需要依赖Newtonsoft.Json程序集。虽然在我们此处的案例中不会有什么影响,但这样做会造成DapperEventStore对Newtonsoft.Json的强依赖,这样的依赖关系不仅让DapperEventStore变得不可测试,而且Newtonsoft.Json将来未知的变化,也会影响到DapperEventStore,带来一些不确定性和维护性问题。更好的做法是,引入一个IMessageSerializer接口,在另一个新的程序集中使用Newtonsoft.Json来实现这个接口,同时仅让DapperEventStore依赖IMessageSerializer,并在应用程序启动时,将Newtonsoft.Json的实现注册到IoC容器中。此时,IMessageSerializer可以被Mock,DapperEventStore就变得可测试了;另一方面,由于只有那个新的程序集会依赖Newtonsoft.Json,因此,Newtonsoft.Json的变化也仅仅会影响那个新的程序集,不会对框架主体的其它部分造成任何影响。

EventStore实现好了,接下来,我们将其用在CustomerCreatedEventHandler中,以便将订阅的CustomerCreatedEvent保存下来。

保存事件数据的第一步,就是在ASP.NET Core Web API的IoC容器中,将DapperEventStore注册进去。这一步是非常简单的,只需要在Startup.cs的ConfigureServices方法中完成即可。代码如下:

1

2

3

4

5

6

7

8

public void ConfigureServices(IServiceCollection services)

{

services.AddMvc();

services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();

services.AddTransient<IEventStore>(serviceProvider => new DapperEventStore(Configuration[``"mssql:connectionString"``]));

services.AddSingleton<IEventBus, PassThroughEventBus>();

}

注意我们使用的是services.AddTransient方法来注册DapperEventStore,我们希望应用程序在每次请求IEventStore实例时,都能获得一个新的DapperEventStore的实例。

接下来,打开CustomerCreatedEventHandler.cs文件,在构造函数中加入对IEventStore的依赖,然后修改HandleAsync方法,在该方法中使用IEventStore的实例来完成事件数据的保存。代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent>

{

private readonly IEventStore eventStore;

public CustomerCreatedEventHandler(IEventStore eventStore)

{

this``.eventStore = eventStore;

}

public bool CanHandle(IEvent @``event``)

=> @``event``.GetType().Equals(``typeof``(CustomerCreatedEvent));

public async Task<``bool``> HandleAsync(CustomerCreatedEvent @``event``, CancellationToken cancellationToken = default``)

{

await this``.eventStore.SaveEventAsync(@``event``);

return true``;

}

public Task<``bool``> HandleAsync(IEvent @``event``, CancellationToken cancellationToken = default``)

=> CanHandle(@``event``) ? HandleAsync((CustomerCreatedEvent)@``event``, cancellationToken) : Task.FromResult(``false``);

}

OK,代码修改完毕,测试一下。

image

看看数据库中客户信息是否已经创建:

image

看看数据库中事件数据是否已经保存成功:

image

OK,数据全部保存成功。

然而,事情真的就这么简单么?No。在追踪了IEventStore实例(也就是DapperEventStore)的生命周期后,你会发现,问题没有想象的那么简单。

在使用services.AddTransient/AddScoped/AddSingleton/AddScoped这些方法对服务进行注册时,使用不同的方法也就意味着选择了不同的对象生命周期。在此我们也不再深入讨论每种方法之间的差异,微软官方有详细的文档和demo(抱歉我没有贴出中文链接,因为机器翻译的缘故,实在有点不堪入目),如果对ASP.NET Core的IoC容器不熟悉的话,建议先了解一下官网文章的内容。在上面我稍微提了一下,我们是用AddTransient方法来注册DapperEventStore的,因为我们希望在每次使用IEventStore的时候,都会有一个新的DapperEventStore被创建。现在,让我们来验证一下,看情况是否果真如此。

日志的使用

追踪程序执行的最有效的方式就是使用日志。在我们的场景中,使用基于文件的日志会更合适,因为这样我们可以更清楚地看到程序的执行过程以及对象的变化过程。同样,我不打算详细介绍如何在ASP.NET Core Web API中使用日志,微软官网同样有着非常详尽的文档来介绍这些内容。在这里,我简要地将相关代码列出来,以介绍如何启用基于文件的日志系统。

首先,在Web API服务的项目上,添加对Serilog.Extensions.Logging.File的nuget包,使用它能够非常方便地启用基于文件的日志。然后,打开Program.cs文件,添加ConfigureLogging的调用:

1

2

3

4

5

6

7

8

public static IWebHost BuildWebHost(``string``[] args) =>

WebHost.CreateDefaultBuilder(args)

.ConfigureLogging((context, lb) =>

{

lb.AddFile(LogFileName);

})

.UseStartup<Startup>()

.Build();

此处LogFileName为本地文件系统中的日志文件文件名,为了避免权限问题,我将日志写入C:\Users\\appdata\local目录下,因为我的Web API进程是由当前登录用户启动的,所以写在这个目录下不会有权限问题。如果今后我们把Web API host在IIS中,那么启动IIS服务的用户需要对日志所在的目录具有写入的权限,日志文件才能被正确写入,这一点是需要注意的。

好了,现在可以使用日志了,先试试看。在Startup类的构造函数中,加入ILoggerFactory参数,并在构造函数执行时获取ILogger实例,然后在ConfigureServices调用中输出一些内容:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

public class Startup

{

private readonly ILogger logger;

public Startup(IConfiguration configuration, ILoggerFactory loggerFactory)

{

Configuration = configuration;

this``.logger = loggerFactory.CreateLogger<Startup>();

}

public IConfiguration Configuration { get``; }

public void ConfigureServices(IServiceCollection services)

{

this``.logger.LogInformation(``"正在对服务进行配置..."``);

services.AddMvc();

services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();

services.AddTransient<IEventStore>(serviceProvider =>

new DapperEventStore(Configuration[``"mssql:connectionString"``]));

services.AddSingleton<IEventBus, PassThroughEventBus>();

this``.logger.LogInformation(``"服务配置完成,已注册到IoC容器!"``);

}

}

现在重新启动服务,然后查看日志文件,发现日志可以被正确输出:

image

接下来,使用类似的方式,向PassThroughEventBus的构造函数和Dispose方法中加入一些日志输出,在CustomersController的Create方法中、CustomerCreatedEventHandler的构造函数和HandleAsync方法中、DapperEventStore的构造函数和Dispose方法中也加入一些日志输出,以便能够观察当新的客户信息被创建时,Web API的执行过程。限于文章篇幅,就不在此一一贴出各方法中加入日志输出的代码了,大家可以根据本文最后所提供的源代码链接来获取源代码。简单地举个例子吧,比如对于DapperEventStore,我们通过构造函数注入ILogger的实例:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

public class DapperEventStore : IEventStore

{

private readonly string connectionString;

private readonly ILogger logger;

public DapperEventStore(``string connectionString,

ILogger<DapperEventStore> logger)

{

this``.connectionString = connectionString;

this``.logger = logger;

logger.LogInformation($``"DapperEventStore构造函数调用完成。Hash Code:{this.GetHashCode()}."``);

}

}

这样一来,在DapperEventStore的其它方法中,就可以通过logger来输出日志了。

发现问题

同样,再次运行Web API,并通过Powershell发起一次创建客户信息的请求,然后打开日志文件,整个程序的执行过程基本上就一目了然了:

image

从上面的日志内容可以得知,当应用程序正常退出时,由IoC容器托管的PassThroughEventBus和DapperEventStore都能够被正常Dispose,目前看来没什么问题,因为资源可以正常释放。现在让我们重新启动Web API,连续发送两次创建客户信息的请求,再次查看日志,我们得到了下面的内容:

image

从上面的日志内容可以看到,在Web API的整个运行期间,CustomerCreatedEventHandler仅被构造了一次,而且在每次处理CustomerCreatedEvent事件的时候,都是使用同一个DapperEventStore实例来保存事件数据。也就是说,CustomerCreatedEventHandler和DapperEventStore在整个Web API服务的生命周期中,有且仅有一个实例,它们是Singleton的!然而,在进行系统架构的时候,我们应该尽量保证较短的对象生命周期,以免因为状态的不一致性导致不可回滚的错误出现,这也是架构设计中的一种最佳实践。虽然目前我们的DapperEventStore在程序正常退出的时候能够被Dispose掉,但如果DapperEventStore使用了非托管资源,并且非托管资源并没有很好地管理自己的内存呢?久而久之,DapperEventStore就产生了内存泄漏点,慢慢地,Web API就会出现内存泄漏,系统资源将被耗尽。假如Web API被部署在云中,应用程序监控装置(比如AWS的Cloud Watch)就会持续报警,并强制服务断线,整个系统的可用性就无法得到保障。所以,我们更期望DapperEventStore能够正确地实现C#的Dispose模式,在Dispose方法中合理地释放资源,并且仅在需要使用DapperEventStore时候才去构建它,用完就及时Dispose,以保证资源的合理使用。这也就是为什么我们使用services.AddTransient方法来注册CustomerCreatedEventHandler以及DapperEventStore的原因。

然而,事实却并非如此。究其原因,就是因为PassThroughEventBus是单例实例,它的生命周期是整个Web API服务。而在PassThroughEventBus的构造函数中,CustomerCreatedEventHandler被作为参数传入,于是,PassThroughEventBus产生了对CustomerCreatedEventHandler的依赖,而连带地也产生了对DapperEventStore的依赖。换句话说,在整个应用程序运行的过程中,IoC框架完全没有理由再去创建新的CustomerCreatedEventHandler以及DapperEventStore的实例,因为事件处理器作为强引用被注册到PassThroughEventBus中,而PassThroughEventBus至始至终没有变过!

Note:为什么PassThroughEventBus可以作为单例注册到IoC容器中?因为它提供了无状态的全局性的基础结构层服务:事件总线。在PassThroughEventBus的实现中,这种全局性体现得不明显,我们当然可以每一次HTTP请求都创建一个新的PassThroughEventBus来转发事件消息并作处理。然而,在今后我们要实现的基于RabbitMQ的事件总线中,如果我们还是每次HTTP请求都创建一个新的消息队列,不仅性能得不到保证,而且消息并不能路由到新创建的channel上。注意:我们将其注册成单例,一个很重要的依据是由于它是无状态的,但即使如此,我们也要注意在应用程序退出的时候,合理Dispose掉它所占用的资源。当然,在这里,ASP.NET Core的IoC机制会帮我们解决这个问题(因为我注册了PassThroughEventBus,但我没有显式调用Dispose方法,我仍然能从日志中看到“PassThroughEventBus已经被Dispose”的字样),然而有些情况下,ASP.NET Core不会帮我们做这些,就需要我们自己手工完成。

OMG!由于构造函数注入,使得对象之间产生了依赖关系,从而影响到了它们的生命周期,这可怎么办?既然问题是由依赖引起的,那么就需要想办法解耦。

解耦!解决事件处理器对象生命周期问题

经过分析,我们需要解除PassThroughEventBus对各种EventHandler的直接依赖。因为PassThroughEventBus是单例的,那么由它引用的所有组件也只可能具有相同的生命周期。然而,这样的解耦又该如何做呢?将EventHandler封装到另一个类中?结果还是一样,PassThroughEventBus总会通过某种对象关系,来间接引用到EventHandler上,造成EventHandler全局唯一。

或许,应该要有另一套生命周期管理体系来管理EventHandler的生命周期,使得每当PassThroughEventBus需要使用EventHandler对所订阅的事件进行处理的时候,都会通过这套体系来请求新的EventHandler实例,这样一来,PassThroughEventBus也就不再依赖于某个特定的实例了,而仅仅是引用了各种EventHandler在新的生命周期管理体系中的注册信息。每当需要的时候,PassThroughEventBus都会将事件处理器的注册信息传给新的管理体系,然后由这套新的体系来维护事件处理器的生命周期。

通过阅读微软官方的eShopOnContainers案例代码后,证实了这一想法。在案例中,有如下代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

private async Task ProcessEvent(``string eventName, string message)

{

if (_subsManager.HasSubscriptionsForEvent(eventName))

{

using (``var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))

{

var subscriptions = _subsManager.GetHandlersForEvent(eventName);

foreach (``var subscription in subscriptions)

{

if (subscription.IsDynamic)

{

var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;

dynamic eventData = JObject.Parse(message);

await handler.Handle(eventData);

}

else

{

var eventType = _subsManager.GetEventTypeByName(eventName);

var integrationEvent = JsonConvert.DeserializeObject(message, eventType);

var handler = scope.ResolveOptional(subscription.HandlerType);

var concreteType = typeof``(IIntegrationEventHandler<>).MakeGenericType(eventType);

await (Task)concreteType.GetMethod(``"Handle"``).Invoke(handler, new object``[] { integrationEvent });

}

}

}

}

}

可以看到,高亮的这一行,通过Autofac创建了一个新的LifetimeScope,在这个Scope中,通过eventName来获得一个subscription对象(也就是EventHandler的注册信息),进而通过scope的ResolveOptional调用来获得新的EventHandler实例。基本过程就是这样,目前也不需要纠结IDynamicIntegrationEventHandler是干什么用的,也不需要纠结为什么要使用dynamic来保存事件数据。重点是,autofac的BeginLifetimeScope方法调用创建了一个新的IoC Scope,在这个Scope中解析(resolve)了新的EventHandler实例。在eShopOnContainer案例中,EventBusRabbitMQ的设计是特定的,必须依赖于Autofac作为依赖注入框架。或许这部分设计可以进一步改善,使得EventBusRabbitMQ不会强依赖于Autofac。

接下来,我们会引入一个新的概念:事件处理器执行上下文,使用类似的方式来解决对象生命周期问题。

事件处理器执行上下文(Event Handler Execution Context, EHEC)为事件处理器提供了一个完整的生命周期管理机制,在这套机制中,事件处理器及其引用的对象资源可以被正常创建和正常销毁。现在让我们一起看看,如何在EdaSample的案例代码中使用事件处理器执行上下文。

事件处理器执行上下文的接口定义如下,当然,这部分接口是放在EdaSample.Common.Events目录下,作为消息系统的框架代码提供给调用方:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

public interface IEventHandlerExecutionContext

{

void RegisterHandler<TEvent, THandler>()

where TEvent : IEvent

where THandler : IEventHandler<TEvent>;

void RegisterHandler(Type eventType, Type handlerType);

bool HandlerRegistered<TEvent, THandler>()

where TEvent : IEvent

where THandler : IEventHandler<TEvent>;

bool HandlerRegistered(Type eventType, Type handlerType);

Task HandleEventAsync(IEvent @``event``, CancellationToken cancellationToken = default``);

}

这个接口主要包含三种方法:注册事件处理器、判断事件处理器是否已经注册,以及对接收到的事件消息进行处理。整个结构还是非常清晰简单的。现在需要实现这个接口。根据上面的分析,这个接口的实现是需要依赖于IoC容器的,目前简单起见,我们仅使用微软ASP.NET Core标准的Dependency Injection框架来实现,当然,也可以使用Autofac,取决于你怎样去实现上面这个接口。需要注意的是,由于该接口的实现是需要依赖于第三方组件的(在这里是微软的Dependency Injection框架),因此,最佳做法是新建一个类库,并引用EdaSample.Common程序集,并在这个新的类库中,依赖Dependency Injection框架来实现这个接口。

以下是基于Microsoft.Extensions.DependencyInjection框架来实现的事件处理器执行上下文完整代码,这里有个兼容性问题,就是构造函数的第二个参数:serviceProviderFactory。在Microsoft.Extensions.DependencyInjection框架2.0版本之前,IServiceCollection.BuildServiceProvider方法的返回类型是IServiceProvider,但从2.0开始,它的返回类型已经从IServiceProvider接口,变成了ServiceProvider类。这里引出了框架设计的另一个原则,就是依赖较低版本的.NET Core,以便获得更好的兼容性。如果我们的EdaSample是使用.NET Core 1.1开发的,那么当下面这个类被直接用在ASP.NET Core 2.0的项目中时,如果不通过构造函数参数传入ServiceProvider创建委托,而是直接在代码中使用registry.BuildServiceProvider调用,就会出现异常。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

public class EventHandlerExecutionContext : IEventHandlerExecutionContext

{

private readonly IServiceCollection registry;

private readonly Func<IServiceCollection, IServiceProvider> serviceProviderFactory;

private readonly ConcurrentDictionary<Type, List<Type>> registrations = new ConcurrentDictionary<Type, List<Type>>();

public EventHandlerExecutionContext(IServiceCollection registry,

Func<IServiceCollection, IServiceProvider> serviceProviderFactory = null``)

{

this``.registry = registry;

this``.serviceProviderFactory = serviceProviderFactory ?? (sc => registry.BuildServiceProvider());

}

public async Task HandleEventAsync(IEvent @``event``, CancellationToken cancellationToken = default``(CancellationToken))

{

var eventType = @``event``.GetType();

if (``this``.registrations.TryGetValue(eventType, out List<Type> handlerTypes) &&

handlerTypes?.Count > 0)

{

var serviceProvider = this``.serviceProviderFactory(``this``.registry);

using (``var childScope = serviceProvider.CreateScope())

{

foreach``(``var handlerType in handlerTypes)

{

var handler = (IEventHandler)childScope.ServiceProvider.GetService(handlerType);

if (handler.CanHandle(@``event``))

{

await handler.HandleAsync(@``event``, cancellationToken);

}

}

}

}

}

public bool HandlerRegistered<TEvent, THandler>()

where TEvent : IEvent

where THandler : IEventHandler<TEvent>

=> this``.HandlerRegistered(``typeof``(TEvent), typeof``(THandler));

public bool HandlerRegistered(Type eventType, Type handlerType)

{

if (``this``.registrations.TryGetValue(eventType, out List<Type> handlerTypeList))

{

return handlerTypeList != null && handlerTypeList.Contains(handlerType);

}

return false``;

}

public void RegisterHandler<TEvent, THandler>()

where TEvent : IEvent

where THandler : IEventHandler<TEvent>

=> this``.RegisterHandler(``typeof``(TEvent), typeof``(THandler));

public void RegisterHandler(Type eventType, Type handlerType)

{

Utils.ConcurrentDictionarySafeRegister(eventType, handlerType, this``.registrations);

this``.registry.AddTransient(handlerType);

}

}

好了,事件处理器执行上下文就定义好了,接下来就是在我们的ASP.NET Core Web API中使用。为了使用IEventHandlerExecutionContext,我们需要修改事件订阅器的接口定义,并相应地修改PassThroughEventBus以及Startup.cs。代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

public interface IEventSubscriber : IDisposable

{

void Subscribe<TEvent, TEventHandler>()

where TEvent : IEvent

where TEventHandler : IEventHandler<TEvent>;

}

public sealed class PassThroughEventBus : IEventBus

{

private readonly EventQueue eventQueue = new EventQueue();

private readonly ILogger logger;

private readonly IEventHandlerExecutionContext context;

public PassThroughEventBus(IEventHandlerExecutionContext context,

ILogger<PassThroughEventBus> logger)

{

this``.context = context;

this``.logger = logger;

logger.LogInformation($``"PassThroughEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}."``);

eventQueue.EventPushed += EventQueue_EventPushed;

}

private async void EventQueue_EventPushed(``object sender, EventProcessedEventArgs e)

=> await this``.context.HandleEventAsync(e.Event);

public Task PublishAsync<TEvent>(TEvent @``event``, CancellationToken cancellationToken = default``)

where TEvent : IEvent

=> Task.Factory.StartNew(() => eventQueue.Push(@``event``));

public void Subscribe<TEvent, TEventHandler>()

where TEvent : IEvent

where TEventHandler : IEventHandler<TEvent>

{

if (!``this``.context.HandlerRegistered<TEvent, TEventHandler>())

{

this``.context.RegisterHandler<TEvent, TEventHandler>();

}

}

#region IDisposable Support

private bool disposedValue = false``;

void Dispose(``bool disposing)

{

if (!disposedValue)

{

if (disposing)

{

this``.eventQueue.EventPushed -= EventQueue_EventPushed;

logger.LogInformation($``"PassThroughEventBus已经被Dispose。Hash Code:{this.GetHashCode()}."``);

}

disposedValue = true``;

}

}

public void Dispose() => Dispose(``true``);

#endregion

}

public void ConfigureServices(IServiceCollection services)

{

this``.logger.LogInformation(``"正在对服务进行配置..."``);

services.AddMvc();

services.AddTransient<IEventStore>(serviceProvider =>

new DapperEventStore(Configuration[``"mssql:connectionString"``],

serviceProvider.GetRequiredService<ILogger<DapperEventStore>>()));

var eventHandlerExecutionContext = new EventHandlerExecutionContext(services,

sc => sc.BuildServiceProvider());

services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext);

services.AddSingleton<IEventBus, PassThroughEventBus>();

this``.logger.LogInformation(``"服务配置完成,已注册到IoC容器!"``);

}

public void Configure(IApplicationBuilder app, IHostingEnvironment env)

{

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<CustomerCreatedEvent, CustomerCreatedEventHandler>();

if (env.IsDevelopment())

{

app.UseDeveloperExceptionPage();

}

app.UseMvc();

}

代码修改完成后,再次执行Web API,并发送两次(或多次)创建客户的请求,然后查看日志,我们发现,每次请求都会使用新的事件处理器去处理接收到的消息,在保存消息数据时,会使用新的DapperEventStore来保存数据,而保存完成后,会及时将DapperEventStore dispose掉:

image

本文篇幅比较长,或许你没有太多耐心将文章读完。但我尽量将问题分析清楚,希望提供给读者的内容是详细的、有理有据的。文章中黑体部分是在设计过程中的一些思考和需要注意的地方,希望能够给读者在工作和学习之中带来启发和收获。总而言之,对象生命周期的管理,在服务端应用程序中是非常重要的,需要引起足够的重视。在下文中,我们打算逐步摆脱PassThroughEventBus,基于RabbitMQ来实现消息总线的基础结构。

本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,通过不同的release tag来区分针对不同章节的源代码。本文的源代码请参考chapter_2这个tag,如下:

image

在前面两篇文章中,我详细介绍了基本事件系统的实现,包括事件派发和订阅、通过事件处理器执行上下文来解决对象生命周期问题,以及一个基于RabbitMQ的事件总线的实现。接下来对于事件驱动型架构的讨论,就需要结合一个实际的架构案例来进行分析。在领域驱动设计的讨论范畴,CQRS架构本身就是事件驱动的,因此,我打算首先介绍一下CQRS架构下相关部分的实现,然后再继续讨论事件驱动型架构实现的具体问题。

当然,CQRS架构本身的实现也是根据实际情况的不同,需要具体问题具体分析的,不仅如此,CQRS架构的实现也是非常复杂的,绝不是一套文章一套案例能够解释清楚并涵盖全部的。所以,我不会把大部分篇幅放在CQRS架构实现的细节上,而是会着重介绍与我们的主题相关的内容,并对无关的内容进行弱化。或许,在这个系列文章结束的时候,我们会得到一个完整的、能够运行的CQRS架构系统,不过,这套系统极有可能仅供技术研讨和学习使用,无法直接用于生产环境。

基于这样的前提,我们今天首先看一下CQRS架构中聚合与聚合根的实现,或许你会觉得目前讨论的内容与你本打算关心的事件驱动架构没什么关系,而事实是,CQRS架构中聚合与聚合根的实现是完全面向事件驱动的,而这部分内容也会为我们之后的讨论做下铺垫。不仅如此,我还会在本文讨论一些基于.NET/C#的软件架构设计的思考与实践(请注意文章中我添加了Note字样并且字体加粗的句子),因此,我还是会推荐你继续读完这篇文章。

早在2010年,我针对CQRS架构总结过一篇文章,题目是:《EntityFramework之领域驱动设计实践【扩展阅读】:CQRS体系结构模式》,当然,这篇文章跟Entity Framework本没啥关系,只是延续了领域驱动设计这一话题进行的扩展讨论罢了。这篇文章介绍了CQRS架构模式所产生的背景、结构,以及相关的一些概念,比如:最近非常流行的词语:“事件溯源”、解决事件溯源性能问题的“快照”、用于存取事件数据的“事件存储(Event Store)”,还有重新认识了什么叫做“对象的状态”,等等。此外,在后续的博文中,我也经常对CQRS架构中的实现细节做些探讨,有兴趣的读者可以翻看我过去的博客文章。总体上讲,CQRS架构基本符合下图所描述的结构:

image

看上去是不是特别复杂?没错,特别复杂,而且每个部分都可以使用不同的工具、框架,以不同的形式进行实现。整个架构甚至可以是语言、平台异构的,还可以跟外部系统进行整合,实现大数据分析、呈现等等,玩法可谓之五花八门,这些统统都不在我们的讨论范围之内。我们今天打算讨论的,就是上图右上部分“领域模型”框框里的主题:CQRS架构中的聚合与聚合根。

说到聚合与聚合根,了解过领域驱动设计(DDD)的读者肯定对这两个概念非常熟悉。通常情况下,具有相同生命周期,组合起来能够共同表述一种领域概念的一组模型对象,就可以组成一个聚合。在每个聚合中,衔接各个领域模型对象,并向外提供统一访问聚合的对象,就是聚合根。聚合中的所有对象,离开聚合根,就不能完整地表述一个领域概念。比如:收货地址无法离开客户,订单详情无法离开订单,库存无法离开货品等等。所以从定义上来看,一个聚合大概就是这样:

  • 聚合中的对象可以是实体,也可以是值对象
  • 聚合中所有对象具有相同的生命周期
  • 外界通过聚合根访问整个聚合,聚合根通过导航属性(Navigation Properties)进而访问聚合中的其它实体和值对象
  • 通过以上两点,可以得出:工厂和仓储必须针对聚合根进行操作
  • 聚合根是一个实体
  • 聚合中的对象是有状态的,通常会通过C#的属性(Properties)将状态曝露给外界

好吧,对这些概念比较熟悉的读者来说,我在此算是多啰嗦了几句。接下来,让我们结合CQRS架构中命令处理器对领域模型的更改过程来看看,除了以上这些常规特征之外,聚合与聚合根还有哪些特殊之处。当命令处理器接到操作命令时,便开始对领域模型进行更改,步骤如下:

  1. 首先,命令处理器通过仓储,获取具有指定ID值的聚合(聚合的ID值就是聚合根的ID值)
  2. 然后,仓储访问事件存储数据库,根据需要获取的聚合根的类型,以及ID值,获取所有关联的领域事件
  3. 其次,仓储构造聚合对象实例,并依照一定的顺序,逐一将领域事件重新应用在新构建的聚合上
  4. 每当有一个领域事件被应用在聚合上时,聚合本身的内联事件处理器会捕获这个领域事件,并根据领域事件中的数据,设置聚合中对象的状态
  5. 当所有的领域事件全部应用在聚合上时,聚合的状态就是曾经被保存时的状态
  6. 然后,仓储将已经恢复了状态的聚合返回给命令处理器,命令处理器调用聚合上的方法,对聚合进行更改
  7. 在调用方法的时候,方法本身会产生一个领域事件,这个领域事件会立刻被聚合本身的内联事件处理器捕获,并进行处理。在处理的过程中,会更新聚合中对象的状态,同时,这个领域事件还会被缓存在聚合中
  8. 命令处理器在完成对聚合的更改之后,便会调用仓储,将更改后的模型保存下来
  9. 接着,仓储从聚合中获得所有缓存的未曾保存的领域事件,并将所有这些领域事件逐个保存到事件存储数据库。在成功完成保存之后,会清空聚合中的事件缓存
  10. 最后,仓储将所有的这些领域事件逐个地派发到事件消息总线

接下来在事件消息总线和事件处理器中将会发生的事情,我们今后还会讨论,这里就不多说了。从这个过程,我们不难得出:

  • CQRS的聚合中,更改对象状态必须通过领域事件,也就是说,不能向外界曝露直接访问对象状态的接口,更通俗地说,表示对象状态的属性(Property)不能有设置器(Setter)
  • CQRS聚合的聚合根中,会有一套内联的领域事件处理机制,用来捕获并处理聚合中产生的领域事件
  • CQRS聚合的聚合根会有一个保存未提交领域事件的本地缓存,对该缓存的访问应该是线程安全的
  • CQRS的聚合需要能够向仓储提供必要的接口,比如清除事件缓存的方法等
  • 此外,CQRS聚合是有版本号的,版本号通常是一个64位整型,表述历史上发生在聚合上的领域事件一共有多少个。当然,这个值在我们目前的讨论中并非能够真正用得上,但是,在仓储重建聚合需要依赖快照时,这个版本号就非常重要了。我会在后续文章中介绍

听起来是不是非常复杂?确实如此。那我们就先从领域事件入手,逐步实现CQRS中的聚合与聚合根。

领域事件,顾名思义,就是从领域模型中产生的事件消息。概念上很简单,比如,客户登录网站,就会由客户登录实体产生一个事件派发出去,例如CustomerLoggedOnEvent,表示客户登录这件事已经发生了。虽然在DDD的实践中,领域事件更多地在CQRS架构中被讨论,其实即便是非事件驱动型架构,也可以通过领域模型来发布消息,达到系统解耦的目的。

延续之前的设计,我们的领域事件继承了IEvent接口,并增加了三个属性/方法,此外,为了编程方便,我们实现了领域事件的抽象类,UML类图如下:

image

图中的绿色部分就是在之前我们的事件模型上新加的接口和类,用以表述领域事件的概念。其中:

  • aggregateRootId:发生该领域事件的聚合的聚合根的ID值
  • aggregateRootType:发生该领域事件的聚合的聚合根的类型
  • sequence:该领域事件的序列号

好了,如果说我们将发生在某聚合上的领域事件保存到关系型数据库,那么,当需要获得该聚合的所有领域事件时,只需要下面一句SQL就行了:

1

SELECT * FROM [Events] WHERE [AggregateRootId]=aggregateRootId AND [AggregateRootType]=aggregateRootType ORDER BY [``Sequence``] ASC

这就是最简单的事件存储数据库的实现了。不过,我们暂时不介绍这些内容。

事实上,与标准的事件(IEvent接口)相比,除了上面三个主要的属性之外,领域事件还可以包含更多的属性和方法,这就要看具体的需求和设计了。不过目前为止,我们定义这三个属性已经够用了,不要把问题搞得太复杂。

有了领域事件的基本模型,我们开始设计CQRS下的聚合。

由于外界访问聚合都是通过聚合根来实现的,因此,针对聚合的操作都会被委托给聚合根来处理。比如,当用户地址发生变化时,服务层会调用Customer.ChangeAddress方法,这个方法就会产生一个领域事件,并通过内联的事件处理机制更改聚合中Address值对象中的状态。于是,从技术角度,聚合的设计也就是聚合根的实现。

接口与类之间的关系

首先需要设计的是与聚合相关的概念所表述的接口、类及其之间的关系。结合领域驱动设计中的概念,我们得到下面的设计:

image

其中,实体(IEntity)、聚合根(IAggregateRoot)都是大家耳熟能详的领域驱动设计的概念。由于实体都是通过Id进行唯一标识,所以,IEntity会有一个id的属性,为了简单起见,我们使用Guid作为它的类型。聚合根(IAggregateRoot)继承于IEntity接口,有趣的是,在我们目前的场景中,IAggregateRoot并不包含任何成员,它仅仅是一个空接口,在整个框架代码中,它仅作为泛型的类型约束。Note:这种做法其实也是非常常见的一种框架设计模式。具有事件溯源能力的聚合根(IAggregateRootWithEventSourcing)又继承于IAggregateRoot接口,并且有如下三个成员:

  • uncommittedEvents:用于缓存发生在当前聚合中的领域事件
  • version:表示当前聚合的版本号
  • Replay:将指定的一系列领域事件“应用”到当前的聚合上,也就是所谓的事件回放

此外,你还发现我们还有两个神奇的接口:IPurgable和IPersistedVersionSetter。这两个接口的职责是:

  • IPurgable表示,实现了该接口的类型具有某种清空操作,比如清空某个队列,或者将对象状态恢复到初始状态。让IAggregateRootWithEventSourcing继承于该接口是因为,当仓储完成了聚合中领域事件的保存和派发之后,需要清空聚合中缓存的事件,以保证在今后,发生在同一时间点的同样的事件不会被再次保存和派发
  • IPersistedVersionSetter接口允许调用者对聚合的“保存版本号”进行设置。这个版本号表示了在事件存储中,属于当前聚合的所有事件的个数。试想,如果一个聚合的“保存版本号”为4(即在事件存储中有4个事件是属于该聚合的),那么,如果再有2个事件发生在这个聚合中,于是,该聚合的版本就是4+2=6.

Note:为什么不将这两个接口中的方法直接放在IAggregateRootWithEventSourcing中呢?是因为单一职责原则。聚合本身不应该存在所谓之“清空缓存”或者“设置保存版本号”这样的概念,这样的概念对于技术人员来说比较容易理解,可是如果将这些技术细节加入领域模型中,就会污染领域模型,造成领域专家无法理解领域模型,这是违背面向对象分析与设计的单一职责原则的,也违背了领域驱动设计的原则。那么,即使把这些方法通过额外的接口独立出去,实现了IAggregateRootWithEventSourcing接口的类型,不还是要实现这两个接口中的方法吗?这样,聚合的访问者不还是可以访问这两个额外的方法吗?的确如此,这些接口是需要被实现的,但是我们可以使用C#中接口的显式实现,这样的话,如果不将IAggregateRootWithEventSourcing强制转换成IPurgable或者IPersistedVersionSetter的话,是无法直接通过聚合根对象本身来访问这些方法的,这起到了非常好的保护作用。接口的显式实现在软件系统的框架设计中也是常用手段。

抽象类AggregateRootWithEventSourcing的实现

在上面的类图中,IAggregateRootWithEventSourcing最终由AggregateRootWithEventSourcing抽象类实现。不要抱怨类的名字太长,它有助于我们理解这一类型在我们的领域模型中的角色和功能。下面的代码列出了该抽象类的主要部分的实现:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

public abstract class AggregateRootWithEventSourcing : IAggregateRootWithEventSourcing

{

private readonly Lazy<Dictionary<``string``, MethodInfo>> registeredHandlers;

private readonly Queue<IDomainEvent> uncommittedEvents = new Queue<IDomainEvent>();

private Guid id;

private long persistedVersion = 0;

private object sync = new object``();

protected AggregateRootWithEventSourcing()

: this``(Guid.NewGuid())

{ }

protected AggregateRootWithEventSourcing(Guid id)

{

registeredHandlers = new Lazy<Dictionary<``string``, MethodInfo>>(() =>

{

var registry = new Dictionary<``string``, MethodInfo>();

var methodInfoList = from mi in this``.GetType().GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)

let returnType = mi.ReturnType

let parameters = mi.GetParameters()

where mi.IsDefined(``typeof``(HandlesInlineAttribute), false``) &&

returnType == typeof``(``void``) &&

parameters.Length == 1 &&

typeof``(IDomainEvent).IsAssignableFrom(parameters[0].ParameterType)

select new { EventName = parameters[0].ParameterType.FullName, MethodInfo = mi };

foreach (``var methodInfo in methodInfoList)

{

registry.Add(methodInfo.EventName, methodInfo.MethodInfo);

}

return registry;

});

Raise(``new AggregateCreatedEvent(id));

}

public Guid Id => id;

long IPersistedVersionSetter.PersistedVersion { set => Interlocked.Exchange(``ref this``.persistedVersion, value); }

public IEnumerable<IDomainEvent> UncommittedEvents => uncommittedEvents;

public long Version => this``.uncommittedEvents.Count + this``.persistedVersion;

void IPurgable.Purge()

{

lock (sync)

{

uncommittedEvents.Clear();

}

}

public void Replay(IEnumerable<IDomainEvent> events)

{

((IPurgable)``this``).Purge();

events.OrderBy(e => e.Timestamp)

.ToList()

.ForEach(e =>

{

HandleEvent(e);

Interlocked.Increment(``ref this``.persistedVersion);

});

}

[HandlesInline]

protected void OnAggregateCreated(AggregateCreatedEvent @``event``)

{

this``.id = @``event``.NewId;

}

protected void Raise<TDomainEvent>(TDomainEvent domainEvent)

where TDomainEvent : IDomainEvent

{

lock (sync)

{

this``.HandleEvent(domainEvent);

domainEvent.AggregateRootId = this``.id;

domainEvent.AggregateRootType = this``.GetType().AssemblyQualifiedName;

domainEvent.Sequence = this``.Version + 1;

this``.uncommittedEvents.Enqueue(domainEvent);

}

}

private void HandleEvent<TDomainEvent>(TDomainEvent domainEvent)

where TDomainEvent : IDomainEvent

{

var key = domainEvent.GetType().FullName;

if (registeredHandlers.Value.ContainsKey(key))

{

registeredHandlers.Value[key].Invoke(``this``, new object``[] { domainEvent });

}

}

}

上面的代码不算复杂,它根据上面的分析和描述,实现了IAggregateRootWithEventSourcing接口,篇幅原因,就不多做解释了,不过有几点还是可以鉴赏一下的:

  1. 使用Lazy类型来保证领域事件处理器的容器在整个聚合生命周期中只初始化一次
  2. 通过lock语句和Interlocked.Exchange来保证类型的线程安全和数值的原子操作
  3. 聚合根被构造的时候,会找到当前类型中所有标记了HandlesInlineAttribute特性,并具有一定特征的函数,将它们作为领域事件的内联处理器,注册到容器中
  4. 每当聚合中的某个业务操作(方法)需要更改聚合中的状态时,就调用Raise方法来产生领域事件,由对应的内联处理器捕获领域事件,并在处理器方法中设置聚合的状态
  5. Replay方法会遍历所有给点的领域事件,调用HandleEvent方法,实现事件回放

现在,我们已经实现了CQRS架构下的聚合与聚合根,虽然实际上这个结构有可能比我们的实现更为复杂,但是目前的这个设计已经能够满足我们进一步研究讨论的需求了。下面,我们再更进一步,看看CQRS中仓储应该如何实现。

为什么说是“初探”?因为我们目前打算实现的仓储暂时不包含事件派发的逻辑,这部分内容我会在后续文章中讲解。首先看看,仓储的接口是什么样的。在CQRS架构中,仓储只具备两种操作:

  1. 保存聚合
  2. 根据聚合ID(也就是聚合根的ID)值,获取聚合对象

你或许会问,那根据某个条件查询满足该条件的所有聚合对象呢?注意,这是CQRS架构中查询部分的职责,不属于我们的讨论范围。

通常,仓储的接口定义如下:

1

2

3

4

5

6

7

8

public interface IRepository

{

Task SaveAsync<TAggregateRoot>(TAggregateRoot aggregateRoot)

where TAggregateRoot : class``, IAggregateRootWithEventSourcing;

Task<TAggregateRoot> GetByIdAsync<TAggregateRoot>(Guid id)

where TAggregateRoot : class``, IAggregateRootWithEventSourcing;

}

与之前领域事件的设计类似,我们为仓储定义一个抽象类,所有仓储的实现都应该基于这个抽象类:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

public abstract class Repository : IRepository

{

protected Repository()

{ }

public async Task<TAggregateRoot> GetByIdAsync<TAggregateRoot>(Guid id)

where TAggregateRoot : class``, IAggregateRootWithEventSourcing

{

var domainEvents = await LoadDomainEventsAsync(``typeof``(TAggregateRoot), id);

var aggregateRoot = ActivateAggregateRoot<TAggregateRoot>();

aggregateRoot.Replay(domainEvents);

return aggregateRoot;

}

public async Task SaveAsync<TAggregateRoot>(TAggregateRoot aggregateRoot)

where TAggregateRoot : class``, IAggregateRootWithEventSourcing

{

var domainEvents = aggregateRoot.UncommittedEvents;

await this``.PersistDomainEventsAsync(domainEvents);

aggregateRoot.PersistedVersion = aggregateRoot.Version;

aggregateRoot.Purge();

}

protected abstract Task<IEnumerable<IDomainEvent>> LoadDomainEventsAsync(Type aggregateRootType, Guid id);

protected abstract Task PersistDomainEventsAsync(IEnumerable<IDomainEvent> domainEvents);

private TAggregateRoot ActivateAggregateRoot<TAggregateRoot>()

where TAggregateRoot : class``, IAggregateRootWithEventSourcing

{

var constructors = from ctor in typeof``(TAggregateRoot).GetTypeInfo().GetConstructors()

let parameters = ctor.GetParameters()

where parameters.Length == 0 ||

(parameters.Length == 1 && parameters[0].ParameterType == typeof``(Guid))

select new { ConstructorInfo = ctor, ParameterCount = parameters.Length };

if (constructors.Count() > 0)

{

TAggregateRoot aggregateRoot;

var constructorDefinition = constructors.First();

if (constructorDefinition.ParameterCount == 0)

{

aggregateRoot = (TAggregateRoot)constructorDefinition.ConstructorInfo.Invoke(``null``);

}

else

{

aggregateRoot = (TAggregateRoot)constructorDefinition.ConstructorInfo.Invoke(``new object``[] { Guid.NewGuid() });

}

aggregateRoot.Purge();

return aggregateRoot;

}

return null``;

}

}

代码也是非常简单、容易理解的:GetByIdAsync方法根据给定的聚合根类型以及ID值,从后台存储中读取所有属于该聚合的领域事件,并在聚合上进行回放,以便将聚合恢复到存储前的状态;SaveAsync方法则从聚合根上获得所有未被提交的领域事件,将这些事件保存到后台存储,然后设置聚合的“已保存版本”,最后清空未提交事件的缓存。剩下的就是如何实现LoadDomainEventsAsync以及PersistDomainEventsAsync两个方法了。而这两个方法,原本就应该是事件存储对象的职责范围了。

Note:你也许会问:如果某个聚合从开始到现在,已经发生了大量的领域事件了,那么这样一条条地将事件回放到聚合上,岂不是性能非常低下?没错,这个问题我们可以通过快照来解决。在后续文章中我会介绍。你还会问:日积月累,事件存储系统中的事件数量岂不是会越来越多吗?需要删除吗?答案是:不删!不过可以对数据进行归档,或者依赖一些第三方框架来处理这个问题,但是,从领域驱动设计的角度,领域事件代表着整个领域模型系统中发生过的所有事情,事情既然已经发生,就无法再被抹去,因此,删除事件存储系统中的事件是不合理的。那数据量越来越大怎么办?答案是:或许,存储硬件设备要比业务数据更便宜。

仓储的实现我们暂且探索到这一步,目前我们只需要有一个正确的聚合保存、读取(通过领域事件重塑)的逻辑就可以了,并不需要关心事件本身是如何被读取被保存的。接下来,我们在.NET Core的测试项目中,借助Moq框架,通过Mock一个假想的仓储,来验证整个系统从聚合、聚合根的实现到仓储设计的正确性。

Moq是一个很好的Mock框架,简单轻量,而且支持.NET Core,在单元测试的项目中使用Moq是一种很好的实践。Moq上手非常简单,只需要在单元测试项目上添加Moq的NuGet依赖包就可以开始着手编写测试用例了。为了测试我们的聚合根以及仓储对聚合根保存、读取的设计,首先我们定义一个简单的聚合:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

public class Book : AggregateRootWithEventSourcing

{

public void ChangeTitle(``string newTitle)

{

this``.Raise(``new BookTitleChangedEvent(newTitle));

}

public string Title { get``; private set``; }

[HandlesInline]

private void OnTitleChanged(BookTitleChangedEvent @``event``)

{

this``.Title = @``event``.NewTitle;

}

public override string ToString()

{

return Title;

}

}

Book类是一个聚合根,它继承AggregateRootWithEventSourcing抽象类,同时它有一个属性,Title,表示书的名称,而ChangeTitle方法(业务方法)会直接产生一个BookTitleChangedEvent领域事件,之后,OnTitleChanged成员函数会负责将领域事件中的NewTitle的值设置到Book聚合根的Title状态上,完成书本标题的更新。与之相关的BookTitleChangedEvent的定义如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

public class BookTitleChangedEvent : DomainEvent

{

public BookTitleChangedEvent(``string newTitle)

{

this``.NewTitle = newTitle;

}

public string NewTitle { get``; set``; }

public override string ToString()

{

return $``"{Sequence} - {NewTitle}"``;

}

}

首先,下面两个测试用例用于测试Book聚合本身产生领域事件的过程是否正确,如果正确,那么当Book本身本构造时,会产生一个AggregateCreatedEvent,如果更改书本的标题,则又会产生一个BookTitleChangedEvent,所以,第一个测试中,book的版本应该为1,而第二个则为2:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

[Fact]

public void CreateBookTest()

{

var book = new Book();

Assert.NotEqual(Guid.Empty, book.Id);

Assert.Equal(1, book.Version);

}

[Fact]

public void ChangeBookTitleEventTest()

{

var book = new Book();

book.ChangeTitle(``"Hit Refresh"``);

Assert.Equal(``"Hit Refresh"``, book.Title);

Assert.Equal(2, book.UncommittedEvents.Count());

Assert.Equal(2, book.Version);

}

接下来,测试仓储保存Book聚合的正确性,因为我们没有实现一个有效的仓储实例,因此,这里借助Moq帮我们动态生成。在下面的代码中,让Moq对仓储抽象类的PersisDomainEventsAsync受保护成员进行动态生成,指定当它被任何IEnumerable作为参数调用时,都将这些事件保存到一个本地的List中,于是,最后只需要检查List中的领域事件是否符合我们的要求就可以了。代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

[Fact]

public async Task PersistBookTest()

{

var domainEventsList = new List<IDomainEvent>();

var mockRepository = new Mock<Repository>();

mockRepository.Protected().Setup<Task>(``"PersistDomainEventsAsync"``,

ItExpr.IsAny<IEnumerable<IDomainEvent>>())

.Callback<IEnumerable<IDomainEvent>>(evnts => domainEventsList.AddRange(evnts))

.Returns(Task.CompletedTask);

var book = new Book();

book.ChangeTitle(``"Hit Refresh"``);

await mockRepository.Object.SaveAsync(book);

Assert.Equal(2, domainEventsList.Count);

Assert.Empty(book.UncommittedEvents);

Assert.Equal(2, book.Version);

}

同理,我们还可以测试仓储读取聚合并恢复聚合状态的正确性,同样还是使用Moq对仓储的LoadDomainEventsAsync进行Mock:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

[Fact]

public async Task RetrieveBookTest()

{

var fakeId = Guid.NewGuid();

var domainEventsList = new List<IDomainEvent>

{

new AggregateCreatedEvent(fakeId),

new BookTitleChangedEvent(``"Hit Refresh"``)

};

var mockRepository = new Mock<Repository>();

mockRepository.Protected().Setup<Task<IEnumerable<IDomainEvent>>>(``"LoadDomainEventsAsync"``,

ItExpr.IsAny<Type>(),

ItExpr.IsAny<Guid>())

.Returns(Task.FromResult(domainEventsList.AsEnumerable()));

var book = await mockRepository.Object.GetByIdAsync<Book>(fakeId);

Assert.Equal(fakeId, book.Id);

Assert.Equal(``"Hit Refresh"``, book.Title);

Assert.Equal(2, book.Version);

Assert.Empty(book.UncommittedEvents);

}

好了,其它的几个测试用例就不多做介绍了,使用Visual Studio运行一下测试然后查看结果就可以了:

image

本文又是一篇长篇幅的文章,好吧,要介绍的东西太多,而且这些内容又不能单独割开成多个主题,所以也就很难控制篇幅了。文章主要介绍了基于CQRS架构的聚合以及聚合根的设计与实现,同时引出了仓储的部分实现,这些内容也是为今后进一步讨论事件驱动型架构做准备。本文介绍的内容对于一个真实的CQRS系统实现来说还是有一定差距的,但总体结构也大致如此。文中还提及了快照的概念,这部分内容我今后在介绍事件存储的实现部分还会详细讨论,下一章打算扩展一下仓储本身,了解一下仓储对领域事件的派发,以及事件处理器对领域事件的处理。

本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,通过不同的release tag来区分针对不同章节的源代码。本文的源代码请参考chapter_4这个tag,如下:

image

  AOP(面向切面编程:Aspect Oriented Programming)为诸如日志记录、性能统计、安全控制、事务处理、异常处理等与具体业务逻辑无关,却需要在全局范围进行执行的功能提供了一种良好重用和与业务逻辑解耦的实现思路。AOP思想是围绕着切面进行的,所谓“切面”就是目标对象的某种操作,其基本过程是在系统其它部分调用目标对象的某种操作时拦截这些调用,在进行真正的调用前/后执行一段中间逻辑,并根据中间逻辑的执行结果决定是否进行真实调用或者修改返回结果。

  AOP带来的好处是明显,但是我们怎么在项目中应用AOP呢?目前AOP在Java领域有一些较成熟的框架诸如 AspectJ、Spring AOP 等,在.NET领域有AspectC#、Castle等。尽管有现成的框架可以使用,但是由于存在学习曲线问题和框架的成熟度问题,使得很多项目通常都不会贸然使用第三方的框架。如果我们自己能够根据项目需要按需设计自己的轻量AOP组件/框架,这就能够给我们的项目带来良好的伸缩性。

  包括现有的框架在内,AOP的实现方式通常被分为“静态织入”和“动态织入”两种。采用静态织入方式的框架是通过扩展编译器对代码的中间语言(IL)插入代码的方式实现对目标对象的调用拦截。动态织入方式在.NET中可以有两种实现:采用“装饰者模式”设计项目类库来实现;基于透明代理(TransparentProxy)/真实代理(RealProxy)来实现。下面就介绍动态织入的这两种实现。

  • 最简单的AOP实现:采用“装饰者模式”实现方法调用拦截

  

  上图是GoF装饰者模式的经典类图,此处的实现类似于上图,基本原理是在封装对象的创建过程,在创建类型的实例时并不返回类型的真正实例,而是返回一个包装了真实对象的”Decorator”。以下的代码展示了这一实现过程。

  首先是定义一个公共的接口 IDataObject:

复制代码

///


/// 数据对象接口;///

public interface IDataObject
{ /// /// 计算结果; ///
int Compute();
}

复制代码

  接下来定义充当”Decorator”的 DataObjectProxy :

复制代码

DataObjectProxy

///


/// IDataObject 代理;实现对 IDataObject 操作的拦截;///

public class DataObjectProxy : IDataObject
{ private IDataObject _realObject; public DataObjectProxy(IDataObject realObject)
{
_realObject = realObject;
}/// /// 拦截对 Compute 方法的调用; /// ///
public int Compute()
{
DoSomethingBeforeCompute();int result = _realObject.Compute();

    DoSomethingAfterCompute(result);

return result;
}private void DoSomethingAfterCompute(int result)
{
Console.WriteLine(“After Compute “ + _realObject.ToString() + “. Result=” + result);
}private void DoSomethingBeforeCompute()
{
Console.WriteLine(“Before Compute “ + _realObject.ToString());
}
}

复制代码

  DataObjectProxy拦截了对真实对象的 Compute 方法的调用,在 Compute 前后输出控制台信息。

  定义真正的实现 DataObject

复制代码

DataObject

///


/// 数据对象;///

public class DataObject : IDataObject
{ private int _parameter1; private int _parameter2;/// /// 私有构造函数,封装DataObject的实例化过程; ///
private DataObject()
{
}/// /// 创建数据对象; ///
public static IDataObject CreateDataObject(int p1, int p2)
{ //创建真实实例;
DataObject realObject = new DataObject();
realObject._parameter1 = p1;
realObject._parameter2 = p2;//返回代理;
return new DataObjectProxy(realObject);
}/// /// “计算”的实现; /// ///
public int Compute()
{ return -1;
}
}

复制代码

  DataObject 通过将构造函数定义为 private 封装其实例化过程。要获得一个 IDataObject 的实例就必需通过静态方法 CreateDataObject 。而用DataObjectProxy代替DataObject的偷天换日的过程就是在 CreateDataObject 方法中完成的。

  以下的代码将展示这一拦截过程

复制代码

public static void Main(string[] args)
{
IDataObject dtObj = DataObject.CreateDataObject(10, 6); int result = dtObj.Compute();

Console.ReadLine();  

}

复制代码

  代码在控制台中的输出如下:

  上面的输出表明我们针对 Compute 方法的调用像期望的那样被拦截了。

  但是我们看到这种AOP的实现方式有其局限性:首先得基于一个特定的接口进行定义,无法创建通用的 Proxy 对象;其次对每一个要拦截的方法都要进行编码实现,无法重用。这些局限使得这种实现方法只能在局部使用,如果在大范围的使用还是存在许多重复性的编码。以下介绍的基于透明代理(TransparentProxy)/真实代理(RealProxy)的实现将解决这些问题。

  • 基于透明代理(TransparentProxy)/真实代理(RealProxy)实现方法调用拦截

  真实代理和透明代理机制是由 .NET Remoting 提供的,此处所说的真实代理(RealProxy)特指 System.Runtime.Remoting.Proxies 命名空间中的 RealProxy 类型。关于透明代理和真实代理的机制及相关概念请参阅 MSDN 文档,在此就不再赘述,直接用代码来表达。

下面基于真实代理(RealProxy)定义了一个通用的代理对象 AopProxy :

复制代码

通用代理:AopProxy

///


/// 通用的代理; ///

///
class AopProxy<T> : RealProxy
{ private T _realObject;public AopProxy(T realObject)
:base(typeof(T))
{
_realObject = realObject;
}/// /// 拦截所有方法的调用; /// /// ///
public override IMessage Invoke(IMessage msg)
{
IMethodCallMessage callMsg = msg as IMethodCallMessage; //调用前拦截;
BeforeInvoke(callMsg.MethodBase); try { //调用真实方法;
object retValue = callMsg.MethodBase.Invoke(_realObject, callMsg.Args); return new ReturnMessage(retValue, callMsg.Args, callMsg.ArgCount - callMsg.InArgCount, callMsg.LogicalCallContext, callMsg);
} catch (Exception ex)
{ return new ReturnMessage(ex, callMsg);
} finally { //调用后处理;
AfterInvoke(callMsg.MethodBase);
}
}private void BeforeInvoke(MethodBase method)
{
Console.WriteLine(“Before Invoke {0}::{1}”, typeof(T).FullName, method.ToString());
}private void AfterInvoke(MethodBase method)
{
Console.WriteLine(“After Invoke {0}::{1}”, typeof(T).FullName, method.ToString());
}
}

复制代码

  以上的代码 AopProxy 是个泛型类型,泛型参数 T 是要拦截的对象的类型,AopProxy 构造函数需要一个实现泛型参数的真实对象作为参数。

  我们为上面的 DataObject 添加一个新的工厂方法 CreateDataObject2 用通过 AopProxy 创建透明代理,具体如下:

复制代码

使用 AopProxy 创建透明代理

///

/// 创建数据对象; ///
public static IDataObject CreateDataObjec2(int p1, int p2)
{ //创建真实实例;
DataObject realObject = new DataObject();
realObject._parameter1 = p1;
realObject._parameter2 = p2;//创建真实代理;
AopProxy<IDataObject> proxy = new AopProxy<IDataObject>(realObject);//返回透明代理;
return (IDataObject)proxy.GetTransparentProxy();
}

复制代码

  修改前面的入口程序通过 CreateDataObject2 方法获得代理,如下:

复制代码

public static void Main(string[] args)
{
IDataObject dtObj = DataObject.CreateDataObject2(10, 6); int result = dtObj.Compute();

Console.ReadLine();  

}

复制代码

  此次的输出如下:

  实现我们预期的结果:通过 AopProxy 实现对不同类型的目标对象的调用拦截的复用,并且在 AopProxy 成功拦截了所有的方法调用。

  至此似乎一切都很完美,但是千万别以为世上真的存在“完美”。基于RealProxy 的实现是有其局限性的。在上面的 AopProxy 的实现中,在示例中类型参数 T 是 IDataObject 接口。那我们能不能用一个 Class 作为泛型参数呢?答案是可以,但这个 Class 必需是继承自 MarshalByRefObject 类型,否则在 new  AopProxy 实例时会抛出异常。也就是说如果目标类型是 Interface,则可以是任意类型,如果是 Class,则必须继承自 MarshalByRefObject 。还好,这只是一个小小的限制,而且面向对象编程也提倡多用接口。

此外,在 AopProxy 的实现中创建返回结果的 ReturnMessage 的代码不同于 MSDN 中的示例,在MSDN的示例中 ReturnMessage 构造函数的 outArgs 、outArgCount 参数总是指定为 null 和 0 。实际上这种情况下,如果目标方法中包含有 out 、ref 类型的参数时,这些此参数的返回值将被忽略,这显然是不允许的。而正确的方法应如此示例。

AOP的实现很简单吧。如果我们再进一步,利用自定义 Attribute 对类型的方法进行标记 Before 和 After 操作,在 AopProxy 的 Invoke 方法中通过反射解析这些标记,或者是通过配置文件定义 Before 和 After 操作,则可以实现更加灵活和丰富的 AOP 功能。

上一篇: 【WEB API项目实战干货系列】- 接口文档与在线测试(二)

这篇我们主要来介绍我们如何在API项目中完成API的登录及身份认证. 所以这篇会分为两部分, 登录API, API身份验证.

这一篇的主要原理是: API会提供一个单独的登录API, 通过用户名,密码来产生一个SessionKey, SessionKey具有过期时间的特点, 系统会记录这个SessionKey, 在后续的每次的API返回的时候,客户端需带上这个Sessionkey, API端会验证这个SessionKey.

登录API

我们先来看一下登录API的方法签名

image

SessionObject是登录之后,给客户端传回的对象, 里面包含了SessionKey及当前登录的用户的信息

image

这里每次的API调用,都需要传SessionKey过去, SessionKey代表了用户的身份信息,及登录过期信息。

登录阶段生成的SessionKey我们需要做保存,存储到一个叫做UserDevice的对象里面, 从语意上可以知道用户通过不同的设备登录会产生不同的UserDevice对象.

image

最终的登录代码如下:

复制代码

[RoutePrefix(“api/accounts”)] public class AccountController : ApiController
{ private readonly IAuthenticationService _authenticationService = null; public AccountController()
{ //this._authenticationService = IocManager.Intance.Reslove();
}

    \[HttpGet\] public void AccountsAPI()
    {

    } /// <summary>
    /// 登录API /// </summary>
    /// <param name="loginIdorEmail">登录帐号(邮箱或者其他LoginID)</param>
    /// <param name="hashedPassword">加密后的密码,这里避免明文,客户端加密后传到API端</param>
    /// <param name="deviceType">客户端的设备类型</param>
    /// <param name="clientId">客户端识别号, 一般在APP上会有一个客户端识别号</param>
    /// <remarks>其他的登录位置啥的,是要客户端能传的东西,都可以在这里扩展进来</remarks>
    /// <returns></returns>
    \[Route("account/login")\] public SessionObject Login(string loginIdorEmail, string hashedPassword, int deviceType = 0, string clientId = "")
    { if (string.IsNullOrEmpty(loginIdorEmail)) throw new ApiException("username can't be empty.", "RequireParameter\_username"); if (string.IsNullOrEmpty(hashedPassword)) throw new ApiException("hashedPassword can't be empty.", "RequireParameter\_hashedPassword"); int timeout = 60; var nowUser = \_authenticationService.GetUserByLoginId(loginIdorEmail); if (nowUser == null) throw new ApiException("Account Not Exists", "Account\_NotExits"); #region Verify Password
        if (!string.Equals(nowUser.Password, hashedPassword))
        { throw new ApiException("Wrong Password", "Account\_WrongPassword");
        } #endregion

        if (!nowUser.IsActive) throw new ApiException("The user is inactive.", "InactiveUser");

        UserDevice existsDevice \= \_authenticationService.GetUserDevice(nowUser.UserId, deviceType);// Session.QueryOver<UserDevice>().Where(x => x.AccountId == nowAccount.Id && x.DeviceType == deviceType).SingleOrDefault();
        if (existsDevice == null)
        { string passkey = MD5CryptoProvider.GetMD5Hash(nowUser.UserId + nowUser.LoginName + DateTime.UtcNow.ToString() + Guid.NewGuid().ToString());
            existsDevice \= new UserDevice()
            {
                UserId \= nowUser.UserId,
                CreateTime \= DateTime.UtcNow,
                ActiveTime \= DateTime.UtcNow,
                ExpiredTime \= DateTime.UtcNow.AddMinutes(timeout),
                DeviceType \= deviceType,
                SessionKey \= passkey
            };

            \_authenticationService.AddUserDevice(existsDevice);
        } else {
            existsDevice.ActiveTime \= DateTime.UtcNow;
            existsDevice.ExpiredTime \= DateTime.UtcNow.AddMinutes(timeout);
            \_authenticationService.UpdateUserDevice(existsDevice);
        }
        nowUser.Password \= ""; return new SessionObject() { SessionKey = existsDevice.SessionKey, LogonUser = nowUser };
    }
}

复制代码

API身份验证

身份信息的认证是通过Web API 的 ActionFilter来实现的, 每各需要身份验证的API请求都会要求客户端传一个SessionKey在URL里面丢过来。

在这里我们通过一个自定义的SessionValidateAttribute来做客户端的身份验证, 其继承自 System.Web.Http.Filters.ActionFilterAttribute, 把这个Attribute加在每个需要做身份验证的ApiControler上面,这样该 Controller下面的所有Action都将拥有身份验证的功能, 这里会存在如果有少量的API不需要身份验证,那该如何处理,这个会做一些排除,为了保持文章的思路清晰,这会在后续的章节再说明.

复制代码

public class SessionValidateAttribute : System.Web.Http.Filters.ActionFilterAttribute
{ public const string SessionKeyName = “SessionKey”; public const string LogonUserName = “LogonUser”; public override void OnActionExecuting(HttpActionContext filterContext)
{ var qs = HttpUtility.ParseQueryString(filterContext.Request.RequestUri.Query); string sessionKey = qs[SessionKeyName]; if (string.IsNullOrEmpty(sessionKey))
{ throw new ApiException(“Invalid Session.”, “InvalidSession”);
}

        IAuthenticationService authenticationService \= IocManager.Intance.Reslove<IAuthenticationService>(); //validate user session
        var userSession = authenticationService.GetUserDevice(sessionKey); if (userSession == null)
        { throw new ApiException("sessionKey not found", "RequireParameter\_sessionKey");
        } else { //todo: 加Session是否过期的判断
            if (userSession.ExpiredTime < DateTime.UtcNow) throw new ApiException("session expired", "SessionTimeOut"); var logonUser = authenticationService.GetUser(userSession.UserId); if (logonUser == null)
            { throw new ApiException("User not found", "Invalid\_User");
            } else {
                filterContext.ControllerContext.RouteData.Values\[LogonUserName\] \= logonUser;
                SetPrincipal(new UserPrincipal<int\>(logonUser));
            }

            userSession.ActiveTime \= DateTime.UtcNow;
            userSession.ExpiredTime \= DateTime.UtcNow.AddMinutes(60);
            authenticationService.UpdateUserDevice(userSession);
        }
    } private void SetPrincipal(IPrincipal principal)
    {
        Thread.CurrentPrincipal \= principal; if (HttpContext.Current != null)
        {
            HttpContext.Current.User \= principal;
        }
    }
}

复制代码

OnActionExcuting方法:

这个是在进入某个Action之前做检查, 这个时候我们刚好可以同RequestQueryString中拿出SessionKey到UserDevice表中去做查询,来验证Sessionkey的真伪, 以达到身份验证的目的。

用户的过期时间:

在每个API访问的时候,会自动更新Session(也就是UserDevice)的过期时间, 以保证SessionKey不会过期,如果长时间未更新,则下次访问会过期,需要重新登录做处理。

Request.IsAuthented:

上面代码的最后一段SetPrincipal就是来设置我们线程上下文及HttpContext上下文中的用户身份信息, 在这里我们实现了我们自己的用户身份类型

复制代码

public class UserIdentity : IIdentity
{ public UserIdentity(IUser user)
{ if (user != null)
{
IsAuthenticated = true;
UserId = user.UserId;
Name = user.LoginName.ToString();
DisplayName = user.DisplayName;
}
} public string AuthenticationType
{ get { return “CustomAuthentication”; }
} public TKey UserId { get; private set; } public bool IsAuthenticated { get; private set; } public string Name { get; private set; } public string DisplayName { get; private set; }
} public class UserPrincipal : IPrincipal
{ public UserPrincipal(UserIdentity identity)
{
Identity = identity;
} public UserPrincipal(IUser user)
: this(new UserIdentity(user))
{

    } /// <summary>
    /// 
    /// </summary>
    public UserIdentity<TKey> Identity { get; private set; }

    IIdentity IPrincipal.Identity
    { get { return Identity; }
    } bool IPrincipal.IsInRole(string role)
    { throw new NotImplementedException();
    }
} public interface IUser<T> {
    T UserId { get; set; } string LoginName { get; set; } string DisplayName { get; set; }
}

复制代码

这样可以保证我们在系统的任何地方,通过HttpContext.User 或者 System.Threading.Thread.CurrentPrincipal可以拿到当前线程上下文的用户信息, 方便各处使用

加入身份认证之后的Product相关API如下:

复制代码

[RoutePrefix(“api/products”), SessionValidate] public class ProductController : ApiController
{
[HttpGet] public void ProductsAPI()
{ } ///


/// 产品分页数据获取 ///

///
[HttpGet, Route(“product/getList”)] public Page GetProductList(string sessionKey)
{ return new Page();
} ///
/// 获取单个产品 ///

///
///
[HttpGet, Route(“product/get”)] public Product GetProduct(string sessionKey, Guid productId)
{ return new Product() { ProductId = productId };
} ///
/// 添加产品 ///

///
///
[HttpPost, Route(“product/add”)] public Guid AddProduct(string sessionKey, Product product)
{ return Guid.NewGuid();
} ///
/// 更新产品 ///

///
///
[HttpPost, Route(“product/update”)] public void UpdateProduct(string sessionKey, Guid productId, Product product)
{

    } /// <summary>
    /// 删除产品 /// </summary>
    /// <param name="productId"></param>
    \[HttpDelete, Route("product/delete")\] public void DeleteProduct(string sessionKey, Guid productId)
    {

    }

复制代码

可以看到我们的ProductController上面加了SessionValidateAttribute, 每个Action参数的第一个位置,加了一个string sessionKey的占位, 这个主要是为了让Swagger.Net能在UI上生成测试窗口

image

这篇并没有使用OAuth等授权机制,只是简单的实现了登录授权,这种方式适合小项目使用.

这里也只是实现了系统的登录,API访问安全,并不能保证 API系统的绝对安全,我们可以透过 路由的上的HTTP消息拦截, 拦截到我们的API请求,截获密码等登录信息, 因此我们还需要给我们的API增加SSL证书,实现 HTTPS加密传输。

另外在前几天的有看到结合客户端IP地址等后混合生成 Sessionkey来做安全的,但是也具有一定的局限性, 那种方案合适,还是要根据自己的实际项目情况来确定.

由于时间原因, 本篇只是从原理方面介绍了API用户登录与访问身份认证,因为这部分真实的测试设计到数据库交互, Ioc等基础设施的支撑,所以这篇的代码只能出现在SwaggerUI中,但是无法实际测试接口。在接下来的代码中我会完善这部分.

代码: 代码下载(代码托管在CSDN Code)

博客搭建参考[[Hexo/Hexo-博客配置|Hexo-博客配置]]
![[Hexo/.assets//Pasted image 20240920143344.png]]

YOLOv8目标检测:使用ONNX模型进行推理_onnx模型推理-CSDN博客

Excerpt

文章浏览阅读8.2k次,点赞46次,收藏119次。本文详细介绍了如何在COCO数据集上使用YOLOv8目标检测模型进行推理,涉及环境配置、代码实现(包括图像、视频和摄像头检测),以及展示ONNX模型在不同大小版本(YOLOv8n,YOLOv8s,YOLOv8m,YOLOv8l,YOLOv8x)上的实验结果。


基于COCO数据集的YOLOv8目标检测onnx模型推理

在本博客中,我们将探讨如何使用YOLOv8目标检测模型进行推理,包括图片,视频文件,摄像头实时检测,特别是ONNX在不同大小(YOLOv8n, YOLOv8s, YOLOv8m, YOLOv8l, YOLOv8x)的模型上进行的实验。我们还将讨论所需的环境配置,代码实现,以及如何展示推理结果。

环境配置

在详细描述环境配置和安装步骤之前,请确保您的系统已经安装了Python和pip。下面是详细的环境配置步骤,适用于基于YOLOv8模型进行目标检测的项目。

1. 安装必要的Python库

1
pip install onnxruntime-gpu==1.13.1 opencv-python==4.7.0.68 numpy==1.24.1 Pillow==9.4.0 -i https://pypi.tuna.tsinghua.edu.cn/simple/

如果您没有GPU或者不打算使用GPU,可以安装onnxruntime而不是onnxruntime-gpu

1
pip install onnxruntime==1.13.1 opencv-python==4.7.0.68 numpy==1.24.1 Pillow==9.4.0 -i https://pypi.tuna.tsinghua.edu.cn/simple/

2. 验证安装

安装完成后,您可以通过运行Python并尝试导入安装的包来验证是否成功安装了所有必要的库:

1
import onnxruntime import cv2 import numpy import PIL

如果上述命令没有引发任何错误,那么恭喜您,您已成功配置了运行环境。

小贴士

  • 如果您在安装过程中遇到任何问题,可能需要更新pip到最新版本:pip install --upgrade pip
  • 对于使用NVIDIA GPU的用户,确保您的系统已安装CUDA和cuDNN。onnxruntime-gpu要求系统预装这些NVIDIA库以利用GPU加速。

按照这些步骤,您应该能够成功配置环境并运行基于YOLOv8的目标检测项目了。

权重下载

YOLOv8模型的权重可以通过以下百度网盘链接下载:

请确保下载适合您需求的模型版本。

代码实现

以下是进行目标检测的整体代码流程,包括模型加载、图像预处理、推理执行、后处理及结果展示的步骤。

1
import cv2 import onnxruntime as ort from PIL import Image import numpy as np # 置信度 confidence_thres = 0.35 # iou阈值 iou_thres = 0.5 # 类别 classes = {0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard', 67: 'cell phone', 68: 'microwave', 69: 'oven', 70: 'toaster', 71: 'sink', 72: 'refrigerator', 73: 'book', 74: 'clock', 75: 'vase', 76: 'scissors', 77: 'teddy bear', 78: 'hair drier', 79: 'toothbrush'} # 随机颜色 color_palette = np.random.uniform(100, 255, size=(len(classes), 3)) # 判断是使用GPU或CPU providers = [ ('CUDAExecutionProvider', { 'device_id': 0, # 可以选择GPU设备ID,如果你有多个GPU }), 'CPUExecutionProvider', # 也可以设置CPU作为备选 ] def calculate_iou(box, other_boxes): """ 计算给定边界框与一组其他边界框之间的交并比(IoU)。 参数: - box: 单个边界框,格式为 [x1, y1, width, height]。 - other_boxes: 其他边界框的数组,每个边界框的格式也为 [x1, y1, width, height]。 返回值: - iou: 一个数组,包含给定边界框与每个其他边界框的IoU值。 """ # 计算交集的左上角坐标 x1 = np.maximum(box[0], np.array(other_boxes)[:, 0]) y1 = np.maximum(box[1], np.array(other_boxes)[:, 1]) # 计算交集的右下角坐标 x2 = np.minimum(box[0] + box[2], np.array(other_boxes)[:, 0] + np.array(other_boxes)[:, 2]) y2 = np.minimum(box[1] + box[3], np.array(other_boxes)[:, 1] + np.array(other_boxes)[:, 3]) # 计算交集区域的面积 intersection_area = np.maximum(0, x2 - x1) * np.maximum(0, y2 - y1) # 计算给定边界框的面积 box_area = box[2] * box[3] # 计算其他边界框的面积 other_boxes_area = np.array(other_boxes)[:, 2] * np.array(other_boxes)[:, 3] # 计算IoU值 iou = intersection_area / (box_area + other_boxes_area - intersection_area) return iou def custom_NMSBoxes(boxes, scores, confidence_threshold, iou_threshold): # 如果没有边界框,则直接返回空列表 if len(boxes) == 0: return [] # 将得分和边界框转换为NumPy数组 scores = np.array(scores) boxes = np.array(boxes) # 根据置信度阈值过滤边界框 mask = scores > confidence_threshold filtered_boxes = boxes[mask] filtered_scores = scores[mask] # 如果过滤后没有边界框,则返回空列表 if len(filtered_boxes) == 0: return [] # 根据置信度得分对边界框进行排序 sorted_indices = np.argsort(filtered_scores)[::-1] # 初始化一个空列表来存储选择的边界框索引 indices = [] # 当还有未处理的边界框时,循环继续 while len(sorted_indices) > 0: # 选择得分最高的边界框索引 current_index = sorted_indices[0] indices.append(current_index) # 如果只剩一个边界框,则结束循环 if len(sorted_indices) == 1: break # 获取当前边界框和其他边界框 current_box = filtered_boxes[current_index] other_boxes = filtered_boxes[sorted_indices[1:]] # 计算当前边界框与其他边界框的IoU iou = calculate_iou(current_box, other_boxes) # 找到IoU低于阈值的边界框,即与当前边界框不重叠的边界框 non_overlapping_indices = np.where(iou <= iou_threshold)[0] # 更新sorted_indices以仅包含不重叠的边界框 sorted_indices = sorted_indices[non_overlapping_indices + 1] # 返回选择的边界框索引 return indices def draw_detections(img, box, score, class_id): """ 在输入图像上绘制检测到的对象的边界框和标签。 参数: img: 要在其上绘制检测结果的输入图像。 box: 检测到的边界框。 score: 对应的检测得分。 class_id: 检测到的对象的类别ID。 返回: 无 """ # 提取边界框的坐标 x1, y1, w, h = box # 根据类别ID检索颜色 color = color_palette[class_id] # 在图像上绘制边界框 cv2.rectangle(img, (int(x1), int(y1)), (int(x1 + w), int(y1 + h)), color, 2) # 创建标签文本,包括类名和得分 label = f'{classes[class_id]}: {score:.2f}' # 计算标签文本的尺寸 (label_width, label_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) # 计算标签文本的位置 label_x = x1 label_y = y1 - 10 if y1 - 10 > label_height else y1 + 10 # 绘制填充的矩形作为标签文本的背景 cv2.rectangle(img, (label_x, label_y - label_height), (label_x + label_width, label_y + label_height), color, cv2.FILLED) # 在图像上绘制标签文本 cv2.putText(img, label, (label_x, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA) def preprocess(img, input_width, input_height): """ 在执行推理之前预处理输入图像。 返回: image_data: 为推理准备好的预处理后的图像数据。 """ # 获取输入图像的高度和宽度 img_height, img_width = img.shape[:2] # 将图像颜色空间从BGR转换为RGB img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # 将图像大小调整为匹配输入形状 img = cv2.resize(img, (input_width, input_height)) # 通过除以255.0来归一化图像数据 image_data = np.array(img) / 255.0 # 转置图像,使通道维度为第一维 image_data = np.transpose(image_data, (2, 0, 1)) # 通道首 # 扩展图像数据的维度以匹配预期的输入形状 image_data = np.expand_dims(image_data, axis=0).astype(np.float32) # 返回预处理后的图像数据 return image_data, img_height, img_width def postprocess(input_image, output, input_width, input_height, img_width, img_height): """ 对模型输出进行后处理,提取边界框、得分和类别ID。 参数: input_image (numpy.ndarray): 输入图像。 output (numpy.ndarray): 模型的输出。 input_width (int): 模型输入宽度。 input_height (int): 模型输入高度。 img_width (int): 原始图像宽度。 img_height (int): 原始图像高度。 返回: numpy.ndarray: 绘制了检测结果的输入图像。 """ # 转置和压缩输出以匹配预期的形状 outputs = np.transpose(np.squeeze(output[0])) # 获取输出数组的行数 rows = outputs.shape[0] # 用于存储检测的边界框、得分和类别ID的列表 boxes = [] scores = [] class_ids = [] # 计算边界框坐标的缩放因子 x_factor = img_width / input_width y_factor = img_height / input_height # 遍历输出数组的每一行 for i in range(rows): # 从当前行提取类别得分 classes_scores = outputs[i][4:] # 找到类别得分中的最大得分 max_score = np.amax(classes_scores) # 如果最大得分高于置信度阈值 if max_score >= confidence_thres: # 获取得分最高的类别ID class_id = np.argmax(classes_scores) # 从当前行提取边界框坐标 x, y, w, h = outputs[i][0], outputs[i][1], outputs[i][2], outputs[i][3] # 计算边界框的缩放坐标 left = int((x - w / 2) * x_factor) top = int((y - h / 2) * y_factor) width = int(w * x_factor) height = int(h * y_factor) # 将类别ID、得分和框坐标添加到各自的列表中 class_ids.append(class_id) scores.append(max_score) boxes.append([left, top, width, height]) # 应用非最大抑制过滤重叠的边界框 indices = custom_NMSBoxes(boxes, scores, confidence_thres, iou_thres) # 遍历非最大抑制后的选定索引 for i in indices: # 根据索引获取框、得分和类别ID box = boxes[i] score = scores[i] class_id = class_ids[i] # 在输入图像上绘制检测结果 draw_detections(input_image, box, score, class_id) # 返回修改后的输入图像 return input_image def init_detect_model(model_path): # 使用ONNX模型文件创建一个推理会话,并指定执行提供者 session = ort.InferenceSession(model_path, providers=providers) # 获取模型的输入信息 model_inputs = session.get_inputs() # 获取输入的形状,用于后续使用 input_shape = model_inputs[0].shape # 从输入形状中提取输入宽度 input_width = input_shape[2] # 从输入形状中提取输入高度 input_height = input_shape[3] # 返回会话、模型输入信息、输入宽度和输入高度 return session, model_inputs, input_width, input_height def detect_object(image, session, model_inputs, input_width, input_height): # 如果输入的图像是PIL图像对象,将其转换为NumPy数组 if isinstance(image, Image.Image): result_image = np.array(image) else: # 否则,直接使用输入的图像(假定已经是NumPy数组) result_image = image # 预处理图像数据,调整图像大小并可能进行归一化等操作 img_data, img_height, img_width = preprocess(result_image, input_width, input_height) # 使用预处理后的图像数据进行推理 outputs = session.run(None, {model_inputs[0].name: img_data}) # 对推理结果进行后处理,例如解码检测框,过滤低置信度的检测等 output_image = postprocess(result_image, outputs, input_width, input_height, img_width, img_height) # 返回处理后的图像 return output_image if __name__ == '__main__': # 模型文件的路径 model_path = "yolov8n.onnx" # 初始化检测模型,加载模型并获取模型输入节点信息和输入图像的宽度、高度 session, model_inputs, input_width, input_height = init_detect_model(model_path) # 三种模式 1为图片预测,并显示结果图片;2为摄像头检测,并实时显示FPS; 3为视频检测,并保存结果视频 mode = 1 if mode == 1: # 读取图像文件 image_data = cv2.imread("street.jpg") # 使用检测模型对读入的图像进行对象检测 result_image = detect_object(image_data, session, model_inputs, input_width, input_height) # 将检测后的图像保存到文件 cv2.imwrite("output_image.jpg", result_image) # 在窗口中显示检测后的图像 cv2.imshow('Output', result_image) # 等待用户按键,然后关闭显示窗口 cv2.waitKey(0) elif mode == 2: # 打开摄像头 cap = cv2.VideoCapture() # 0表示默认摄像头,如果有多个摄像头可以尝试使用1、2等 # 检查摄像头是否成功打开 if not cap.isOpened(): print("Error: Could not open camera.") exit() # 初始化帧数计数器和起始时间 frame_count = 0 start_time = time.time() # 循环读取摄像头视频流 while True: # 读取一帧 ret, frame = cap.read() # 检查帧是否成功读取 if not ret: print("Error: Could not read frame.") break # 使用检测模型对读入的帧进行对象检测 output_image = detect_object(frame, session, model_inputs, input_width, input_height) # 计算帧速率 frame_count += 1 end_time = time.time() elapsed_time = end_time - start_time fps = frame_count / elapsed_time print(f"FPS: {fps:.2f}") # 将FPS绘制在图像上 cv2.putText(output_image, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA) # 在窗口中显示当前帧 cv2.imshow("Video", output_image) # 按下 'q' 键退出循环 if cv2.waitKey(1) & 0xFF == ord('q'): break # 释放摄像头资源 cap.release() # 关闭窗口 cv2.destroyAllWindows() elif mode == 3: # 输入视频路径 input_video_path = 'kun.mp4' # 输出视频路径 output_video_path = 'kun_det.mp4' # 打开视频文件 cap = cv2.VideoCapture(input_video_path) # 检查视频是否成功打开 if not cap.isOpened(): print("Error: Could not open video.") exit() # 读取视频的基本信息 frame_width = int(cap.get(3)) frame_height = int(cap.get(4)) fps = cap.get(cv2.CAP_PROP_FPS) # 定义视频编码器和创建VideoWriter对象 fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 根据文件名后缀使用合适的编码器 out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height)) # 初始化帧数计数器和起始时间 frame_count = 0 start_time = time.time() while True: ret, frame = cap.read() if not ret: print("Info: End of video file.") break # 对读入的帧进行对象检测 output_image = detect_object(frame, session, model_inputs, input_width, input_height) # 计算并打印帧速率 frame_count += 1 end_time = time.time() elapsed_time = end_time - start_time if elapsed_time > 0: fps = frame_count / elapsed_time print(f"FPS: {fps:.2f}") # 将处理后的帧写入输出视频 out.write(output_image) #(可选)实时显示处理后的视频帧 cv2.imshow("Output Video", output_image) if cv2.waitKey(1) & 0xFF == ord('q'): break # 释放资源 cap.release() out.release() cv2.destroyAllWindows() else: print("输入错误,请检查mode的赋值")

请根据您的需求调整置信度阈值、IOU阈值以及模型和mode的值(1为图片预测;2为摄像头检测; 3为视频检测)。

结果展示

推理完成后,您可以查看处理后的图像,如下所示:

  • 原始图片:待检测图片

  • 检测后的图片:检测后的图片

请替换为您自己的图像路径来查看效果;或者其他两种模式(摄像头实时检测、视频文件检测)进行尝试。

总结

通过以上步骤,我们展示了如何使用YOLOv8进行目标检测的完整流程,从环境配置到代码实现和结果展示。此过程适用于YOLOv8目标检测任意模型进行检测任务。


希望这篇博客能够帮助您理解和实现基于YOLOv8的目标检测项目。如果有任何问题或需要进一步的帮助,请随时留言讨论。

安装

1
2
3
4
5
6
7
8
9

sudo apt install lm-sensors curl hddtemp # 安装工具

sensors-detect # 检测传感器

sudo apt install conky # 安装

conky & # 运行conky

传感器数据样例

1
2
3
4
5
6
7
8
9
acpitz-virtual-0-
Adapter: Virtual device
temp1: +49.5°C (crit = +99.0°C)

coretemp-isa-0000
Adapter: ISA adapter
Physical id 0: +49.0°C (high = +100.0°C, crit = +100.0°C)
Core 0: +49.0°C (high = +100.0°C, crit = +100.0°C)
Core 1: +49.0°C (high = +100.0°C, crit = +100.0°C)

:TODO conky 默认运行效果截图

conky默认以一个弹窗的形式运行,并使用位于/etc/conky/conky.conf的基础配置文件

集成到桌面

1
2
cp /etc/conky/conky.conf /home/$USER/.conkyrc # 复制默认配置文件