DDD – Aggregate Roots and Domain Events publication

Implementing DDD in a traditional object-oriented style *, when an action over an Aggregate succeeds the state of the aggregate changes and one or multiple domain events are published.
A Domain Event represents something that has happened in the domain. In the other hand if the action cannot be fulfilled a Domain Error is thrown as an exception by the aggregate.

Publishing Domain Events from an Aggregate
The question that arises here is Should the aggregates publish events directly?. We can find a multitude of articles and examples where the Aggregate implementation does exactly that. The very same Vaughn Vernon’s IDDD book shows this approach (page 300):

public class BacklogItem extends ConcurrencySafeEntity {
  ...
  public void commitTo(Sprint aSprint) {
  ...
    DomainEventPublisher
      .instance()
      .publish(new BacklogItemCommitted(
         this.tenantId(),
         this.backlogItemId(),
         this.sprintId()));
    }
  ...
}

the BacklogItem Aggregate creates and dispatches an event directly using a DomainEventPublisher instance before the database transaction is commited.

This approach can become quite complex to implement right, because of the transactional concerns. Think about it, if the aggregate publishes events directly, what happens if later on the persistence operation fails?
Unit testing of aggregates gets also complicated, the aggregate is coupled to a DomainEventPublisher that it instantiates directly. At the same time, we know that we should avoid dependency injection into an Aggregate.

An alternative approach is to use a Two-steps event publication

  • 1st) Aggregate root creates and registers domain events for publication.
  • 2nd) When the application Service persists the changes to the aggregate (calling the save method in a Repository) the infrastructure takes care of publishing the registered events only if the save transaction succeeds.

Let’s see this in action with some code. The domain example is for a Runbook which tracks Tasks, and the use case is assigning a task to an operator.

– Service retrieves and acts on the aggregate.

// RunbookService
fun assignTask(c: AssignTask) {
    val runbook = runbookRepository.findById(c.runbookId)

    runbook.assignTask(c.taskId, c.assigneeId, c.userId) // act on the aggregate
    ...

– Aggregate changes its state and then creates and registers domain events.

// Runbook
fun assignTask(taskId: String, assigneeId: String, userId: String) {
    validateIsOwner(userId)

    getTask(taskId).assign(assigneeId)

    registerEvent(TaskAssigned(runbookId, taskId, assigneeId)) // create and register event
}

Notice that Domain Events are only registered by the aggregate root because it is the one ensuring the integrity of the aggregate as a whole.

The way that the aggregate “registers” events is keeping track of them internally as a list of unpublished domain events.

...
@Transient
private val domainEvents = ArrayList()

protected fun registerEvent(event: EventType) {
    this.domainEvents.add(event)
}
...

– Service calls repository to save changes
After calling the assignTask action on the aggregate the service saves the changes (the “changed” aggregate) using the repository.

// RunbookService
fun assignTask(c: AssignTask) {
    val runbook = runbookRepository.findById(c.runbookId)

    runbook.assignTask(c.taskId, c.assigneeId, c.userId)

    runbookRepository.save(runbook) // save aggregate
}

– Infrastructure publishes all the aggregate unpublished domain events.
This can be done directly with a custom repository implementation and some form of internal pub-subscribe mechanism or with the help of a framework. Later on, I describe how to do this for repositories based on Spring Data.

Unit Testing
Unit testing of the aggregate remains simple with this design, there is no need to mock an event publisher or anything else. Asserting that the aggregate created and registered the right events can be done directly checking the aggregate domain events collection.

    @Test
    fun `can assign task`() {
        val runbook = Runbook(RUNBOOK_ID, RUNBOOK_NAME, OWNER_ID)
        runbook.addTask(TASK_ID, TASK_NAME, TASK_DESCRIPTION)

        runbook.assignTask(TASK_ID, TASK_ASSIGNEE_ID, OWNER_ID)

        assertThat(runbook.domainEvents()).contains(TaskAssigned(RUNBOOK_ID, TASK_ID, TASK_ASSIGNEE_ID))
    }

Domain Events
Here is an example of how the Domain Events for an aggregate look using Kotlin sealed classes.

sealed class RunbookEvent

data class TaskAdded(
        val runbookId: String,
        val taskId: String) : RunbookEvent()

data class TaskAssigned(
        val runbookId: String,
        val taskId: String,
        val assigneeId: String) : RunbookEvent()
...

Using Spring Data infrastructure to Publishing Events from Aggregate Roots
Spring Data repositories have support for aggregate domain events publication. On a call to a repository save() method, Spring will look into the passed aggregate for any unpublish domain events and will automatically publish them via its ApplicationEventPublisher API.

To expose the events the aggregate root needs to provide an accessor method annotated with the @DomainEvents annotation.

...
@Transient
private val domainEvents = ArrayList()
 
@DomainEvents
fun domainEvents(): Collection<EventType> {
    return Collections.unmodifiableList<EventType>(domainEvents)
}
...

Aggregate Root base class
We can reuse across all aggregates the same code for registering events placing it in an AggregateRoot base class.

abstract class AggregateRoot<EventType> {

    @Transient
    private val domainEvents = ArrayList<EventType>()

    /**
     * All domain events currently captured by the aggregate.
     */
    @DomainEvents
    fun domainEvents(): Collection<EventType> {
        return Collections.unmodifiableList<EventType>(domainEvents)
    }

    /**
     * Registers the given event for publication.
     */
    protected fun registerEvent(event: EventType): EventType {
        this.domainEvents.add(event)
        return event
    }

    /**
     * Clears all domain events currently held. Invoked by the infrastructure in place in Spring Data
     * repositories.
     */
    @AfterDomainEventPublication
    protected fun clearDomainEvents() {
        this.domainEvents.clear()
    }
}

Conclusion
Domain Events make explicit what changes within a domain. Use them not only to have a richer model but also a better design, implementation and tests. The different parts of the system can be decoupled much easily and prevent us from having large application services or aggregates.
It also simplifies future integration with other systems, translation from Domain Events to messaging Integration Events is trivial.

References
– Implementing Domain Driven Design, Vaughn Vernon.
– A better domain events pattern, Jimmy Bogard. https://lostechies.com/jimmybogard/2014/05/13/a-better-domain-events-pattern
– Spring – Domain event publication from aggregate roots, Mark Paluch. https://spring.io/blog/2017/01/30/what-s-new-in-spring-data-release-ingalls#domain-event-publication-from-aggregate-roots
– Domain Events vs. Integration Events in Domain-Driven Design and microservices architectures, Cesar de la Torre
https://blogs.msdn.microsoft.com/cesardelatorre/2017/02/07/domain-events-vs-integration-events-in-domain-driven-design-and-microservices-architectures/
– Domain events: design and implementation. https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/microservice-ddd-cqrs-patterns/domain-events-design-implementation

1 – In a functional DDD style the aggregates: do not mutate state; do not publish or register events; do not throw exceptions. Instead, they return a result type containing the new version of the aggregate and domain events or in case of failure, the result contains the domain errors.

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.