
Wednesday, July 24, 2013

Scala Redis client goes non blocking : uses Akka IO

scala-redis is getting a new non blocking version, based on a kernel implemented with the new Akka IO. The result is that all APIs are non blocking and return a Future. We are trying to keep the API as close to the blocking version as possible, but bits rot and some of the technical debt needs to be repaid. We have cleaned up some return types that had unnecessary Option[] wrappers, made some improvements and standardizations to the API type signatures, and are also working on making the byte array manipulation faster using akka.util.ByteString at the implementation level. We also plan to use the Akka IO pipeline to abstract the various stages of handling the Redis protocol request and response.

As of today we have quite a bit ready for review by users. The APIs may change a bit here and there, but the core ones are in place. A few areas, like PubSub and clustering, have not yet been implemented. Stay tuned for more updates on this blog .. Here are a few code snippets that demonstrate the usage of the APIs ..
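
All the snippets assume a client instance (and the usual implicits) already in scope. A minimal setup would look roughly like the following; this is an illustrative sketch, and the exact construction and imports may differ in the repo:

// illustrative setup only -- check the scala-redis repo for the exact API
import akka.actor.ActorSystem
import akka.util.Timeout
import scala.concurrent.duration._
import com.redis.RedisClient

implicit val system = ActorSystem("redis-client")
implicit val executionContext = system.dispatcher
implicit val timeout = Timeout(5 seconds)

val client = RedisClient("localhost", 6379)  // assumed host/port constructor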

Non blocking get/set

@volatile var callbackExecuted = false

// ten keys with values 1 to 10
val ks = (1 to 10).map(i => s"client_key_$i")
val kvs = ks.zip(1 to 10)

// each set returns immediately with a Future[Boolean]
val sets: Seq[Future[Boolean]] = kvs map {
  case (k, v) => client.set(k, v)
}

// the callback fires only when all the set futures have completed
val setResult = Future.sequence(sets) map { r: Seq[Boolean] =>
  callbackExecuted = true
  r
}

callbackExecuted should be (false)
setResult.futureValue should contain only (true)
callbackExecuted should be (true)

callbackExecuted = false

// non blocking gets: the callback sums the values (1 + 2 + .. + 10 = 55)
val gets: Seq[Future[Option[Long]]] = ks.map { k => client.get[Long](k) }
val getResult = Future.sequence(gets).map { rs =>
  callbackExecuted = true
  rs.flatten.sum
}

callbackExecuted should be (false)
getResult.futureValue should equal (55)
callbackExecuted should be (true)

Composing through sequential combinators

val key = "client_key_seq"
val values = (1 to 100).toList
val pushResult = client.lpush(key, 0, values:_*)
val getResult = client.lrange[Long](key, 0, -1)

// compose the two futures: the pair is yielded only if the push reports a positive count
val res = for {
  p <- pushResult.mapTo[Long]
  if p > 0
  r <- getResult.mapTo[List[Long]]
} yield (p, r)

val (count, list) = res.futureValue
count should equal (101)
list.reverse should equal (0 to 100)

Error handling using Promise Failure

val key = "client_err"
val v = client.set(key, "value200")
v.futureValue should be (true)

// lpush against a key holding a plain string value fails on the server; the
// failure propagates through the Future and futureValue reports it as a
// TestFailedException wrapping the original Redis error
val x = client.lpush(key, 1200)
val thrown = evaluating { x.futureValue } should produce [TestFailedException]
thrown.getCause.getMessage should equal ("ERR Operation against a key holding the wrong kind of value")
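
Outside of a test, the same failure shows up directly on the returned Future, so it can be handled with the usual Future combinators. A small sketch, assuming the same client and an implicit ExecutionContext in scope:

// handling the failed Future without going through ScalaTest
import scala.util.{Failure, Success}

client.lpush(key, 1200) onComplete {
  case Success(len) => println("pushed, new length: " + len)
  case Failure(e)   => println("lpush failed: " + e.getMessage)
}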

Feedback is welcome, especially on the APIs and their usage. All the code is on GitHub, with all tests in the test folder. Jisoo Park (@guersam) has been doing an awesome job contributing a lot of the goodness that's in the repo. Thanks a lot for all the help ..

Tuesday, September 27, 2011

Non blocking composition using Redis and Futures

scala-redis now supports pooling of Redis clients. Using RedisClientPool you can do some cool stuff in non blocking mode and get improved throughput for your application.

Suppose you have a bunch of operations that you can theoretically execute in parallel, maybe a few disjoint list operations and a few key/value operations .. like the following snippets ..

val clients = new RedisClientPool("localhost", 6379)

// left push to a list
def lp(msgs: List[String]) = {
  clients.withClient {client => {
    msgs.foreach(client.lpush("list-l", _))
    client.llen("list-l")
  }}
}

// right push to another list
def rp(msgs: List[String]) = {
  clients.withClient {client => {
    msgs.foreach(client.rpush("list-r", _))
    client.llen("list-r")
  }}
}

// key/set operations
def set(msgs: List[String]) = {
  clients.withClient {client => {
    var i = 0
    msgs.foreach { v =>
      client.set("key-%d".format(i), v)
      i += 1
    }
    Some(1000) // some dummy
  }}
}

Since Redis is single threaded, you can use client pooling to allocate multiple clients and fork these operations concurrently .. Here's a snippet that does these operations asynchronously using Scala futures ..

// generate some arbitrary values
val l = (0 until 5000).map(_.toString).toList

// prepare the list of functions to invoke
val fns = List[List[String] => Option[Int]](lp, rp, set)

// schedule the futures
val tasks = fns map (fn => scala.actors.Futures.future { fn(l) })

// wait for results
val results = tasks map (future => future.apply())
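
As an aside, the same fork/join can be expressed with the standard library futures on Scala 2.10+. A sketch, assuming the global ExecutionContext and the fns and l values from the snippet above:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// schedule the same functions as standard library futures and wait for all of them
val stdTasks = fns map (fn => Future { fn(l) })
val stdResults = Await.result(Future.sequence(stdTasks), 1.minute)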

And while we are on the topic of using futures for non blocking Redis operations, Twitter has a cool library, Finagle, that offers lots of composition utilities on Futures and other non blocking RPC mechanisms. Over the weekend I used some of them to implement scatter/gather algorithms over Redis. I am not going into the details of what I did, but here's a sample dummy example of the things you can do with RedisClientPool and the Future implementation in Finagle ..

The essential idea is to be able to compose futures and write non blocking code all the way down. This is made possible through monadic non-blocking map and flatMap operations and a host of other utility functions that use them. Here's an example ..

def collect[A](fs: Seq[Future[A]]): Future[Seq[A]] = { //..

It uses flatMap and map to collect the results from the given list of futures into a new future of Seq[A].
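
Just to make the idea concrete, here is one way such a combinator can be written in terms of flatMap and map. This is a generic sketch, not Finagle's actual implementation:

import com.twitter.util.Future

// fold over the futures, flatMapping each result onto the accumulated sequence
def collectSketch[A](fs: Seq[Future[A]]): Future[Seq[A]] =
  fs.foldLeft(Future.value(Seq.empty[A])) { (accF, f) =>
    accF flatMap { acc => f map { a => acc :+ a } }
  }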

Let's have a look at a specific example where we push a number of elements into 100 lists concurrently using a pool of futures, backed by an ExecutorService. This is the scatter phase of the algorithm. The function listPush actually does the push using a RedisClientPool, and each of these operations is done within a Future. FuturePool gives you a Future where you can specify timeouts and exception handlers using Scala closures.
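
listPush (and its counterpart listPop, used in the gather phase) is not shown in the snippet below. Based on the signatures used there, where the pushes yield the list key and the pops yield a number, they could look roughly like this; an illustrative sketch against RedisClientPool, not the exact code from the repo:

// sketch: push opsPerClient elements into the given list and return its key
def listPush(opsPerClient: Int, key: String)(implicit clients: RedisClientPool): String = {
  clients.withClient { client =>
    (1 to opsPerClient) foreach (i => client.rpush(key, i.toString))
  }
  key
}

// sketch: pop the elements back and return their sum
def listPop(opsPerClient: Int, key: String)(implicit clients: RedisClientPool): Long = {
  clients.withClient { client =>
    val popped = (1 to opsPerClient) flatMap (_ => client.lpop(key))
    popped.map(_.toLong).sum
  }
}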

Note how we use the combinator collect for concurrent composition of the futures. The resulting future that collect returns will be complete when all the underlying futures have completed.

After the scatter phase we prepare for the gather phase by pipelining the future computation using flatMap. Unlike collect, flatMap is a combinator for sequential composition. In the following snippet, once allPushes completes, its result pipelines into the closure that generates another Future. The whole operation completes only when both futures have completed, or when either of them fails with an exception.

For more details on how to use these combinators on Future abstractions, have a look at the tutorial that the Twitter guys published recently.

implicit val timer = new JavaTimer

// set up Executors
val futures = FuturePool(Executors.newFixedThreadPool(8))

// abstracting the flow with future
private[this] def flow[A](noOfRecipients: Int, opsPerClient: Int, fn: (Int, String) => A) = {
  val fs = (1 to noOfRecipients) map {i => 
    futures {
      fn(opsPerClient, "list_" + i)
    }.within(40.seconds) handle {
      case _: TimeoutException => null.asInstanceOf[A]
    }
  }
  Future.collect(fs)
}

// scatter across clients and gather them to do a sum
def scatterGatherWithList(opsPerClient: Int)(implicit clients: RedisClientPool) = {
  // scatter
  val allPushes: Future[Seq[String]] = flow(100, opsPerClient, listPush)
  val allSum = allPushes flatMap {result =>
    // gather
    val allPops: Future[Seq[Long]] = flow(100, opsPerClient, listPop)
    allPops map {members => members.sum}
  }
  allSum.apply
}
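
A call site for this would look roughly as follows; the pool construction is the same as in the earlier snippet and the numbers are arbitrary:

// scatter/gather across 100 lists with 1000 ops per list, then print the total
implicit val clients = new RedisClientPool("localhost", 6379)
val total = scatterGatherWithList(1000)
println("sum across all lists: " + total)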

For complete example implementations of patterns like scatter/gather using Redis, have a look at the GitHub repo for scala-redis.