So far, we’ve covered the CQRS/ES pattern, the Command object, and the Command Handler object. In our last post, we saw that the Command Handler needs a repository to retrieve an object and save it after changes have been made. In this post, I will go into the Repository.
The Interface
The Repository will be responsible for finding an aggregate and saving it. So our public interface is simple:
public interface IEventSourcedRepository<T> where T : IEventSourced
{
    T Find(Guid id);
    void Save(T eventSourced, string correlationId);
}
Let’s take this apart.
First, it’s a generic interface, but we’ll see later that we only need one implementation. The IEventSourced interface will also come back later.
Next, you can see we want a method to retrieve an aggregate by its Id. In my case, I’ve decided to use a Guid.
Finally, we need a method to save the aggregate. We give it the aggregate, but also a correlationId. What is this for?
As you may have noticed in the post on the Command object, a command sets its own Id by calling Guid.NewGuid(). In the post on the CommandHandler object, we then pass this Id to the repository. In later steps, we will be able to use this Id to see which events originated from the same command. To see how, we need to look at an implementation of our Repository.
Our repository will only read from and write to a single table. This may seem counter-intuitive, as we will have several different entities. For example, we might have a Chat entity, but also a User entity. Still, we will store them all in one table.
This is because we won’t be storing the current state of an entity like we would in a traditional CRUD application. We will only be storing events, i.e. things that have happened to our entity.
In the case of a Chat aggregate, this could be a series of events like:
- ChatCreated
- ChatMessageAdded
- ChatMessageEdited
- ChatMessageAdded
- ChatMessageDeleted
- etc
Using these events, we will be able to reconstruct the aggregate to its most recent state. This is called “rehydrating” an entity.
So, when we save an entity, we store the events that have happened to it. When we retrieve an entity, we create a new object and re-apply the events to it. This will become clearer in a future post on the domain object.
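To make rehydration a bit more concrete before we get to the real domain object, here is a minimal sketch. The event records and the shape of the Chat class are assumptions made up for this illustration (they are not the IVersionedEvent types used later in the series); only the idea of replaying events in order comes from the text above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event types, named after the list above.
public abstract record ChatEvent(int Version);
public record ChatCreated(int Version) : ChatEvent(Version);
public record ChatMessageAdded(int Version, Guid MessageId, string Text) : ChatEvent(Version);
public record ChatMessageEdited(int Version, Guid MessageId, string Text) : ChatEvent(Version);
public record ChatMessageDeleted(int Version, Guid MessageId) : ChatEvent(Version);

// Hypothetical aggregate: its state is derived purely from its events.
public class Chat
{
    private readonly Dictionary<Guid, string> _messages = new();

    public Guid Id { get; }
    public int Version { get; private set; }
    public IReadOnlyDictionary<Guid, string> Messages => _messages;

    // Rehydrate: start from an empty object and replay the history in order.
    public Chat(Guid id, IEnumerable<ChatEvent> history)
    {
        Id = id;
        foreach (var e in history)
        {
            Apply(e);
        }
    }

    private void Apply(ChatEvent e)
    {
        switch (e)
        {
            case ChatCreated:
                break; // nothing to do, a new chat starts out empty
            case ChatMessageAdded added:
                _messages[added.MessageId] = added.Text;
                break;
            case ChatMessageEdited edited:
                _messages[edited.MessageId] = edited.Text;
                break;
            case ChatMessageDeleted deleted:
                _messages.Remove(deleted.MessageId);
                break;
        }
        // The aggregate is now at the version of the last applied event.
        Version = e.Version;
    }
}
```

Replaying a created, added, edited, added, deleted sequence like the one above through this constructor leaves the chat with only the edited first message, at the version of the last event.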
The Implementation
Let’s look at an actual implementation of such a repository.
Saving
First, the Save method:
public void Save(T eventSourced, string correlationId)
{
    var events = eventSourced
        .Events
        .Select(x => Serialize(x, correlationId))
        .ToList();
    events.ForEach(e =>
    {
        _dbContext.Events.Add(e);
    });
    eventSourced.Events.ToList().ForEach(e =>
    {
        _eventBus.Publish(e, correlationId);
    });
    _dbContext.SaveChanges();
}
The steps we are doing here are as follows:
- get the unpersisted events from the aggregate
- add them to the DbContext (in case of Entity Framework)
- publish each event so the Event Handlers can do their work (more on that later)
- save all changes that have been made to the DbContext
Notice there is a Serialize method there too. This is what it looks like in my implementation:
private EventRecord Serialize(IVersionedEvent e, string correlationId)
{
    var versionedEventRecord = new EventRecord
    {
        AggregateId = e.SourceId,
        AggregateType = typeof(T).Name,
        Version = e.Version,
        Payload = JsonConvert.SerializeObject(e, _jsonSerializerSettings),
        CorrelationId = correlationId,
        Timestamp = DateTime.UtcNow
    };
    return versionedEventRecord;
}
I’ll get to the IVersionedEvent in a future post. For now, imagine it contains some information on what happened. What’s important here is that we’re saving a record to a database table that contains the event (in the Payload property) and some metadata. Here’s the overview:
- AggregateId: the id of the entity the event originated from
- AggregateType: the type of the entity the event originated from
- Version: the version of the event (more on that later, it’s not the version of our source code, but rather a counter that points to the version of our entity)
- Payload: the event serialized to JSON (the settings include JSON.NET specifics like TypeNameHandling.All and FormatterAssemblyStyle.Simple)
- CorrelationId: as mentioned above, the id to correlate events to each other
- Timestamp: a simple timestamp to know when the event happened
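As a rough sketch, the record behind these properties could look like the class below. The property types (an int for Version, for example) are my assumption, and a real table would probably also have its own primary key. The CausedBy helper is purely hypothetical; it is only there to show what the CorrelationId buys us: finding every event that originated from one command, across aggregates.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed shape of one row in the events table, based on the properties above.
public class EventRecord
{
    public Guid AggregateId { get; set; }
    public string AggregateType { get; set; } = "";
    public int Version { get; set; }
    public string Payload { get; set; } = "";
    public string CorrelationId { get; set; } = "";
    public DateTime Timestamp { get; set; }
}

public static class EventRecordQueries
{
    // Hypothetical helper: all events that were caused by the same command,
    // regardless of which aggregate they belong to.
    public static List<EventRecord> CausedBy(
        IEnumerable<EventRecord> events, string correlationId)
    {
        return events
            .Where(e => e.CorrelationId == correlationId)
            .OrderBy(e => e.Timestamp)
            .ThenBy(e => e.Version)
            .ToList();
    }
}
```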
Retrieving
Now that we know how we save the events, how do we retrieve an entity from the database? Keep in mind that we have one table with all events of all aggregates.
This is my implementation:
public T Find(Guid id)
{
    var versionedEventRecords = _dbContext
        .Events
        .Where(e => e.AggregateType == typeof(T).Name
            && e.AggregateId == id)
        .OrderBy(e => e.Version)
        .Select(Deserialize)
        .ToList();
    if (versionedEventRecords.Any())
    {
        return _entityFactory.Invoke(id, versionedEventRecords);
    }
    return null;
}
Again, let’s go over the details.
First, I find all event records for the aggregate of the given type, with the given id.
Then I order them by version. This is important, so that we have the events in the order they happened. This means every event of an aggregate should have a unique, incrementing version number (even if two events happen to share the same Timestamp). More on that later, when we discuss the Domain Object.
Next, I deserialize the records. Remember how we stored more than just the event in our database. We included some metadata too. To deserialize the record and only have our actual events, I use this piece of code (notice how I use the same JSON.NET settings):
private IVersionedEvent Deserialize(EventRecord e)
{
    var deserializeObject = JsonConvert.DeserializeObject<IVersionedEvent>(e.Payload, _jsonSerializerSettings);
    return deserializeObject;
}
The last step in our Find method is to create an entity and provide it with the deserialized events. I’ve done this with a factory method that is instantiated in the constructor of our repository like this:
var constructor = typeof(T).GetConstructor(new[] { typeof(Guid), typeof(IEnumerable<IVersionedEvent>) });
if (constructor == null)
{
    throw new InvalidCastException($"Type {typeof(T)} must have a constructor with the following signature: .ctor(Guid, IEnumerable<IVersionedEvent>)");
}
_entityFactory = (id, events) => (T)constructor.Invoke(new object[] { id, events });
Remember that we made our repository generic and only need one implementation. This is why. In short, when we (or our DI container) create a Repository<Chat>, it will look for a constructor in the Chat class that takes a Guid and an IEnumerable of IVersionedEvent objects. When we call the Find method on our repository, it will invoke this constructor with the id and the deserialized events.
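If you want to play with this constructor lookup in isolation, the sketch below pulls it out of the repository. Everything here is a stand-in for illustration: the IVersionedEvent interface is reduced to a Version property, SomethingHappened and User are made-up types, and EntityFactory is a hypothetical helper rather than a class from this series. I also throw an InvalidOperationException here, which is my own choice for the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for the real interface, which a later post covers.
public interface IVersionedEvent
{
    int Version { get; }
}

// Made-up event, just so we have something to pass in.
public record SomethingHappened(int Version) : IVersionedEvent;

// Made-up aggregate with the constructor signature the repository expects.
public class User
{
    public Guid Id { get; }
    public int Version { get; }

    public User(Guid id, IEnumerable<IVersionedEvent> history)
    {
        Id = id;
        // A real aggregate would apply each event; here we only track the version.
        Version = history.Select(e => e.Version).DefaultIfEmpty(0).Max();
    }
}

public static class EntityFactory
{
    // Builds a delegate that creates a T from an id and its event history,
    // using reflection to find the (Guid, IEnumerable<IVersionedEvent>) constructor.
    public static Func<Guid, IEnumerable<IVersionedEvent>, T> For<T>()
    {
        var constructor = typeof(T).GetConstructor(
            new[] { typeof(Guid), typeof(IEnumerable<IVersionedEvent>) });
        if (constructor == null)
        {
            throw new InvalidOperationException(
                $"Type {typeof(T)} must have a constructor with the following signature: .ctor(Guid, IEnumerable<IVersionedEvent>)");
        }
        return (id, events) => (T)constructor.Invoke(new object[] { id, events });
    }
}
```

Calling EntityFactory.For<User>() gives back a delegate that can be invoked with an id and a list of events, while asking for a type without the matching public constructor fails right away instead of on the first Find.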
How the Domain Object then rehydrates itself to the latest state is the subject of our next post.
The Repository was our first complex class in this series. The Command and Command Handlers were fairly simple. The Repository is there to take the events from the Domain Object and store them in a database. It should do so in such a way that it can later retrieve the relevant events and pass them to the Domain Object again.