java.util.concurrent
package has lot of utility Classes/Interfaces which can be used to create good
multi-threading programs. Lot of boiler plate code like creation of Threads and
maintenance of the same has be taken take by these classes.
Below code
explains the 2 different ways of achieving the same. All the classes are from java.util.concurrent
package.
The scope of an individual task here
is to wait for 2 seconds and come out.
And 10 such tasks should be executed.
singleThreadWay()is
a version of single threaded model which takes ~20 seconds [10 * 2]
multiThreadedWayWithExecutorService() is a version of multi-threaded model
using ExecutorService which takes ~2
seconds!
ExecutorService has different methods
to support Runnable (which doesn’t return any response) and Callable (which
return response).
·
execute(Runnable)
·
submit(Runnable)
·
submit(Callable)
·
invokeAny(...)
·
invokeAll(...)
One can create
ExecutorService
depends on the implementation you use.
However, you can use the Executors
factory class to
create ExecutorService
instances too. Here
are a few examples of creating an ExecutorService
:ExecutorService executorService1 = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
ExecutorService executorService3 = Executors.newScheduledThreadPool(10);
The ExecutorService’s submit method used
in the code puts the task and exits. It doesn’t wait for the response to come. It
just returns the handle to the response in the form of Future.
The actual response can be retried from
issuing get method of Future.
multiThreadedWayWithExecutorCompletionService() is a version of multi-threaded
model using ExecutorCompletionService
which takes ~2 seconds!
This CompletionService is a wrapper
around ExecutorService.
It also gives the handle on Future
for response. But there is a better way to get the response i.e. the CompletionService
has a method take() which returns Future again but flushes the internal queue. Hence
a better approach to read the response.
The other important aspect of
CompletionService is the responses are accumulated as when they are available. More
explanation is found in next section.
public class MultiThreadingExample
{
public static void main(String[]
args) throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();
singleThreadWay();
multiThreadedWayWithExecutorService();
multiThreadedWayWithExecutorCompletionService();
long endTime = System.currentTimeMillis();
System.out.println("Total
time taken by process is " + (endTime - startTime));
}
private static void
singleThreadWay() throws InterruptedException {
for (int i = 0; i <
10; i++) {
Thread.sleep(2000l);
System.out.println("Asynchronous
task");
}
}
private static void
multiThreadedWayWithExecutorService() throws
InterruptedException, ExecutionException {
ExecutorService executorService =
Executors.newFixedThreadPool(10);
List<Future<?>> futures = new
ArrayList<Future<?>>();
for (int i=0; i<10;
i++) {
futures.add
(executorService.submit(new
Callable<Object>() {
public Object call() throws Exception {
System.out.println("Asynchronous
task");
Thread.sleep(2000l);
return "I'm
done";
}
}));
}
for(Future<?> f:futures) {
System.out.println(f.get());
}
executorService.shutdown();
}
private static void
multiThreadedWayWithExecutorCompletionService() throws
InterruptedException, ExecutionException {
ExecutorService executorService =
Executors.newFixedThreadPool(10);
CompletionService<Object>
completionService = new ExecutorCompletionService<Object>(executorService);
for (int i=0; i<10;
i++) {
completionService.submit(new
Callable<Object>() {
public Object call() throws Exception {
System.out.println("Asynchronous
task");
Thread.sleep(2000l);
return "I'm
done";
}
});
}
for (int i=0; i<10;
i++) {
System.out.println(completionService.take().get());
}
executorService.shutdown();
}
}
If the
ExecutorService is Spring injected, below is the syntax:
<bean id="myExecutor"
class="java.util.concurrent.Executors"
factory-method="newFixedThreadPool"
destroy-method="shutdownNow">
<constructor-arg value="10"/>
</bean>
ExecutorService vs ExecutorCompletionService:
Suppose
you had a set of tasks A, B, C, D, E and you want to execute each of them
asynchronously in an Executor and process the results 1 by 1 as they complete.
With
an Executor, you would do so like this:
List<Future<?>>
futures = new ArrayList<Future<?>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));
futures.add(executorService.submit(E));
//This
loop must process the tasks in the order they were submitted: A, B, C, D, E
for
(Future<?> future:futures) {
? result = future.get();
// Some processing here
}
The problem with this method is that there is no guarantee that task
A
will complete first. Thus, it is possible that
the main thread will be blocking idly waiting for task A
to complete when it could be processing the
result of another task (say task B
).
Result processing latency could be reduced by using an ExecutorCompletionService
.List<Future<?>> futures = new ArrayList<Future<?>>();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));
futures.add(executorCompletionService.submit(E));
//This for loop will process the tasks in the order they are completed, regardless of submission order
for (int i=0; i<futures.size(); i++) {
? result = executorCompletionService.take().get();
// Some processing here
}
ExecutorCompletionService
could be used to
squeeze out a little more efficiency when the order of processing task results
does not matter.
One important thing to note though. The implementation of
ExecutorCompletionService contains a queue of results. If
take
or poll
are not called to drain that queue, a memory leak will occur. Some people use
the Future
returned by submit
to process results and this is NOT correct usage.
The other explanation of the difference from
javadoc:
I think the javadoc best answers the
question of when the
CompletionService
is useful in a way an ExecutorService
isn't.
A
service that decouples the production of new asynchronous tasks from the
consumption of the results of completed tasks.
Basically, this interface allows a program
to have producers which create and submit tasks (and even examine the results
of those submissions) without knowing about any other consumers of the results
of those tasks. Meanwhile, consumers which are aware of the
CompletionService
could
poll
for or
take
results without being aware of the producers submitting the tasks.
Another good example found in javadoc below:
A CompletionService
that uses a supplied Executor
to execute tasks. This class arranges that submitted tasks are, upon
completion, placed on a queue accessible using take. The class is lightweight enough to
be suitable for transient use when processing groups of tasks.
Usage Examples. Suppose you have a set of solvers for a certain problem,
each returning a value of some type Result, and would like to run them
concurrently, processing the results of each of them that return a non-null
value, in some method use(Result
r). You could write this as:
void solve(Executor e, Collection<Callable<Result>>
solvers)
throws InterruptedException,
ExecutionException {
CompletionService<Result> ecs =
new ExecutorCompletionService<Result>(e);
for (Callable<Result> s :
solvers)
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = ecs.take().get();
if (r != null)
use(r);
}
}
Suppose instead that you would like
to use the first non-null result of the set of tasks, ignoring any that
encounter exceptions, and cancelling all other tasks when the first one is
ready:
void solve(Executor e,
Collection<Callable<Result>> solvers)
throws InterruptedException {
CompletionService<Result> ecs =
new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>>
futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s :
solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r =
ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch(ExecutionException
ignore) {}
}
}
finally {
for (Future<Result> f :
futures)
f.cancel(true);
}
if (result != null)
use(result);
}
shutdown()/shoutdownNow()
When you are done
using the ExecutorService you should shut it
down, so the threads do not keep running.
For instance, if
your application is started via a main() method and your main thread exits your
application, the application will keep running if you have an active ExexutorService in your
application. The active threads inside this ExecutorService prevents the JVM from shutting down.
To terminate the
threads inside the ExecutorService you call its shutdown() method. The ExecutorService will not shut down
immediately, but it will no longer accept new tasks, and once all threads have
finished current tasks, the ExecutorService shuts down. All tasks submitted to
the ExecutorService before shutdown() is called, are
executed.
If you want to shut
down the ExecutorService immediately, you
can call the shutdownNow() method. This will
attempt to stop all executing tasks right away, and skips all submitted but
non-processed tasks. There are no guarantees given about the executing tasks.
Perhaps they stop, perhaps the execute until the end. It is a best effort
attempt.
References:
- http://stackoverflow.com/questions/7758020/difference-between-executor-and-executorcompletionservice-in-java
- http://docs.oracle.com/javase/1.5.0/docs/api/index.html?java/util/concurrent/CompletionService.html
- http://tutorials.jenkov.com/java-util-concurrent/index.html
No comments:
Post a Comment