Close

October 18, 2018

CQRS/ES In .NET: The Domain Object

If you haven’t read the previous posts in this series, you might want to check out:

In this post, I will look at the Domain Object. So far, we know we can create a Command, give it to the Command Handler, who will use the Repository to retrieve a Domain Object and call the necessary methods on it. After that, the Command Handler will tell the Repository to save the object. Let’s look into the implementation of the Domain Object.

The Public API

To continue using the customer example of my previous post, we could assume a Customer could have an API like this:

public class Customer
{
    public Customer(Guid id, string name) {}
    public void UpdateName(string newName) {}
    public Guid Id { get; }
    public string Name { get; }
}

We’ll leave the implemenation out for now, so it’s a very simple class: you can create the customer with a name, and you can update the name. Why not just use a public property with a getter and setter for the name? Well, you will see later that in a CQRS/ES system, it’s better to have every modification of the object pass through a method and keep the properties read-only.

A Basic Implementation

Now that we have the API of our entity, how do we make this event sourced? In short, this is what we want to happen when we call a method like UpdateName:

  • Create a new event with the new name
  • Call a private method on the object that will handle such an event
  • Store the event in a list of events that haven’t been persisted yet

This is what the UpdateName method would look like:

public void UpdateName(string name)
{
    Update(new NameUpdated { Name = name });
}

And this is what the method that handles this event would look like:

public void OnNameUpdated(NameUpdated nameUpdated)
{
    Name = nameUpdated.Name;
}

Why this complex? Why just not set the name in the UpdateName method? Well, first, we will be using the OnNameUpdated method in another scenario too: when we rebuild the object with past (and persisted) events. More on that later. But second, because in the Update method, we’ll do more than just call the OnNameUpdated method.

But before we look at the Update method, we need to cover some other concerns.

Race Conditions

What if two users perform an action at the same time? Two threads may start, both retrieving the object, but only one will persist the object first. The second thread could be operating on old data and perform some manipulation that is no longer valid. A classic race condition.

In this implementation of CQRS/ES, we solve this by adding a Version property to the entity and to the event. At first, an entity is at version -1. For every event that is applied to the entity, the version goes up. The event also gets that version number.

Let’s assume we have a Customer with these events:

EventEvent Version
CustomerCreated0
NameChanged1
NameChanged2

This Customer will have version number 2. If two more events are added after it (event 3 and 4) it will be at version 4.

This makes it easy to avoid concurrency issues. We can tell the database that the combination of AggregateId and Version of an event should be unique. When two users update the name simultaneously, only one of the events will be persisted. The other will throw an exception.

Source Id

I mentioned AggregateId previously. It’s important that we can link our persisted events to an entity or aggregate. That is why we tack on a SourceId property to our event.

This way, we can later retrieve all events for a given aggregate.

Bringing It Together

Let’s now look at the Update method:

private void Update(VersionedEvent e)
{
    e.SourceId = Id;
    e.Version = Version + 1;
    _handlers[e.GetType()].Invoke(e);
    Version = e.Version;
    _pendingEvents.Add(e);
}

As you can see, we’re setting the SourceId of the event to the Id of our Customer. Then, we set the Version to the Version of our customer + 1.

The next step is important. We want to invoke the correct handler. In the case of the UpdateName method, we want to invoke OnNameUpdated. But we might have multiple handlers. The way we solve this, is by having a dictionary with the event type as key and the handler as value. You’ll see how we build up that dictionary later.

When we’ve invoked the handler, our aggregate (i.e. customer) has now increased a version, so we set that. And finally, we add the event to a list of pending events, i.e. events that haven’t been persisted yet. These pending events can then be accessed by the repository and stored to a database.

Rehydrating

Once we have all our events in a database table, how do we load an aggregate from said database? This is called “rehydrating”. In short, you take the relevant events and apply them to a object of the correct class again. This is actually very simple.

Our repository called a constructor that was found using reflection. So all our aggregates have to provide a constructor that looks like this:

public Customer(Guid id, IEnumerable<IVersionedEvent> history) : this(id)
{
    foreach (var e in pastEvents)
    {
        _handlers[e.GetType()].Invoke(e);
        Version = e.Version
    }
}

See how we’re using that handler dictionary again? We’re calling the methods like OnNameUpdated again, and setting the version of our aggregate accordingly. After this is done, we end up with an aggregate that looked exactly like it was when we persisted the events. But instead of persisting every property into multiple columns, we only persisted the events, i.e. things that happened to our aggregate.

The Handler Dictionary

One last thing before we wrap up: how is the handler dictionary built? Again, this is very simple. I implemented it like this by having a method like this:

