Wednesday, November 22, 2006

Linear Time Sort Puzzler

Interviewing for an engineering position at Google can be grueling and humbling. You get less than an hour with each interviewer, and may get one technical problem after another to solve. If you have performance anxiety you may have a hard time focusing on the problem. The best approach, if you can do it, is to think of the interview as a technically challenging but friendly brainstorming session. If you get hired, that's what your life will be like.

My friend, Jim Davis, told me about someone he interviewed earlier this week. The candidate thought the interview wasn't going well and hoped to impress Jim by sharing an idea he had: a new algorithm for sorting in linear time. It isn't a sort based exclusively on comparisons; it can be proven that any such sort must perform Ω(n log n) comparisons. Nor is it a bucket sort, the standard algorithm for this problem (assuming certain characteristics of the input set). But the algorithm did help Jim understand the candidate's capabilities.

The algorithm works like this: you start by making a linear scan through the n numbers to be sorted, noting the largest and smallest elements. Using those, you compute a linear function f(x) that maps the smallest input element to zero and the largest to n. Then, for each input element x_i, you fork a task that waits f(x_i) milliseconds and then outputs x_i. After n milliseconds the output contains the input elements in sorted order.
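
For concreteness, here is a sketch of the candidate's idea. Using a ScheduledExecutorService to model the forked tasks is my choice of mechanism, not the candidate's, and the class and method names are mine:

import java.util.concurrent.*;

class SleepSort {
    // Sketch of the candidate's algorithm: schedule each element to be
    // printed after a delay proportional to its value.
    static void sleepSort(int[] a) throws InterruptedException {
        final int n = a.length;
        int min = a[0], max = a[0];
        for (int x : a) { min = Math.min(min, x); max = Math.max(max, x); }
        // f(x) maps the smallest element to 0 milliseconds and the largest to n.
        final double scale = (max == min) ? 0.0 : (double) n / (max - min);
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(n);
        for (final int x : a) {
            long delay = Math.round((x - min) * scale);  // f(x) in milliseconds
            pool.schedule(new Runnable() {
                public void run() { System.out.print(x + " "); }
            }, delay, TimeUnit.MILLISECONDS);
        }
        pool.shutdown();
        pool.awaitTermination(n + 1000L, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        sleepSort(new int[] { 5, 1, 4, 2, 3 });  // prints 1 2 3 4 5 (usually)
    }
}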

There may be problems when two tasks attempt to output their data at nearly the same time, but let's ignore that for a moment and assume an "ideal" system. If you have a truly concurrent computer, this may run in O(n) time using O(n) processors, thereby consuming a total of O(n²) processing time. There are far better concurrent sorting algorithms around. Much more interesting is the idea of running this algorithm on a sequential computer with a non-preemptive scheduler. The non-preemptive scheduler avoids the problem of two tasks producing data at the same time. The scheduler can also ensure that the tasks run in their proper order, even if the running time of some tasks causes others to start slightly "too late".

Jim explained the problem with the algorithm, concluding "so you've reduced the problem of sorting in linear time to a different linear time sorting algorithm." After Jim explained this remark, the candidate replied "well, the idea isn't really very well thought out." Here is the puzzle: what was Jim talking about?

Monday, November 20, 2006

A Thread Pool Puzzler

I participated in the design and development of a couple of concurrency libraries for shared-memory multiprocessors long before such machines were popular. So when I started using java.util.concurrent I was already somewhat comfortable with the concepts. But when I used it more intensely for production work in the Google Calendar server, I ran into a couple of "gotcha" situations. I'd like to tell you about one in particular, in part because it might help you avoid the problem yourself, and in part because I believe this issue exposes some missing functionality in the concurrency framework.

Many parallel programming problems can be expressed using fork-join parallelism, in which tasks spawn, or "fork", a number of subtasks that can be executed in parallel. The caller then waits for these subtasks to complete by "join"ing with them. Consider the following sequential program. It is an abstract model of some larger program that has three logical layers.

class Program {
    static final int N = 3;
    public static void main(String[] args) {
        doSomeWork();
        loopNtimes(N, new Runnable() {
                public void run() { doLayerOne(); }
            });
        System.out.println();
    }

    static void doLayerOne() {
        doSomeWork();
        loopNtimes(N, new Runnable() {
                public void run() { doLayerTwo(); }
            });
    }

    static void doLayerTwo() {
        doSomeWork();
        loopNtimes(N, new Runnable() {
                public void run() { doLayerThree(); }
            });
    }

    static void doLayerThree() {
        doSomeWork();
    }

    static void loopNtimes(int n, Runnable runnable) {
        for (int i = 0; i < n; i++) runnable.run();
    }

    // One unit of work: print a dot, then "compute" for half a second.
    static void doSomeWork() {
        System.out.print(".");
        try { Thread.sleep(500L); } catch (InterruptedException _) {}
    }
}

This program prints 40 dots (1 + 3 + 9 + 27), taking half a second for each one, so it runs to completion in about 20 seconds. Let's rewrite the loops as concurrent (instead of sequential) loops, using an idiom recommended by Martin Buchholz. To do that we replace the method loopNtimes with the following:

    static ExecutorService threadPool = Executors.newCachedThreadPool();

    static void loopNtimes(int n, Runnable runnable) {
        Collection<Callable<Object>> c = new ArrayList<Callable<Object>>();
        for (int i=0; i<n; i++) c.add(Executors.callable(runnable));
        Collection<Future<Object>> futures = null;
        try { futures = threadPool.invokeAll(c); } catch (InterruptedException _) {}
        if (futures != null) for (Future<Object> f : futures) {
            try { f.get(); }
            catch (InterruptedException ex) {}
            catch (ExecutionException ex) {
                ex.printStackTrace();
                System.exit(1);
            }
        }
    }
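
For completeness, the two import statements the new version needs (inferred from the classes used above) are:

    import java.util.*;            // Collection, ArrayList
    import java.util.concurrent.*; // ExecutorService, Executors, Callable, Future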

This requires a couple of other minor changes to the program (the two import statements above and a System.exit(0) at the end of main), but the program now runs in two seconds instead of twenty. So far so good, but if N is larger, say a hundred, this program fails. It throws OutOfMemoryError because it tries to allocate too many threads. My first attempt to fix this replaced the thread pool by one containing a fixed number of threads:

    static ExecutorService threadPool = Executors.newFixedThreadPool(100);

This version of the program works and runs in 2 seconds. But why should we use 100 threads? If we imagine that the Thread.sleep statements represent computationally intensive parts of the program, it would make more sense to have roughly as many threads as physical processors. I'm running this on a machine with an Intel Centrino Duo processor, which acts roughly like 2 processors. Let's be generous, however, and make it ten threads. So we modify this version of the program by changing 100 to 10. That won't be as fast as the version with 100 threads, but just how fast will it be?
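
As an aside, you don't have to guess the processor count; the runtime can tell you (this is my aside, not part of the original program):

    // Size the pool to the machine instead of a magic number.
    // On my Centrino Duo this yields roughly 2.
    static ExecutorService threadPool = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors());

Of course, a pool that small makes the starvation described next even easier to trigger.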

If you haven't guessed the punch line by now, I'll tell you: with ten threads in the pool the program prints 11 dots and then deadlocks! If you use a debugger to examine the state of the program, you'll find the main thread waiting in invokeAll, three threads in doLayerOne waiting in invokeAll, seven threads in doLayerTwo waiting in invokeAll, and no threads left to do the work of calling doLayerThree. This is a classic thread starvation deadlock.

If you're just trying out this program to see what happens, you might be slightly annoyed and finally give up and hit control-C to quit the Java program, but when our program (Google Calendar) encounters this kind of problem our customers get annoyed, give up, and sign up for a competitor like Yahoo Calendar or 30Boxes. Hey, don't click those links; trust me, you really want Google Calendar. My point is that we can't leave this to chance.

What can or should we do about this problem? The first idea is to change the 10 back into 100, but those numbers are pulled out of thin air. Without analyzing the behavior and interaction of all the places where the thread pool is used, understanding the dynamic performance of the application under real loads, and placing bounds on the number of tasks that will be used at each level in the program's hierarchy, it is difficult or impossible to pick a number that will always avoid this kind of deadlock. Another idea is to use unbounded thread pools, but as we've seen, under high-load situations those can cause an explosion in the number of threads, and the program fails by running out of memory.

What we did to address this issue is avoid the single monolithic thread pool altogether. Instead, we use a separate thread pool at every level in the hierarchy. In terms of this example, we would have a thread pool for use in main, one for use in doLayerOne, and one for use in doLayerTwo. Every subsystem that requires concurrency gets its own personal thread pool. That way every layer that uses concurrency is guaranteed to make progress when it has work to do, so this kind of deadlock cannot occur. But there is a cost as well: balancing the sizes of these thread pools is a black art. During operation we have hundreds of threads, most of which sit around doing nothing. Besides being a waste of resources, the generous surplus of "extra" threads makes debugging more difficult than it should be. And if the system doesn't break down so neatly into layers (perhaps because there are cycles in the call graph of the subsystems), even this solution can break down and result in thread starvation.
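
In terms of the toy program, the per-layer arrangement might look like the sketch below. The pool sizes are illustrative guesses of mine, chosen so each layer can always run all of its tasks; they are not the tuned values from the Calendar server:

import java.util.*;
import java.util.concurrent.*;

class LayeredProgram {
    static final int N = 3;
    // One pool per layer. Each layer waits only on tasks that run in a
    // *different* pool, so no layer can starve another.
    static final ExecutorService mainPool   = Executors.newFixedThreadPool(3);
    static final ExecutorService layer1Pool = Executors.newFixedThreadPool(9);
    static final ExecutorService layer2Pool = Executors.newFixedThreadPool(27);

    public static void main(String[] args) {
        doSomeWork();
        loopNtimes(mainPool, N, new Runnable() {
                public void run() { doLayerOne(); }
            });
        System.out.println();
        System.exit(0);
    }

    static void doLayerOne() {
        doSomeWork();
        loopNtimes(layer1Pool, N, new Runnable() {
                public void run() { doLayerTwo(); }
            });
    }

    static void doLayerTwo() {
        doSomeWork();
        loopNtimes(layer2Pool, N, new Runnable() {
                public void run() { doLayerThree(); }
            });
    }

    static void doLayerThree() { doSomeWork(); }

    // As before, but the caller names the pool its subtasks should run in.
    static void loopNtimes(ExecutorService pool, int n, Runnable runnable) {
        Collection<Callable<Object>> c = new ArrayList<Callable<Object>>();
        for (int i = 0; i < n; i++) c.add(Executors.callable(runnable));
        try {
            for (Future<Object> f : pool.invokeAll(c)) f.get();
        } catch (InterruptedException ignored) {
        } catch (ExecutionException ex) {
            ex.printStackTrace();
            System.exit(1);
        }
    }

    static void doSomeWork() {
        System.out.print(".");
        try { Thread.sleep(500L); } catch (InterruptedException ignored) {}
    }
}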

The situation is not entirely hopeless. In my opinion, this kind of thread starvation should never occur, because there is always one thread that can contribute processing power toward executing the subtasks: the thread that is waiting for those subtasks to complete. Here's the implementation of invokeAll as it appears in the JDK:

    public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                FutureTask<T> f = new FutureTask<T>(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try { 
                        f.get(); 
                    } catch(CancellationException ignore) {
                    } catch(ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures) 
                    f.cancel(true);
        }
    }

This code does not use the current thread to do any of the work of invoking the callables. Below is a slightly modified version (I've added a line to the original and refactored it into a static method that we can put in the program) that uses the current thread to do any work that another thread hasn't already started. The newly added code is marked with a comment:

    public static <T> List<Future<T>> invokeAll(
            ExecutorService threadPool, Collection<Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                FutureTask<T> f = new FutureTask<T>(t);
                futures.add(f);
                threadPool.execute(f);
            }
            // force unstarted futures to execute using the current thread
            for (Future<T> f : futures) ((FutureTask)f).run();
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try { 
                        f.get(); 
                    } catch(CancellationException ignore) {
                    } catch(ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures) 
                    f.cancel(true);
        }
    }

Using this version of invokeAll, the program does not experience thread starvation. If the thread pool is reduced in size to just one thread, the program runs to completion in about 11 seconds, because two threads are contributing to doing the work (the main thread and the thread from the pool).
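
For reference, wiring this version into the example changes just one line of loopNtimes:

    // Inside loopNtimes, call the static method instead of the pool's own:
    try { futures = invokeAll(threadPool, c); }   // was: threadPool.invokeAll(c)
    catch (InterruptedException _) {}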

I discussed this issue with Doug Lea, and he warned me that selecting tasks for efficient scheduling in a fork-join concurrency framework is not trivial. The standard solution is to give each worker thread a double-ended queue into which it enqueues its subtasks. The worker removes the most recently generated task from this queue for itself to process, thereby simulating a depth-first execution strategy in the single-thread case. When a worker finds itself without any work to do, it steals work from the other end of another worker's queue; that is, it steals one of the least recently created (coarse-grained) subtasks. In addition, it is beneficial to have a mechanism to avoid the queue altogether at the bottom of the call chain. Doug told me that this strategy was pioneered by the Cilk work, but I had first learned about it ten years earlier, reading WorkCrews: An Abstraction for Controlling Parallelism by Mark T. Vandevoorde and Eric S. Roberts. My implementation provides exactly this behavior, but much more simply: the invocation of run executes one of the tasks most recently generated by the current thread, and when a thread has no more work of its own, it removes work from the queue of the underlying ExecutorService, which is a FIFO queue, so it takes the least recently generated task of all workers. On the other hand, because this implementation shares a single queue among all worker threads, it may incur more synchronization overhead than the WorkCrews/Cilk solution.
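
To make the deque discipline concrete, here is a toy sketch; this is my illustration, not Doug's or Cilk's implementation, and it uses LinkedBlockingDeque from the then-forthcoming Java 6:

import java.util.concurrent.*;

// Each worker thread owns one of these. The owner works LIFO (depth-first);
// idle threads steal FIFO, taking the oldest, coarsest-grained task.
class WorkDeque<T> {
    private final BlockingDeque<T> tasks = new LinkedBlockingDeque<T>();
    void fork(T task) { tasks.addFirst(task); }     // owner pushes a new subtask
    T next()          { return tasks.pollFirst(); } // owner pops its newest task
    T steal()         { return tasks.pollLast(); }  // thief takes the oldest task
}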

It is possible to use the existing concurrency utilities to work around the problem, if you don't mind the task scheduling being far from optimal. You can do that by installing the CallerRunsPolicy on a ThreadPoolExecutor and using a SynchronousQueue:

static ThreadPoolExecutor threadPool =
  new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS,
                         new SynchronousQueue<Runnable>());
static {
    // A SynchronousQueue has no capacity, so a submission is rejected
    // whenever all ten threads are busy; CallerRunsPolicy then runs the
    // task in the submitting thread, which keeps the system making progress.
    threadPool.setRejectedExecutionHandler(
        new ThreadPoolExecutor.CallerRunsPolicy());
}

Doug explained to me that an earlier, public-domain version of the concurrency utilities had a full implementation of a fork-join parallelism framework, but it didn't get included in JDK5:

"... The vast majority of such usages are nicest to support as "loop parallelism" utilities. And it is not so much that utilities based on FJ tasks are inconvenient to use that has kept them out, but instead uncertainty about basic APIs until closures are settled. All of the methods for aggregate operations on collections and arrays (applyToAll, map, reduce, mapReduce, findAny, findAll, etc) require function-type arguments (perhaps along with some sort of purity annotation as might be introduced via JSR305) that, depending on how things work out otherwise, would need to be introduced more generally."

Did you think I would get through an entire blog post without mentioning Closures?