0%

   前面介绍的所有专题都是基于经典的领域驱动实现的,然而,领域驱动除了经典的实现外,还可以基于CQRS模式来进行实现。本专题将全面剖析如何基于CQRS模式(Command Query Responsibility Segregation,命令查询职责分离)来实现领域驱动设计。

   在介绍具体的实现之前,对于之前不了解CQRS的朋友来说,首先第一个问题应该是:什么是CQRS啊?你倒是详细介绍完CQRS后再介绍具体实现啊?既然大家会有这样的问题,所以本专题首先全面介绍下什么是CQRS。

2.1 CQRS发展历程

  在介绍CQRS之前,我觉得有必要先了解一下CQS(即Command Query Separation,命令查询分离)模式。我们可以理解CQRS是在DDD的实践中基于CQS理论而出现的一种体系结构模式。CQS模式最早由软件大师Bertrand Meyer(Eiffel语言之父,面向对象开-闭原则OCP提出者)提出,他认为,对象的行为仅有两种:命令和查询,不存在第三种情况。根据CQS的思想,任何方法都可以拆分为命令和查询两部分。例如下面的方法:

复制代码

    private int \_number = 0;
    public int Add(int factor)
    {
        \_number += factor;
        return \_number;
    }

复制代码

  在上面的方法中,执行了一个命令,即对变量_number加上一个因子factor,同时又执行了一个查询,即查询返回_number的值。根据CQS的思想,该方法可以拆成Command和Query两个方法:

复制代码

复制代码

private int _number = 0;
private void AddCommand(int factor)
{
_number += factor;
}

private int QueryValue()
{
return _number;
}

复制代码

复制代码

  命令和查询分离使得我们可以更好地把握对象的细节,更好地理解哪些操作会改变系统的状态。从而使的系统具有更好的扩展性,并获得更好的性能。

  CQRS根据CQS思想,并结合领域驱动设计思想,由Grey Young在CQRS, Task Based UIs, Event Sourcing agh! 这篇文章中提出。CQRS将之前只需要定义一个对象拆分成两个对象,分离的原则按照对象中方法是执行命令还是执行查询来进行拆分的。

2.2 CQRS结构

由前面的介绍可知,采用CQRS模式实现的系统结构可以分为两个部分:命令部分和查询部分。其系统结构如下图所示:

  从上面系统结构图可以发现,采用CQRS实现的领域驱动设计与经典DDD有很大的不同。采用CQRS实现的DDD结构大体分为两部分,查询部分和命令部分,并且维护着两个数据库实例,一个专门用来进行查询,另一个用来响应命令操作。然后通过EventHandler操作将命令改变的状态同步到用来查询的数据库实例中。从这个描述中,我们可能会联想到数据库级别主从读写分离。然而数据读写分离是在数据库层面来实现读写分离的机制,而CQRS是在业务逻辑层面来实现读写分离机制。两者是站在两个不同的层面对读写分离进行实现的。

   前面我们已经详细介绍了CQRS模式,相信经过前面的介绍,大家对CQRS模式一定有一些了解了,但为什么要引入CQRS模式呢?

  在传统的实现中,对DB执行增、删、改、查所有操作都会放在对应的仓储中,并且这些操作都公用一份领域实体对象。对于一些简单的系统,使用传统的设计方式并没有什么不妥,但在一些大型复杂的系统中,传统的实现方式也会存在一些问题:

  • 使用同一个领域实体来进行数据读写可能会遇到资源竞争的情况。所以经常要处理锁的问题,在写入数据的时候,需要加锁,读取数据的时候需要判断是否允许脏读。这样使得系统的逻辑性和复杂性增加,并会影响系统的吞吐量。
  • 在大数据量同时进行读写的情况下,可能出现性能的瓶颈。
  • 使用同一个领域实体来进行数据库读写可能会太粗糙。在大多是情况下,比如编辑操作,可能只需要更新个别字段,这时却需要将整个对象都穿进去。还有在查询的时候,表现层可能只需要个别字段,但需要查询和返回整个领域实体,再把领域实体对象转换从对应的DTO对象。
  • 读写操作都耦合在一起,不利于对问题的跟踪和分析,如果读写操作分离的话,如果是由于状态改变的问题就只需要去分析写操作相关的逻辑就可以了,如果是关于数据的不正确,则只需要关心查询操作的相关逻辑即可。

  针对上面的这些问题,采用CQRS模式的系统都可以解决。由于CQRS模式中将查询和命令进行分析,所以使得两者分工明确,各自负责不同的部分,并且在业务上将命令和查询分离能够提高系统的性能和可扩展性。既然CQRS这么好,那是不是所有系统都应该基于CQRS模式去实现呢?显然不是的,CQRS也有其使用场景:

  1. 系统的业务逻辑比较复杂的情况下。因为本来业务逻辑就比较复杂了,如果再把命令操作和查询操作绑定同一个业务实体的话,这样会导致后期的需求变更难于进行扩展下去。
  2. 需要对系统中查询性能和写入性能分开进行优化的情况下,尤其读/写比例非常高的情况下。例如,在很多系统中读操作的请求数远大于写操作,此时,就可以考虑将写操作抽离出来进行单独扩展。
  3. 系统在将来随着时间不断变化的情况下。

  然而,CQRS也有其不适用的场景:

  • 业务逻辑比较简单的情况下,此时采用CQRS反而会把系统搞的复杂。
  • 系统用户访问量都比较小的情况下,并且需求以后不怎么会变更的情况下。针对这样的系统,完全可以用传统的实现方式快速将系统实现出来,没必要引入CQRS来增加系统的复杂度。

  在CQRS中,查询方面,直接通过方法查询数据库,然后通过DTO将数据返回,这个方面的操作相对比较简单。而命令方面,是通过发送具体Command,接着由CommandBus来分发到具体的CommandHandle来进行处理,CommandHandle在进行处理时,并没有直接将对象的状态保存到外部持久化结构中,而仅仅是从领域对象中获得产生的一系列领域事件,并将这些事件保存到Event Store中,同时将事件发布到事件总线Event Bus进行下一步处理;接着Event Bus同样进行协调,将具体的事件交给具体的Event Handle进行处理,最后Event Handler再把对象的状态保存到对应Query数据库中。

