Thursday, August 22, 2013

Multi-threading through ExecutorService or ExecutorCompletionService from the java.util.concurrent package



The java.util.concurrent package has a lot of utility classes and interfaces that make it easier to write good multi-threaded programs. These classes take care of much of the boilerplate code, such as creating threads and managing them.

The code below shows two different ways of achieving this, compared against a plain single-threaded version. All the concurrency classes used are from the java.util.concurrent package.

Each individual task simply waits for 2 seconds and then finishes.
Ten such tasks have to be executed.

singleThreadWay() is the single-threaded version, which takes ~20 seconds [10 * 2].

multiThreadedWayWithExecutorService() is the multi-threaded version using ExecutorService, which takes ~2 seconds because the pool has 10 threads and all 10 tasks sleep at the same time!
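ExecutorService has different methods to support Runnable (which doesn't return a result) and Callable (which returns a result):
· execute(Runnable): runs the task at some point in the future and returns nothing (inherited from Executor)
· submit(Runnable): returns a Future whose get() returns null once the task has finished
· submit(Callable): returns a Future holding the Callable's result
· invokeAny(...): runs a collection of Callables and returns the result of one that completed successfully
· invokeAll(...): runs a collection of Callables and, once all have completed, returns a list of Futures, one per task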
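A minimal sketch exercising each of the five methods once follows. It assumes Java 8 or later (lambdas are used instead of the anonymous classes in the main example, purely for brevity); the class name ExecutorMethodsDemo and the task bodies are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorMethodsDemo {

    // Calls each ExecutorService method once and returns what the Future-based ones produced.
    static List<Object> runAll() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // execute(Runnable): fire and forget, nothing is returned
            executor.execute(() -> System.out.println("execute(Runnable) ran"));

            // submit(Runnable): the Future's get() returns null once the task has finished
            Future<?> runnableFuture = executor.submit(() -> System.out.println("submit(Runnable) ran"));
            Object runnableResult = runnableFuture.get();

            // submit(Callable): the Future's get() returns the Callable's value
            Future<String> callableFuture = executor.submit(() -> "callable result");
            String callableResult = callableFuture.get();

            List<Callable<String>> tasks = Arrays.asList(() -> "first", () -> "second");

            // invokeAny: blocks until one task completes successfully and returns its value
            String anyResult = executor.invokeAny(tasks);

            // invokeAll: blocks until every task is done; the Futures follow the order of 'tasks'
            List<Future<String>> all = executor.invokeAll(tasks);

            return Arrays.asList(runnableResult, callableResult, anyResult,
                    all.get(0).get(), all.get(1).get());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll());
    }
}
```

Note that invokeAny() may return either "first" or "second"; which one wins is not specified.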

How you create an ExecutorService depends on the implementation you use; ThreadPoolExecutor, for example, can be constructed directly. However, you can also use the Executors factory class to create ExecutorService instances. Here are a few examples:

ExecutorService executorService1 = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
ExecutorService executorService3 = Executors.newScheduledThreadPool(10);
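Constructing the implementation directly gives more control over the pool. The sketch below (class and method names are made up) builds a ThreadPoolExecutor configured the way newFixedThreadPool configures its pool in the OpenJDK sources: core and maximum size equal, no idle timeout, and an unbounded LinkedBlockingQueue for waiting tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DirectThreadPool {

    // Same settings Executors.newFixedThreadPool(nThreads) uses:
    // core size == maximum size, keep-alive 0, unbounded work queue.
    static ThreadPoolExecutor newFixedPool(int nThreads) {
        return new ThreadPoolExecutor(
                nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    public static void main(String[] args) throws Exception {
        // A ThreadPoolExecutor is an ExecutorService, so it can be used like executorService2 above
        ExecutorService executorService = newFixedPool(10);
        System.out.println(executorService.submit(() -> "hello from the pool").get());
        executorService.shutdown();
    }
}
```

Building it by hand is mainly useful when you want to change one of these settings, for example a bounded queue instead of an unbounded one.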

The ExecutorService's submit method used in the code hands the task over and returns immediately. It doesn't wait for the task to finish; it just returns a handle to the result in the form of a Future.
The actual result can be retrieved by calling the Future's get() method, which blocks until the task has completed.
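The short sketch below (hypothetical class name, Java 8+) shows that behaviour: submit() returns while the 2-second task is still running, a get() with a timeout gives up with a TimeoutException, and the plain get() waits for the result.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureGetDemo {

    static String demo() throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            // submit() hands the task over and returns straight away
            Future<String> future = executorService.submit(() -> {
                Thread.sleep(2000L);
                return "I'm done";
            });
            System.out.println("isDone right after submit: " + future.isDone()); // almost certainly false

            try {
                // get(timeout, unit) waits at most 100 ms, then throws TimeoutException
                future.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                System.out.println("Not ready after 100 ms");
            }

            // get() without arguments blocks until the result is available
            return future.get();
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```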

multiThreadedWayWithExecutorCompletionService() is the multi-threaded version using ExecutorCompletionService, which also takes ~2 seconds!
ExecutorCompletionService is an implementation of CompletionService that wraps an Executor (here, an ExecutorService).
Its submit method also returns a Future for each task. But there is a better way to get the results: CompletionService has a take() method that retrieves and removes the Future of the next completed task from an internal queue, waiting if none has completed yet. This makes it a better way to read the results.
The other important aspect of CompletionService is that results become available as and when each task completes, i.e. in completion order rather than submission order. More explanation is found in the next section.
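take() is not the only way to read that queue: CompletionService also has poll(), which returns null at once when nothing has completed, and poll(timeout, unit), which waits for a bounded time. A minimal sketch (hypothetical class name, Java 8+) with a single 1-second task:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TakeVsPollDemo {

    static List<String> demo() throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletionService<String> completionService =
                new ExecutorCompletionService<String>(executorService);
        List<String> log = new ArrayList<String>();
        try {
            completionService.submit(() -> {
                Thread.sleep(1000L);
                return "I'm done";
            });

            // poll(): never blocks; null because the task is still sleeping
            Future<String> f = completionService.poll();
            log.add(f == null ? "poll: nothing yet" : "poll: " + f.get());

            // poll(timeout, unit): waits at most 100 ms, still too early here
            f = completionService.poll(100, TimeUnit.MILLISECONDS);
            log.add(f == null ? "timed poll: nothing yet" : "timed poll: " + f.get());

            // take(): blocks until a completed task is available and removes it from the queue
            log.add("take: " + completionService.take().get());

            // The queue has been drained, so there is nothing left to poll
            log.add(completionService.poll() == null ? "queue empty" : "unexpected extra result");
            return log;
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

poll(timeout, unit) is handy when the consumer has other work to do and should not block indefinitely.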


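import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MultiThreadingExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        singleThreadWay();                               // ~20 seconds
        multiThreadedWayWithExecutorService();           // ~2 seconds
        multiThreadedWayWithExecutorCompletionService(); // ~2 seconds
        long endTime = System.currentTimeMillis();

        // roughly 24 seconds for all three together
        System.out.println("Total time taken by process is " + (endTime - startTime) + " ms");
    }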

    // Runs the 10 tasks one after another on the calling thread
    private static void singleThreadWay() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(2000L);
            System.out.println("Synchronous task");
        }
    }

    private static void multiThreadedWayWithExecutorService() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        List<Future<?>> futures = new ArrayList<Future<?>>();

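        for (int i = 0; i < 10; i++) {
            // submit() returns a Future immediately; the task itself runs on a pool thread
            futures.add(executorService.submit(new Callable<Object>() {
                public Object call() throws Exception {
                    System.out.println("Asynchronous task");
                    Thread.sleep(2000L);
                    return "I'm done";
                }
            }));
        }

        // get() blocks until that task's result is available, so results come back in submission order
        for (Future<?> f : futures) {
            System.out.println(f.get());
        }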
       
        executorService.shutdown();
    }
   
    private static void multiThreadedWayWithExecutorCompletionService() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        CompletionService<Object> completionService = new ExecutorCompletionService<Object>(executorService);
       
        for (int i = 0; i < 10; i++) {
            completionService.submit(new Callable<Object>() {
                public Object call() throws Exception {
                    System.out.println("Asynchronous task");
                    Thread.sleep(2000L);
                    return "I'm done";
                }
            });
        }

        // take() hands back the Future of whichever task finished next, in completion order
        for (int i = 0; i < 10; i++) {
            System.out.println(completionService.take().get());
        }
       
        executorService.shutdown();
    }
}

If the ExecutorService is injected by Spring, it can be declared like this:
    <bean id="myExecutor" class="java.util.concurrent.Executors"
        factory-method="newFixedThreadPool" destroy-method="shutdownNow">
        <constructor-arg value="10"/>
    </bean>

ExecutorService vs ExecutorCompletionService:

Suppose you had a set of tasks A, B, C, D, E and you want to execute each of them asynchronously in an Executor and process the results one by one as they complete.

With a plain ExecutorService, you would do it like this:
// A, B, C, D, E are Callable<Result> tasks; Result stands for whatever type they return
List<Future<Result>> futures = new ArrayList<Future<Result>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));
futures.add(executorService.submit(E));

// This loop processes the results in the order the tasks were submitted: A, B, C, D, E
for (Future<Result> future : futures) {
    Result result = future.get(); // blocks until this particular task has finished
    // Some processing here
}

The problem with this method is that there is no guarantee that task A will complete first. Thus, it is possible that the main thread will be blocking idly waiting for task A to complete when it could be processing the result of another task (say task B). Result processing latency could be reduced by using an ExecutorCompletionService.

CompletionService<Result> executorCompletionService = new ExecutorCompletionService<Result>(executorService);
List<Future<Result>> futures = new ArrayList<Future<Result>>();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));
futures.add(executorCompletionService.submit(E));

// This loop processes the results in the order the tasks complete, regardless of submission order
for (int i = 0; i < futures.size(); i++) {
    Result result = executorCompletionService.take().get(); // next task to finish, whichever it is
    // Some processing here
}
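The difference can be made concrete with a runnable sketch. Here A to E are hypothetical Callables that just sleep for different times (A the longest) and return their own name; with a pool large enough to run all five at once, the Future loop reports them in submission order, while take() reports them as they finish. Java 8+ is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OrderComparison {

    // Hypothetical task: sleeps for the given time, then returns its name
    static Callable<String> task(String name, long sleepMillis) {
        return () -> {
            Thread.sleep(sleepMillis);
            return name;
        };
    }

    // A is submitted first but finishes last; B finishes first
    static List<Callable<String>> tasks() {
        return Arrays.asList(task("A", 1000L), task("B", 200L), task("C", 800L),
                task("D", 400L), task("E", 600L));
    }

    static List<String> viaFutures(ExecutorService executorService) throws Exception {
        List<Future<String>> futures = new ArrayList<Future<String>>();
        for (Callable<String> t : tasks()) {
            futures.add(executorService.submit(t));
        }
        List<String> order = new ArrayList<String>();
        for (Future<String> future : futures) {
            order.add(future.get()); // waits for A first, although the others finish earlier
        }
        return order;
    }

    static List<String> viaCompletionService(ExecutorService executorService) throws Exception {
        CompletionService<String> ecs = new ExecutorCompletionService<String>(executorService);
        List<Callable<String>> tasks = tasks();
        for (Callable<String> t : tasks) {
            ecs.submit(t);
        }
        List<String> order = new ArrayList<String>();
        for (int i = 0; i < tasks.size(); i++) {
            order.add(ecs.take().get()); // whichever task finished next
        }
        return order;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        try {
            System.out.println("Future loop: " + viaFutures(executorService));
            System.out.println("take() loop: " + viaCompletionService(executorService));
        } finally {
            executorService.shutdown();
        }
    }
}
```

With these sleep times the first list should be [A, B, C, D, E] and the second [B, D, E, C, A]; in the first loop the main thread sits idle for about a second on A before it can look at any result.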

So, in essence, ExecutorCompletionService could be used to squeeze out a little more efficiency when the order of processing task results does not matter.
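One important thing to note, though: ExecutorCompletionService keeps completed tasks in an internal queue. If take() or poll() is never called to drain that queue, the completed Futures stay referenced for as long as the ExecutorCompletionService itself is reachable; with a long-lived service this effectively becomes a memory leak. Some people process results through the Futures returned by submit() instead, which is NOT the intended usage.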
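A small sketch (hypothetical names, Java 8+) makes that queue visible: results are consumed only through the Futures returned by submit(), and afterwards every completed Future is still sitting in the completion queue. poll() is given a timeout while counting, to allow for the short gap between a task finishing and its Future being placed on the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class UndrainedQueueDemo {

    // Reads every result through the Futures returned by submit() (never take()/poll()),
    // then counts how many completed Futures are still held by the completion queue.
    static int leftoverFutures(int taskCount) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        CompletionService<Integer> ecs = new ExecutorCompletionService<Integer>(executorService);
        try {
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (int i = 0; i < taskCount; i++) {
                final int n = i;
                futures.add(ecs.submit(() -> n * n));
            }
            for (Future<Integer> f : futures) {
                f.get(); // result consumed here, but the queue is never touched
            }

            // Drain the queue; the timeout allows for a Future queued just after get() returned
            int leftovers = 0;
            while (ecs.poll(1, TimeUnit.SECONDS) != null) {
                leftovers++;
            }
            return leftovers;
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Completed Futures still queued: " + leftoverFutures(1000));
    }
}
```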

Another explanation of the difference, from the Javadoc:
I think the Javadoc best answers the question of when a CompletionService is useful in a way an ExecutorService isn't:
A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks.
Basically, this interface allows a program to have producers which create and submit tasks (and even examine the results of those submissions) without knowing about any other consumers of the results of those tasks. Meanwhile, consumers which are aware of the CompletionService could poll for or take results without being aware of the producers submitting the tasks.
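A minimal sketch of that decoupling (hypothetical names, Java 8+): the producer method only submits tasks, the consumer method only takes results, and the CompletionService they share is the only thing either side knows about. The producer runs on its own thread while the main thread consumes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProducerConsumerDemo {

    // Producer: creates and submits tasks, knows nothing about who reads the results
    static void produce(CompletionService<String> service, int count) {
        for (int i = 0; i < count; i++) {
            final int id = i;
            service.submit(() -> "result-" + id);
        }
    }

    // Consumer: takes completed results without knowing who submitted the tasks
    static List<String> consume(CompletionService<String> service, int count)
            throws InterruptedException, ExecutionException {
        List<String> results = new ArrayList<String>();
        for (int i = 0; i < count; i++) {
            results.add(service.take().get());
        }
        return results;
    }

    static List<String> run(int count) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        CompletionService<String> service = new ExecutorCompletionService<String>(executorService);
        try {
            Thread producer = new Thread(() -> produce(service, count));
            producer.start();
            List<String> results = consume(service, count); // main thread is the consumer
            producer.join();
            Collections.sort(results); // completion order varies, so sort for a stable view
            return results;
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(5));
    }
}
```

The consumer still needs to know how many results to expect; here that count is simply passed to both sides.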

Another good example, from the ExecutorCompletionService Javadoc:
A CompletionService that uses a supplied Executor to execute tasks. This class arranges that submitted tasks are, upon completion, placed on a queue accessible using take. The class is lightweight enough to be suitable for transient use when processing groups of tasks.
Usage Examples. Suppose you have a set of solvers for a certain problem, each returning a value of some type Result, and would like to run them concurrently, processing the results of each of them that return a non-null value, in some method use(Result r). You could write this as:
    void solve(Executor e, Collection<Callable<Result>> solvers)
      throws InterruptedException, ExecutionException {
        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
        for (Callable<Result> s : solvers)
            ecs.submit(s);
        int n = solvers.size();
        for (int i = 0; i < n; ++i) {
            Result r = ecs.take().get();
            if (r != null)
                use(r);
        }
    }
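To run the first Javadoc example as-is, Result and use() need concrete definitions. The sketch below assumes, purely for illustration, that Result is Integer and that use() just records each value; the body of solve() is otherwise the Javadoc's. The class name and the three solvers are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SolveAllDemo {

    // Hypothetical stand-in for use(Result r): collects the non-null results
    private final List<Integer> used = new ArrayList<Integer>();

    void use(Integer r) {
        used.add(r);
    }

    // The Javadoc's first solve(), with Result replaced by Integer
    void solve(Executor e, Collection<Callable<Integer>> solvers)
            throws InterruptedException, ExecutionException {
        CompletionService<Integer> ecs = new ExecutorCompletionService<Integer>(e);
        for (Callable<Integer> s : solvers)
            ecs.submit(s);
        int n = solvers.size();
        for (int i = 0; i < n; ++i) {
            Integer r = ecs.take().get();
            if (r != null)
                use(r);
        }
    }

    static List<Integer> runDemo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> solvers = Arrays.asList(() -> 1, () -> null, () -> 3);
            SolveAllDemo demo = new SolveAllDemo();
            demo.solve(pool, solvers);
            List<Integer> sorted = new ArrayList<Integer>(demo.used);
            Collections.sort(sorted); // completion order is not fixed
            return sorted;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo());
    }
}
```

use() is only ever called from the thread running solve(), so a plain ArrayList is enough here.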

Suppose instead that you would like to use the first non-null result of the set of tasks, ignoring any that encounter exceptions, and cancelling all other tasks when the first one is ready:
    void solve(Executor e, Collection<Callable<Result>> solvers)
      throws InterruptedException {
        CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
        int n = solvers.size();
        List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
        Result result = null;
        try {
            for (Callable<Result> s : solvers)
                futures.add(ecs.submit(s));
            for (int i = 0; i < n; ++i) {
                try {
                    Result r = ecs.take().get();
                    if (r != null) {
                        result = r;
                        break;
                    }
                } catch(ExecutionException ignore) {}
            }
        }
        finally {
            for (Future<Result> f : futures)
                f.cancel(true);
        }

        if (result != null)
            use(result);
    }
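Here is the second Javadoc example in runnable form, again assuming Result is Integer and returning the result instead of calling use(). The four solvers are made up for illustration: one throws, one returns null, one returns 42 after 200 ms, and one would sleep for 10 seconds but is cancelled (interrupted) by the finally block once 42 has been taken.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FirstResultDemo {

    // The Javadoc's second solve(), returning the result instead of passing it to use()
    static Integer solveFirst(Executor e, Collection<Callable<Integer>> solvers)
            throws InterruptedException {
        CompletionService<Integer> ecs = new ExecutorCompletionService<Integer>(e);
        int n = solvers.size();
        List<Future<Integer>> futures = new ArrayList<Future<Integer>>(n);
        Integer result = null;
        try {
            for (Callable<Integer> s : solvers)
                futures.add(ecs.submit(s));
            for (int i = 0; i < n; ++i) {
                try {
                    Integer r = ecs.take().get();
                    if (r != null) {
                        result = r;
                        break;
                    }
                } catch (ExecutionException ignore) {
                    // a solver that threw is skipped
                }
            }
        } finally {
            for (Future<Integer> f : futures)
                f.cancel(true); // interrupts solvers that are still running
        }
        return result;
    }

    // Hypothetical solvers used to exercise solveFirst
    static List<Callable<Integer>> sampleSolvers() {
        Callable<Integer> failing = () -> { throw new IllegalStateException("solver failed"); };
        Callable<Integer> empty = () -> null;
        Callable<Integer> good = () -> { Thread.sleep(200L); return 42; };
        Callable<Integer> slow = () -> { Thread.sleep(10_000L); return 7; };
        return Arrays.asList(failing, empty, good, slow);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println("First non-null result: " + solveFirst(pool, sampleSolvers()));
        } finally {
            pool.shutdown();
            // the slow solver was cancelled, so the pool should terminate well before 10 seconds
            System.out.println("Terminated: " + pool.awaitTermination(5, TimeUnit.SECONDS));
        }
    }
}
```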

shutdown()/shutdownNow()
When you are done using the ExecutorService you should shut it down, so the threads do not keep running.
For instance, if your application is started via a main() method and the main thread finishes, the application will keep running as long as an ExecutorService in it is still active. The active (non-daemon) threads inside this ExecutorService prevent the JVM from shutting down.
To terminate the threads inside the ExecutorService, call its shutdown() method. The ExecutorService will not shut down immediately, but it will no longer accept new tasks, and once all threads have finished their current tasks, the ExecutorService shuts down. All tasks submitted to the ExecutorService before shutdown() is called are executed. shutdown() itself does not block; use awaitTermination() if you need to wait for this.
If you want to shut down the ExecutorService immediately, you can call the shutdownNow() method. This attempts to stop all executing tasks right away (by interrupting their threads), skips all submitted but not yet started tasks, and returns those as a List<Runnable>. There are no guarantees given about the executing tasks: perhaps they stop, perhaps they execute until the end (a task that ignores interruption will). It is a best-effort attempt.
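The two methods are often combined with awaitTermination() into a two-phase shutdown; the ExecutorService Javadoc shows a method along these lines. The version below is a sketch: the method name, the timeout and the boolean return value are choices made here, not something the API prescribes.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {

    // Phase 1: shutdown() and wait; phase 2: shutdownNow() if tasks are still running.
    // Returns true if the pool has fully terminated.
    static boolean shutdownAndAwaitTermination(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown(); // stop accepting new tasks; already-submitted ones still run
        try {
            if (pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
            List<Runnable> neverStarted = pool.shutdownNow(); // interrupts running tasks
            System.out.println(neverStarted.size() + " queued task(s) were never started");
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // keep the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // One long task that responds to interruption, plus two that will still be queued
        pool.submit(() -> { Thread.sleep(60_000L); return null; });
        pool.submit(() -> System.out.println("never printed"));
        pool.submit(() -> System.out.println("never printed either"));
        System.out.println("Terminated: " + shutdownAndAwaitTermination(pool, 1));
    }
}
```

Because the long task sleeps, it reacts to the interrupt sent by shutdownNow(); a task stuck in a loop that never checks for interruption would keep the pool alive.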