private readonly Dictionary<Type, Action<IVersionedEvent>> _handlers = new Dictionary<Type, Action<IVersionedEvent>>();

private void Handles<TEvent>(Action<TEvent> handler) where TEvent: IEvent
{
    _handlers.Add(typeof(TEvent), @event => handler((TEvent)@event));
}

This allows me to call this in the constructor of my object:

Handles<NameUpdated>(OnNameUpdated);

Meaning, that when a NameUpdated event must be handled, it will call the OnNameUpdated method.

Finishing This Puzzle

There are many pieces involved in the Domain object of a CQRS/ES system, but they’re actually all quite simple. Some pieces will return for every aggregate class you write, so these can be put in a base class. I’ll show you the base class I’ve used at a project and how the Customer class of our example uses it.

So here is the base class:

public abstract class EventSourced : IEventSourced
{
    private readonly Dictionary<Type, Action<IVersionedEvent>> _handlers = new Dictionary<Type, Action<IVersionedEvent>>();
    private readonly List<IVersionedEvent> _pendingEvents = new List<IVersionedEvent>();

    /// <summary>
    /// A protected constructor that every inheriting class must call
    /// </summary>
    protected EventSourced(Guid id)
    {
        Id = id;
    }

    public Guid Id { get; }

    /// <summary>
    /// Gets the entity's version. As the entity is being updated and events being generated, the _version is incremented.
    /// </summary>
    public int Version { get; protected set; } = -1;

    /// <summary>
    /// Gets the collection of new events since the entity was loaded, as a consequence of command handling.
    /// </summary>
    public IEnumerable<IVersionedEvent> Events => _pendingEvents;

    /// <summary>
    /// Configures a handler for an event. 
    /// </summary>
    protected void Handles<TEvent>(Action<TEvent> handler)
        where TEvent : IEvent
    {
        _handlers.Add(typeof(TEvent), @event => handler((TEvent)@event));
    }

    /// <summary>
    /// Allows inheriting classes to rehydrate themselves
    /// </summary>
    protected void LoadFrom(IEnumerable<IVersionedEvent> pastEvents)
    {
        foreach (var e in pastEvents)
        {
            _handlers[e.GetType()].Invoke(e);
            Version = e.Version;
        }
    }

    /// <summary>
    /// Allows inheriting classes to update themselves using events.
    /// </summary>
    protected void Update(VersionedEvent e)
    {
        e.SourceId = Id;
        e.Version = Version + 1;
        _handlers[e.GetType()].Invoke(e);
        Version = e.Version;
        _pendingEvents.Add(e);
    }
}

And this would be our customer class:

public class Customer : EventSourced 
{
    /// <summary>
    /// A private constructor that our public constructors call.
    /// This constructor sets up the handlers.
    /// </summary>
    private Customer(Guid id): base(id)
    {
        Handles<CustomerCreated>(OnCreated);
        Handles<NameUpdated>(OnNameUpdated);
    }

    public Customer(Guid id, string name) : base(id) 
    {
        // Do nothing but add a new CustomerCreated event that contains 
        // the provided name.
        Update(new CustomerCreated(name));
    }

    public Customer(Guid id, IEnumerable<IVersionedEvent> history) : this(id) 
    {
        // Call the method in our base class that rehydrates
        LoadFrom(history);
    }

    public string Name { get; private set; }

    public void UpdateName(string newName)
    {
        Update(new NameUpdated(newName));
    }

    private void OnCreated(CustomerCreated e) {
        Name = e.Name;
    }

    private void OnNameUpdated(NameUpdated e)
    {
        Name = e.NewName;
    }
}

Once you get the base class that provides all the plumbing out of the way, the aggregate classes look quite clean. They also allow you to use method names that match with the actions that your end user is taking. This is one of the reasons that CQRS/ES matches so nicely with Domain Driven Design.

This has proven to be quite lengthy post, but that’s due to the nature of the Domain object. And its importance! It’s where everything comes together in CQRS/ES.

I promise the next post will be shorter. Let’s recap what we’ve covered so far:

  • when we receive a trigger from the outside (a UI, API call, etc), we can create a Command object, a simple DTO containing the necessary data
  • that Command is given to a Command Handler, a class that can handle on or more commands, i.e. knows what aggregate to retrieve and what method to call on that aggregate
  • the Command Handler will use a Repository to retrieve a Domain Object (rehydrating it) and also save it back to the database (which means just storing any pending events)

Once we store these pending events, we can use them to update our read model. Because we don’t want to rehydrate our aggregates for every read. That would be slow, and we might need multiple aggregates to get a read model that is useful. Updating our read model is done in event handlers and is the topic for our next post.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.