上面过程正是CQRS系统中的调用顺序。从中可以发现,采用CQRS实现的系统存在两个数据库实例,一个是Event Store,该数据库实例用来保存领域对象中发生的一系列的领域事件,简单来说就是保存领域事件的数据库。另一个是Query Database,该数据库就是存储具体的领域对象数据的,查询操作可以直接对该数据库进行查询。由于,我们在Event Store中记录领域对象发生的所有事件,这样我们就可以通过查询该数据库实例来获得领域对象之前的所有状态了。所谓Event Sourcing,就是指的的是:通过事件追溯对象的起源,它允许通过记录下来的事件,将领域模型恢复到之前的任意一个时间点。

  通过Event来记录领域对象所发生的所有状态,这样利用系统的跟踪并能够方便地回滚到某一历史状态。经过上面的描述,感觉事件溯源一般用于系统的维护。例如,我们可以设计一个同步服务,该服务程序从Event Store数据库查询出领域对象的历史数据,从而打印生成一个历史报表,如历史价格报表等。但正是的CQRS系统中如何使用Event Sourcing的呢?

  在前面介绍CQRS系统的调用顺序中,我们讲到,由Event Handler将对象的状态保存到对应的Query数据库中,这里有一个问题,对象的状态怎么获得呢?对象状态的获得正是由Event sourcing机制来获得,因为用户发送的仅仅是Command,Command中并不包含对象的状态数据,所以此时需要通过Event Sourcing机制来查询Event Store来还原对象的状态,还原根据就是对应的Id,该Id是通过命令传入的。Event Sourcing的调用需要放在CommandHandle中,因为CommandHandle需要先获得领域对象,这样才能把领域对象与命令对象来进行对比,从而获得领域对象中产生的一系列领域事件。

   然而,当随着时间的推移,领域事件变得越来越多时,通过Event Sourcing机制来还原对象状态的过程会非常耗时,因为每一次都需要从最早发生的事件开始。那有没有好的一个方式来解决这个问题呢?答案是肯定的,即在Event Sourcing中引入快照(Snapshots)实现。实现原理就是——没产生N个领域事件,则对对象做一次快照。这样,领域对象溯源的时候,可以先从快照中获得最近一次的快照,然后再逐个应用快照之后所有产生的领域事件,而不需要每次溯源都从最开始的事件开始对对象重建,这样就大大加快了对象重建的过程。

  前面介绍了那么多CQRS的内容,下面就具体通过一个例子来演示下CQRS系统的实现。

  命令部分的实现

复制代码

复制代码

// 应用程序初始化操作,将依赖的对象通过依赖注入框架StructureMap进行注入
public sealed class ServiceLocator
{
    private static readonly ICommandBus \_commandBus;
    private static readonly IStorage \_queryStorage;
    private static readonly bool IsInitialized;
    private static readonly object LockThis = new object();
    
    static ServiceLocator()
    {
        if (!IsInitialized)
        {
            lock (LockThis)
            {
                // 依赖注入
                ContainerBootstrapper.BootstrapStructureMap();

                \_commandBus = ContainerBootstrapper.Container.GetInstance<ICommandBus>();
                \_queryStorage = ContainerBootstrapper.Container.GetInstance<IStorage>();
                IsInitialized = true;
            }
        }
    }

