When we talk about a domain model, the Aggregate takes center stage. An aggregate is a core abstraction that represents the time-invariant part of the domain. It embodies all the states that the aggregate can be in throughout its lifecycle in the system. So it's extremely important that we take pains to distil the domain model and protect the aggregate from all unwanted external references. Maybe an example will make it clearer.
Keeping the Aggregate pure
Consider a Trade model as the aggregate. By Trade, I mean a security trade that takes place on a stock exchange, where counterparties exchange securities and currencies for settlement. If you're a regular reader of my blog, you must be aware of this, since this is almost exclusively the domain that I talk about in my blog posts.
A trade can be in various states like newly entered, value date added, enriched with tax and fee information, net trade value computed etc. In a trading application, as a trade passes through the processing pipeline, it moves from one state to another. The final state represents the complete Trade object which is ready to be settled between the counterparties.
In the traditional model of processing we have the final snapshot of the aggregate. What we don't have is the audit log of the actual state transitions that happened in response to the events. With event sourcing we record the state transitions as a pipeline of events which can be replayed at any time to roll back or roll forward to any state of our choice. Event sourcing is emerging as one of the potent ways to model a system, and lots of blog posts are being written that discuss the various architectural strategies for implementing an event sourced application.
That's ok. But whose responsibility is it to manage these state transitions and record the timeline of changes? It's definitely not the responsibility of the aggregate. The aggregate is supposed to be a pure abstraction. We must design it as an immutable object that can respond to events and transform itself into the new state. In fact, the aggregate implementation should not be aware of whether it's serving an event sourced architecture or not.
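To make the replay idea concrete, here is a minimal sketch in plain Scala. Everything in it (the `ReplaySketch` object, the simplified `Trade` fields, the `ValueDateSet` and `TaxFeeComputed` events) is a hypothetical stand-in and not part of the actual trading model. It only illustrates that if the aggregate is an immutable value with a pure transition function, replaying a log is just a left fold, and the aggregate never needs to know a log exists.

```scala
object ReplaySketch {
  // Hypothetical events; the names are illustrative only.
  sealed trait TradeEvent
  case class ValueDateSet(valueDate: String) extends TradeEvent
  case class TaxFeeComputed(amount: BigDecimal) extends TradeEvent

  // A deliberately simplified, immutable aggregate.
  case class Trade(refNo: String,
                   principal: BigDecimal,
                   valueDate: Option[String] = None,
                   taxFees: BigDecimal = BigDecimal(0)) {
    def netAmount: BigDecimal = principal + taxFees
  }

  // Pure transition: current state plus one event gives the next state.
  def applyEvent(trade: Trade, event: TradeEvent): Trade = event match {
    case ValueDateSet(d)   => trade.copy(valueDate = Some(d))
    case TaxFeeComputed(a) => trade.copy(taxFees = trade.taxFees + a)
  }

  // Replaying is a left fold of the recorded events over the initial state.
  def replay(initial: Trade, events: Seq[TradeEvent]): Trade =
    events.foldLeft(initial)(applyEvent)
}
```

Replaying only a prefix of the log, for example `replay(initial, events.take(n))`, gives the state as it was after the first `n` events, which is one simple way to think about rolling back or forward.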
There are various ways you can model the states of an aggregate. One option that's frequently used involves algebraic data types. Model the various states as a sum type of products. In Scala we do this as case classes ..
    sealed abstract class Trade {
      def account: Account
      def instrument: Instrument
      //..
    }

    case class NewTrade(..) extends Trade {
      //..
    }

    case class EnrichedTrade(..) extends Trade {
      //..
    }
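Since the listing above elides the fields, here is a small self-contained variant that compiles on its own. The fields (`refNo`, `principal`, `taxFees`) and the `enrich` and `describe` helpers are assumptions made purely for illustration. The point it tries to show is that with a sum type each state carries only the data valid for that state, and a transition is a function from one case class to another.

```scala
object TradeStates {
  // Hypothetical fields; the real model has accounts, instruments, etc.
  sealed trait Trade {
    def refNo: String
    def principal: BigDecimal
  }

  case class NewTrade(refNo: String, principal: BigDecimal) extends Trade {
    // A transition produces a value of the next state's type.
    def enrich(taxFees: BigDecimal): EnrichedTrade =
      EnrichedTrade(refNo, principal, taxFees)
  }

  case class EnrichedTrade(refNo: String,
                           principal: BigDecimal,
                           taxFees: BigDecimal) extends Trade {
    // Only an enriched trade has a net amount at all.
    def netAmount: BigDecimal = principal + taxFees
  }

  // Because Trade is sealed, the compiler can warn about unhandled states.
  def describe(t: Trade): String = t match {
    case NewTrade(ref, _) => s"$ref: new"
    case e: EnrichedTrade => s"${e.refNo}: enriched"
  }
}
```

With this encoding, asking a `NewTrade` for its net amount simply does not compile, which is the main attraction of modeling states as separate types.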
Another option is to have one data type model the Trade and to model states as immutable enumerations, with changes being effected on the aggregate as functional updates. There is no in-place mutation; instead we use functional data structures like zippers or type lenses to create the transformed object in the new state. Here's an example where we create an enriched trade out of a newly created one ..
    // closure that enriches a trade
    val enrichTrade: Trade => Trade = {trade =>
      val taxes = for {
        taxFeeIds    <- forTrade         // get the tax/fee ids for a trade
        taxFeeValues <- taxFeeCalculate  // calculate tax fee values
      }
      yield(taxFeeIds ∘ taxFeeValues)

      val t = taxFeeLens.set(trade, taxes(trade))
      netAmountLens.set(t, t.taxFees.map(_.foldl(principal(t))((a, b) => a + b._2)))
    }
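The snippet above relies on a lens library and on helpers defined elsewhere. For readers who have not used lenses, here is a rough, self-contained sketch with a hand-rolled `Lens` (a getter plus a copying setter). This is not the library's API, and the simplified `Trade`, the `TaxFees` alias and the `enrich` function are assumptions for illustration. It mirrors the last two lines of the closure above: set the tax/fee values, then derive the net amount from them.

```scala
object LensSketch {
  // A hand-rolled lens: NOT a library API, just a getter and a copying setter.
  case class Lens[A, B](get: A => B, set: (A, B) => A) {
    def modify(a: A)(f: B => B): A = set(a, f(get(a)))
  }

  type TaxFees = Option[List[(String, BigDecimal)]]

  // Hypothetical, simplified aggregate.
  case class Trade(refNo: String,
                   principal: BigDecimal,
                   taxFees: TaxFees = None,
                   netAmount: Option[BigDecimal] = None)

  val taxFeeLens: Lens[Trade, TaxFees] =
    Lens[Trade, TaxFees](_.taxFees, (t, tf) => t.copy(taxFees = tf))

  val netAmountLens: Lens[Trade, Option[BigDecimal]] =
    Lens[Trade, Option[BigDecimal]](_.netAmount, (t, n) => t.copy(netAmount = n))

  // Set the already-computed tax/fee values, then derive the net amount.
  // Each step returns a new Trade; the input is never mutated.
  def enrich(trade: Trade, taxes: List[(String, BigDecimal)]): Trade = {
    val t = taxFeeLens.set(trade, Some(taxes))
    netAmountLens.set(t, t.taxFees.map(_.foldLeft(t.principal)((a, b) => a + b._2)))
  }
}
```

For a flat case class like this one, `copy` alone would do; lenses start to pay off when the field being updated sits several levels deep.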
But then we come back to the same question: if the aggregate is distilled to model the core domain, who handles the events? Someone needs to model the event changes, effect the state transitions and take the aggregate from one state to the next.
Enter Finite State Machines
In one of my projects I used the domain service layer to do this. The domain logic for effecting the changes lies with the aggregate, but it is invoked from the domain service in response to events when the aggregate reaches specific states. In other words, I model the domain service as a finite state machine that manages the lifecycle of the aggregate.
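Stripped of any actor framework, the division of labour described above might look like the following sketch. All names here (`LifecycleSketch`, the events carrying payloads, `withValueDate`, `withTaxFees`, `step`) are hypothetical and chosen for illustration. The aggregate owns the domain logic, while the service owns the table of which event is legal in which state.

```scala
object LifecycleSketch {
  sealed trait TradeState
  case object Created extends TradeState
  case object ValueDateAdded extends TradeState
  case object Enriched extends TradeState

  // Hypothetical events carrying their own payload.
  sealed trait TradeEvent
  case class AddValueDate(date: String) extends TradeEvent
  case class EnrichTrade(taxFees: BigDecimal) extends TradeEvent

  // The aggregate: immutable, with its own domain logic, unaware of the FSM.
  case class Trade(refNo: String,
                   valueDate: Option[String] = None,
                   taxFees: Option[BigDecimal] = None) {
    def withValueDate(d: String): Trade = copy(valueDate = Some(d))
    def withTaxFees(a: BigDecimal): Trade = copy(taxFees = Some(a))
  }

  // The service side: which event is allowed in which state, and what it does.
  def step(state: TradeState, trade: Trade, event: TradeEvent)
      : Either[String, (TradeState, Trade)] =
    (state, event) match {
      case (Created, AddValueDate(d))       => Right((ValueDateAdded, trade.withValueDate(d)))
      case (ValueDateAdded, EnrichTrade(a)) => Right((Enriched, trade.withTaxFees(a)))
      case (s, e)                           => Left(s"event $e is not handled in state $s")
    }
}
```

A real service would also record each accepted event and notify interested parties at the point where `step` succeeds; the sketch leaves that out to keep the transition table visible.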
In our example a Trading Service can be modeled as an FSM that controls the lifecycle of a Trade, as the following snippet shows ..

    import akka.actor.{Actor, FSM}
    import TradeModel._

    class TradeLifecycle(trade: Trade, timeout: Duration, log: Option[EventLog])
      extends Actor with FSM[TradeState, Trade] {
      import FSM._

      // every trade starts its life in the Created state
      startWith(Created, trade)

      when(Created) {
        case Event(e@AddValueDate, data) =>
          // record the event if an event log has been configured
          log.foreach(_.appendAsync(data.refNo, Created, Some(data), e))
          val trd = addValueDate(data)
          notifyListeners(trd)
          goto(ValueDateAdded) using trd forMax(timeout)
      }

      when(ValueDateAdded) {
        case Event(StateTimeout, _) =>
          stay

        case Event(e@EnrichTrade, data) =>
          log.foreach(_.appendAsync(data.refNo, ValueDateAdded, None, e))
          val trd = enrichTrade(data)
          notifyListeners(trd)
          goto(Enriched) using trd forMax(timeout)
      }

      when(Enriched) {
        case Event(StateTimeout, _) =>
          stay

        case Event(e@SendOutContractNote, data) =>
          log.foreach(_.appendAsync(data.refNo, Enriched, None, e))
          // hand the completed trade back to the requester and terminate
          sender ! data
          stop
      }

      initialize
    }
The snippet above contains a lot of other details which I did not have time to prune. It's actually part of the implementation of an event sourced trading application that uses asynchronous messaging (actors) as the backbone for event logging and reaching out to multiple consumers based on the CQRS paradigm.
Note that the FSM model above makes very explicit the states that the Trade model can reach and the events that it handles while in each of these states. We can also use this FSM technique to log events (for event sourcing) and notify listeners about the events (CQRS) in a very declarative manner, as implemented above.

Let me know in the comments what your views are on this FSM approach to handling state transitions in domain models. I think it helps keep aggregates pure and helps design domain services that focus on serving specific aggregate roots.
I will be talking about similar stuff, Akka actor-based event sourcing implementations and functional domain models at PhillyETE 2012. Please drop by if this interests you.