More and more projects are using alternate forms of storage to persist the various kinds of data that an application needs to handle. Of course relational databases still have their own place in this stack - the difference is that people today are no longer dogmatic about their use, and are not using the RDBMS as the universal hammer for every nail that they see in the application.
Consider an application that needs durability for transactional data structures. I want to model a transactional banking system, with basic debit and credit operations, using a message based model. But the operations have to be persistent. The balance needs to be durable and all transactions need to be persisted on disk. It doesn't matter what structures you store underneath - all I need is some key/value interface that allows me to store the transactions and the balances keyed by the transaction id. I don't even need to care what form of storage I use at the backend. It can be any database, any key-value store, Terracotta or anything. Will you give me the flexibility to make the storage pluggable? Well, that's a bonus!
Enter Akka .. and its pluggable persistence layer that you can nicely marry to its message passing actor based interface. Consider the following messages for processing debit/credit operations ..
case class Balance(accountNo: String)
case class Debit(accountNo: String, amount: BigInt, failer: Actor)
case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: Actor)
case class Credit(accountNo: String, amount: BigInt)
case object LogSize
In the above messages, the failer actor is used to report failures in case the debit fails. We also want all of the above operations to be transactional, which we can specify declaratively in Akka. Here's the basic actor definition ..
class BankAccountActor extends Actor {
  makeTransactionRequired

  private val accountState =
    TransactionalState.newPersistentMap(MongoStorageConfig())
  private val txnLog =
    TransactionalState.newPersistentVector(MongoStorageConfig())
  //..
}
makeTransactionRequired makes the actor transactional.
accountState is a persistent map that plugs in to a MongoDB based storage, as is evident from the config parameter. In a real life application, this will be further abstracted into a configuration file. Earlier I had blogged about the implementation of the MongoDB layer for Akka persistence.
accountState offers the key/value interface that will be used by the actor to maintain the durable snapshot of all balances.
txnLog is a persistent vector, once again backed by MongoDB storage, and stores all the transaction logs that occur in the system.
Let us now look at the part of the actor that receives messages and processes the debit/credit operations ..
class BankAccountActor extends Actor {
  makeTransactionRequired

  private val accountState =
    TransactionalState.newPersistentMap(MongoStorageConfig())
  private val txnLog =
    TransactionalState.newPersistentVector(MongoStorageConfig())

  def receive: PartialFunction[Any, Unit] = {
    // check balance
    case Balance(accountNo) =>
      txnLog.add("Balance:" + accountNo)
      reply(accountState.get(accountNo).get)

    // debit amount: can fail
    case Debit(accountNo, amount, failer) =>
      txnLog.add("Debit:" + accountNo + " " + amount)
      val m: BigInt =
        accountState.get(accountNo) match {
          case None => 0
          case Some(v) => {
            val JsNumber(n) = v.asInstanceOf[JsValue]
            BigInt(n.toString)
          }
        }
      accountState.put(accountNo, (m - amount))
      if (amount > m)
        failer !! "Failure"
      reply(m - amount)
    //..
  }
}
Here we have the implementation of two messages - Balance reports the current balance and Debit does a debit operation on the balance.
Note that the interfaces that these implementations use are in no way dependent on the MongoDB specific APIs. Akka offers a uniform key/value API set across all supported persistent storage. And each of the above pattern matched message processing fragments offers transaction semantics. This is pluggability!
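To make the pluggability point concrete without depending on Akka itself, here is a minimal sketch in plain Scala. The names KeyValueStore, InMemoryStore and AccountOps are hypothetical and are not part of Akka's API; the only idea being shown is that the account logic is written against a small key/value trait, so the backend can be swapped without touching that logic.

```scala
import scala.collection.mutable

// Hypothetical storage-agnostic interface (an assumption, not Akka's actual API).
trait KeyValueStore[K, V] {
  def get(key: K): Option[V]
  def put(key: K, value: V): Unit
}

// One possible backend: a plain in-memory map. A class backed by a real
// database would implement the same two methods.
class InMemoryStore[K, V] extends KeyValueStore[K, V] {
  private val data = mutable.Map.empty[K, V]
  def get(key: K): Option[V] = data.get(key)
  def put(key: K, value: V): Unit = data.update(key, value)
}

object AccountOps {
  // Account logic sees only the trait, never the concrete backend.
  // Returns the new balance; a missing account is treated as balance 0.
  def credit(store: KeyValueStore[String, BigInt],
             accountNo: String,
             amount: BigInt): BigInt = {
    val updated = store.get(accountNo).getOrElse(BigInt(0)) + amount
    store.put(accountNo, updated)
    updated
  }
}
```

Swapping the storage then amounts to passing a different KeyValueStore instance to AccountOps.credit, which is roughly the role the config parameter plays in the actor above.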
Credit looks very similar to Debit. However, a more interesting use case is the MultiDebit operation that offers a transactional interface. Just like your relational database's ACID semantics, the transactional semantics of Akka offers atomicity over this message. Either the whole MultiDebit will pass or it will be rolled back.
class BankAccountActor extends Actor {
  //..
  def receive: PartialFunction[Any, Unit] = {
    // many debits: can fail
    // demonstrates true rollback even if multiple puts have been done
    case MultiDebit(accountNo, amounts, failer) =>
      txnLog.add("MultiDebit:" + accountNo + " " + amounts.map(_.intValue).foldLeft(0)(_ + _))
      val m: BigInt =
        accountState.get(accountNo) match {
          case None => 0
          case Some(v) => BigInt(v.asInstanceOf[String])
        }
      var bal: BigInt = 0
      amounts.foreach { amount =>
        bal = bal + amount
        accountState.put(accountNo, (m - bal))
      }
      if (bal > m) failer !! "Failure"
      reply(m - bal)
    //..
  }
}
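The all-or-nothing behaviour can also be illustrated outside Akka with a small plain-Scala sketch. This is not how Akka's transactions are implemented; RollbackSketch, multiDebit and the Either result are hypothetical names chosen for illustration. The writes go to a working copy, and the copy is handed back only if the final check passes, so a failure leaves the original balances as they were.

```scala
object RollbackSketch {
  type Balances = Map[String, BigInt]

  // Applies every debit to a working copy of the balances. The Map is
  // immutable, so the caller's value is never modified. As in the handler
  // above, all puts happen first and the total is checked afterwards.
  def multiDebit(balances: Balances,
                 accountNo: String,
                 amounts: List[BigInt]): Either[String, Balances] = {
    val opening = balances.getOrElse(accountNo, BigInt(0))
    var working = balances
    var total = BigInt(0)
    amounts.foreach { amount =>
      total = total + amount
      working = working.updated(accountNo, opening - total) // intermediate "put"
    }
    if (total > opening) Left("Failure") // working copy discarded: nothing committed
    else Right(working)                  // success: caller adopts the new map
  }
}
```

With an opening balance of 5000 and the amounts 500, 2000, 1000 and 3000, the total of 6500 exceeds the balance, so the sketch returns Left("Failure") and the original map still holds 5000.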
Now that we have the implementation in place, let's look at the test cases that exercise them ..
First a successful debit test case. Note how we have a separate failer actor that reports failure of operations to the caller.
@Test
def testSuccessfulDebit = {
  val bactor = new BankAccountActor
  bactor.start
  val failer = new PersistentFailerActor
  failer.start
  bactor !! Credit("a-123", 5000)
  bactor !! Debit("a-123", 3000, failer)
  val b = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n) = b
  assertEquals(BigInt(2000), BigInt(n.toString))

  bactor !! Credit("a-123", 7000)
  val b1 = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n1) = b1
  assertEquals(BigInt(9000), BigInt(n1.toString))

  bactor !! Debit("a-123", 8000, failer)
  val b2 = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n2) = b2
  assertEquals(BigInt(1000), BigInt(n2.toString))
  assertEquals(7, (bactor !! LogSize).get)
}
And now the interesting MultiDebit that illustrates the transaction rollback semantics ..
@Test
def testUnsuccessfulMultiDebit = {
  val bactor = new BankAccountActor
  bactor.start
  bactor !! Credit("a-123", 5000)
  val b = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n) = b
  assertEquals(BigInt(5000), BigInt(n.toString))

  val failer = new PersistentFailerActor
  failer.start
  try {
    bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
    fail("should throw exception")
  } catch { case e: RuntimeException => {}}

  val b1 = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
  val JsNumber(n1) = b1
  assertEquals(BigInt(5000), BigInt(n1.toString))

  // should not count the failed one
  assertEquals(3, (bactor !! LogSize).get)
}
In the snippet above, the balance remains at 5000 when the debit fails: the amounts passed to the MultiDebit message total 6500, which is more than the balance, so the puts already made for the earlier amounts are rolled back.

Relational databases will always remain for the use case that they serve best - persistence of data that needs a true relational model. NoSQL is gradually making its place in the application stack for the complementary set of use cases that need a much more loosely coupled model, a key/value database or a document oriented database. Apart from easier manageability, another big advantage of using these databases is that they do not need big ceremonious ORM layers between the application model and the data model. This is because what you see is what you store (WYSIWYS); there is no paradigm mismatch that needs to be bridged.
2 comments:
Interesting. But why isn't
case Some(v) => {
val JsNumber(n) = v.asInstanceOf[JsValue]
BigInt(n.toString)
}
reduced to
case Some(JsNumber(n)) =>
BigInt(n.toString)
?
Just stumbled across this while looking at Akka for STM. It looks like the hidden magic you're not showing here is that your 'failer' Actor is throwing a RuntimeException and that's what rolls back the transaction. Is that right? That doesn't seem very scala-ish. Isn't there a more explicit commit/rollback structure?