    public static ICommandBus CommandBus
    {
        get { return \_commandBus; }
    }

    public static IStorage QueryStorage
    {
        get { return \_queryStorage; }
    }
}

class ContainerBootstrapper
{
    private static Container \_container;
    public static void BootstrapStructureMap()
    {
        \_container = new Container(x =>
        {
            x.For(typeof (IDomainRepository<>)).Singleton().Use(typeof (DomainRepository<>));
            x.For<IEventStorage>().Singleton().Use<InMemoryEventStorage>();
            x.For<IEventBus>().Use<EventBus>();
            x.For<ICommandBus>().Use<CommandBus>();
            x.For<IStorage>().Use<InMemoryStorage>();
            x.For<IEventHandlerFactory>().Use<StructureMapEventHandlerFactory>();
            x.For<ICommandHandlerFactory>().Use<StructureMapCommandHandlerFactory>();
        });
    }

    public static Container Container 
    {
        get { return \_container;}
    }
}

public class HomeController : Controller
{
[HttpPost]
public ActionResult Add(DiaryItemDto item)
{
// 发布CreateItemCommand到CommandBus中
ServiceLocator.CommandBus.Send(new CreateItemCommand(Guid.NewGuid(), item.Title, item.Description, -1, item.From, item.To));

        return RedirectToAction("Index");
    }    
}

// CommandBus 的实现
public class CommandBus : ICommandBus
{
private readonly ICommandHandlerFactory _commandHandlerFactory;

    public CommandBus(ICommandHandlerFactory commandHandlerFactory)
    {
        \_commandHandlerFactory = commandHandlerFactory;
    }

    public void Send<T>(T command) where T : Command
    {
        // 获得对应的CommandHandle来对命令进行处理
        var handlers = \_commandHandlerFactory.GetHandlers<T>();

        foreach (var handler in handlers)
        {
            // 处理命令
            handler.Execute(command);
        }
    }       
}

// 对CreateItemCommand处理类
public class CreateItemCommandHandler : ICommandHandler
{
private readonly IDomainRepository _domainRepository;

    public CreateItemCommandHandler(IDomainRepository<DiaryItem> domainRepository)
    {
        \_domainRepository = domainRepository;
    }

    // 具体处理逻辑
    public void Execute(CreateItemCommand command)
    {
        if (command == null)
        {
            throw new ArgumentNullException("command");
        }
        if (\_domainRepository == null)
        {
            throw new InvalidOperationException("domainRepository is not initialized.");
        }

        var aggregate = new DiaryItem(command.ID, command.Title, command.Description, command.From, command.To)
        {
            Version = -1
        };

        // 将对应的领域实体进行保存
        \_domainRepository.Save(aggregate, aggregate.Version);
    }
}

// IDomainRepository的实现类
public class DomainRepository : IDomainRepository where T : AggregateRoot, new()
{
// 并没有直接对领域实体进行保存,而是先保存领域事件进EventStore,然后在Publish事件到EventBus进行处理
// 然后EventBus把事件分配给对应的事件处理器进行处理,由事件处理器来把领域对象保存到QueryDatabase中
public void Save(AggregateRoot aggregate, int expectedVersion)
{
if (aggregate.GetUncommittedChanges().Any())
{
_storage.Save(aggregate);
}
}
}

// Event Store的实现,这里保存在内存中,通常是保存到具体的数据库中,如SQL Server、Mongodb等
public class InMemoryEventStorage : IEventStorage
{
// 领域事件的保存
public void Save(AggregateRoot aggregate)
{
// 获得对应领域实体未提交的事件
var uncommittedChanges = aggregate.GetUncommittedChanges();
var version = aggregate.Version;

        foreach (var @event in uncommittedChanges)
        {
            version++;
            // 没3个事件创建一次快照
            if (version > 2)
            {
                if (version % 3 == 0)
                {
                    var originator = (ISnapshotOrignator)aggregate;
                    var snapshot = originator.CreateSnapshot();
                    snapshot.Version = version;
                    SaveSnapshot(snapshot);
                }
            }

            @event.Version = version;
            // 保存事件到EventStore中
            \_events.Add(@event);
        }

        // 保存事件完成之后,再将该事件发布到EventBus 做进一步处理
        foreach (var @event in uncommittedChanges)
        {
            var desEvent = TypeConverter.ChangeTo(@event, @event.GetType());
            \_eventBus.Publish(desEvent);
        }
    }
}

// EventBus的实现
public class EventBus : IEventBus
{
private readonly IEventHandlerFactory _eventHandlerFactory;

    public EventBus(IEventHandlerFactory eventHandlerFactory)
    {
        \_eventHandlerFactory = eventHandlerFactory;
    }

    public void Publish<T>(T @event) where T : DomainEvent
    {
        // 获得对应的EventHandle来处理事件
        var handlers = \_eventHandlerFactory.GetHandlers<T>();
        foreach (var eventHandler in handlers)
        {
            // 对事件进行处理
            eventHandler.Handle(@event);
        }
    }
}

// DiaryItemCreatedEvent的事件处理类
public class DiaryIteamCreatedEventHandler : IEventHandler
{
private readonly IStorage _storage;

    public DiaryIteamCreatedEventHandler(IStorage storage)
    {
        \_storage = storage;
    }

    public void Handle(DiaryItemCreatedEvent @event)
    {
        var item = new DiaryItemDto()
        {
            Id = @event.SourceId,
            Description = @event.Description,
            From = @event.From,
            Title = @event.Title,
            To = @event.To,
            Version = @event.Version
        };

        // 将领域对象持久化到QueryDatabase中
        \_storage.Add(item);
    }
}

复制代码

复制代码

  上面代码主要演示了Command部分的实现,从代码可以看出,首先我们需要通过ServiceLocator类来对依赖注入对象进行注入,然后UI层通过CommandBus把对应的命令发布到CommandBus中进行处理,命令总线再查找对应的CommandHandler来对命令进行处理,接着CommandHandler调用仓储类来保存领域对象对应的事件,保存事件成功后再将事件发布到事件总线中进行处理,然后由对应的事件处理程序将领域对象保存到QueryDatabase中。这样就完成了命令部分的操作,从中可以发现,命令部分的实现和CQRS系统中的系统结构图的处理过程是一样的。然而创建日志命令并没有涉及事件溯源操作,因为创建命令并需要重建领域对象,此时的领域对象是通过创建日志命令来获得的,但在修改和删除命令中涉及了事件溯源,因为此时需要根据命令对象的ID来重建领域对象。具体的实现可以参考源码。

  下面让我们再看看查询部分的实现。

  查询部分的实现代码:

复制代码

复制代码

public class HomeController : Controller
{
// 查询部分
public ActionResult Index()
{
// 直接获得QueryDatabase对象来查询所有日志
var model = ServiceLocator.QueryStorage.GetItems();
return View(model);
}
}

public class InMemoryStorage : IStorage
{
private static readonly List Items = new List();

    public DiaryItemDto GetById(Guid id)
    {
        return Items.FirstOrDefault(a => a.Id == id);
    }

    public void Add(DiaryItemDto item)
    {
        Items.Add(item);
    }

    public void Delete(Guid id)
    {
        Items.RemoveAll(i => i.Id == id);
    }

    public List<DiaryItemDto> GetItems()
    {
        return Items;
    }
}

复制代码

复制代码

  从上面代码可以看出,查询部分的代码实现相对比较简单,UI层直接通过QueryDatabase来查询领域对象,然后由UI层进行渲染出来显示。

  到此,一个简单的CQRS系统就完成了,然而在项目中,UI层并不会直接CommandBus和QueryDatabase进行引用,而是通过对应的CommandService和QueryService来进行协调,具体的系统结构如下图所示(只是在CommandBus和Query Database前加入了一个SOA的服务层来进行协调,这样有利于系统扩展,可以通过SOA服务来进行请求路由,将不同请求路由不同的系统中,这样会可以实现多个系统进行一个整合):

  关于该CQRS系统的演示效果,大家可以自行去Github或MSDN中进行下载,具体的下载地址将会本专题最后给出。

   到这里,本专题关于CQRS的介绍就结束了,并且本专题也是领域驱动设计系列的最后一篇了。本系列专题的内容主要是参考daxnet的ByteartRetail案例,由于daxnet在写这个案例的时候并没有一步一步介绍其创建过程,对于一些领域驱动的初学者来说,直接去学习这个案例未免会有点困难,导致学习兴趣降低,从而放弃领域驱动的学习。为了解决这些问题,所以,本人对ByteartRetail案例进行剖析,并参考该案例一步步实现自己的领域驱动案例OnlineStore。希望本系列可以帮助大家打开领域驱动的大门。

  由于现在NO-SQL在互联网行业的应用已经非常流行,以至于面试的时候经常会被问到你用过的非关系数据库有哪些?所以本人也不想Out,所以在最近2个月的时候学习了一些No-SQL的内容,所以,接下来,我将会开启一个NO-SQL系列,记录自己这段时间来学习NO-SQL的一些心得和体会。

  本专题所有源码下载:

  Github地址:https://github.com/lizhi5753186/CQRSDemo

   MSDN地址:https://code.msdn.microsoft.com/CQRS-1f05ebe5

     本文参考链接:

     http://www.codeproject.com/Articles/555855/Introduction-to-CQRS

     http://www.cnblogs.com/daxnet/archive/2010/08/02/1790299.html