Executor Framework
When a program runs many concurrent tasks, all the thread-related
code has to be written by hand: create a thread object per task,
start the thread, obtain its results, and so on.
This can lead to problems such as inefficient use of the computer's
resources, which hurts the performance of the application.
For large applications a better approach is needed and the executor
framework can help with this.
The Executor Framework separates the creation of tasks from the
creation, execution and management of the threads that run them. It
encapsulates that functionality and improves performance by reusing
a pool of threads.
The way the executor framework works is really simple: it only
requires instances of Runnable or Callable objects, and it takes care
of the rest.
java.util.concurrent.Executors is a utility class for creating
pools, factories and services for the executor framework.
Example:
// Imports needed by the two classes below (each class goes in its own file)
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Server {
    private ThreadPoolExecutor executor;

    public Server() {
        // Creates the executor, backed by a cached thread pool
        executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    }

    public void executeTask(Task task) {
        executor.execute(task); // Sends the task to the pool for execution
    }

    public void endServer() {
        executor.shutdown(); // Stops accepting new tasks and lets the pool terminate
    }
}

public class Task implements Runnable {
    private String name;

    public Task(String name) {
        this.name = name;
    }

    public void run() {
        System.out.println(Thread.currentThread().getName() + ", created on: " + new Date());
        try {
            // Simulates work by sleeping for up to 10 seconds
            TimeUnit.SECONDS.sleep((long) (Math.random() * 10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + ", finished on: " + new Date());
    }
}
...........
Server server = new Server();
for (int i = 0; i < 10; i++) {
    Task task = new Task("Task " + i);
    server.executeTask(task);
}
server.endServer();
...........
In the previous example the Executors class created a
java.util.concurrent.ThreadPoolExecutor object. This
class is an implementation of the java.util.concurrent.ExecutorService
interface. Although a ThreadPoolExecutor can be created directly using
its constructors, it is recommended to use the Executors class.
The ThreadPoolExecutor uses the execute() method to run a Runnable
(a Callable is sent with submit(), described later). It has other
methods, such as getPoolSize() and getCompletedTaskCount(), to get
the state of the pool.
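As a minimal sketch of those state methods, the class below runs a few empty tasks and reads the pool's counters. The class name PoolStateDemo and the method runAndCount() are made up for this illustration, and the lambdas assume Java 8 or later.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolStateDemo {

    // Runs taskCount trivial tasks on a 2-thread pool and returns the
    // number of tasks the pool reports as completed once it has terminated.
    public static long runAndCount(int taskCount) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> { });
        }
        // While tasks are still running, these values are only approximate
        System.out.println("Pool size: " + executor.getPoolSize());
        System.out.println("Largest pool size: " + executor.getLargestPoolSize());
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executor.getCompletedTaskCount();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runAndCount(4));
    }
}
```

The counters are snapshots, so they are mostly useful for monitoring or logging rather than for program logic.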
The ThreadPoolExecutor has to be terminated explicitly by calling
its shutdown() method (wrapped by endServer() in the example).
Otherwise its worker threads can keep the JVM alive and the program
may not finish.
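One common way to terminate an executor, shown here only as a sketch, is to call shutdown(), wait a bounded time with awaitTermination() and fall back to shutdownNow(). The class ShutdownDemo and its methods are hypothetical names for this illustration; the timeout values are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {

    // Asks the executor to stop, waits up to timeoutSeconds for running
    // tasks, then interrupts whatever is left. Returns true if it terminated.
    public static boolean shutdownGracefully(ExecutorService executor, long timeoutSeconds) {
        executor.shutdown(); // no new tasks accepted; submitted tasks still run
        try {
            if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // interrupts the tasks still running
                return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Small self-contained run: one short task, then a graceful shutdown.
    public static boolean demo() {
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(() -> System.out.println("working"));
        return shutdownGracefully(executor, 5);
    }

    public static void main(String[] args) {
        System.out.println("Terminated: " + demo());
    }
}
```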
To avoid overloading the application and degrading its performance,
the Executors class has the method newFixedThreadPool(int
nThreads), which creates a fixed-size thread executor. This executor
has a maximum number of threads indicated by the parameter nThreads,
and as the Java API says: “At any point, at most
nThreads threads will be active processing tasks. If additional tasks
are submitted when all threads are active, they will wait in the
queue until a thread is available. If any thread terminates due to a
failure during execution prior to shutdown, a new one will take its
place if needed to execute subsequent tasks”.
Example:
......
public Server() {
    executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
}
......
In the example the executor is created with a maximum of 5 threads at
a time. This means that if more than 5 tasks are sent for execution,
only 5 run at once and the remaining ones wait in the queue until a
thread is free to process them.
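The limit can be observed with a small experiment. The sketch below, with the hypothetical names FixedPoolDemo and maxConcurrent(), counts how many tasks are running at the same moment and remembers the highest value seen. The 50 ms sleep is only there to make tasks overlap.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPoolDemo {

    // Submits taskCount short tasks to a pool of poolSize threads and
    // returns the highest number of tasks observed running at once.
    public static int maxConcurrent(int poolSize, int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // keep the largest value seen
                try {
                    TimeUnit.MILLISECONDS.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrency: " + maxConcurrent(5, 20));
    }
}
```

With a pool of 5 threads the reported peak should never exceed 5, no matter how many tasks are submitted.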
Tasks that return a value
The executor framework can run tasks that return a value, which is
another advantage of using this framework. For this mechanism the
java.util.concurrent.Callable interface is used. Instead of a run()
method it offers a call() method, which returns an object of the type
specified by the generic parameter:
public interface Callable<V> {
    V call() throws Exception;
}
The ExecutorService has the submit() method, which accepts
objects of type Callable and executes them. This method returns an
object of type java.util.concurrent.Future; the Future
interface has methods to obtain the result generated by the Callable
object.
Example:
public class MultiplyCalculator implements Callable<Integer> {
    private int operator1;
    private int operator2;

    public MultiplyCalculator(int operator1, int operator2) {
        this.operator1 = operator1;
        this.operator2 = operator2;
    }

    public Integer call() throws Exception {
        return operator1 * operator2;
    }
}
..........
// Maximum 2 threads at a time
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();
for (int i = 0; i < 10; i++) {
    MultiplyCalculator calculator = new MultiplyCalculator((int) (Math.random() * 10), (int) (Math.random() * 10));
    Future<Integer> result = executor.submit(calculator);
    resultList.add(result);
}
// Waits for the tasks to complete
while (executor.getCompletedTaskCount() < resultList.size()) {
    try {
        Thread.sleep(50);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
for (int i = 0; i < resultList.size(); i++) {
    Future<Integer> result = resultList.get(i);
    try {
        System.out.println("The result from the task " + i + " is:" + result.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
..........
In the previous example, the Callable objects that perform a
multiplication are sent to the executor using the submit() method.
The program waits until all the tasks are finished by polling the
getCompletedTaskCount() method of the executor. Once they are done,
the results of the operations are obtained with the get() method of
each Future object.
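Polling the executor is not the only way to wait. The Future interface itself can block for the result, with or without a timeout, and can cancel a task. The following is a sketch under the assumption of Java 8 or later; FutureDemo, multiply() and cancelLongTask() are names invented for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureDemo {

    // Submits one multiplication and waits at most 5 seconds for its result.
    public static int multiply(int a, int b) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> a * b;
            Future<Integer> future = executor.submit(task);
            return future.get(5, TimeUnit.SECONDS); // blocks until done or timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // Submits a task that would sleep for 10 seconds and cancels it right away.
    public static boolean cancelLongTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<Integer> slow = () -> {
            TimeUnit.SECONDS.sleep(10);
            return 0;
        };
        Future<Integer> future = executor.submit(slow);
        future.cancel(true); // true = interrupt the thread if the task already started
        executor.shutdown();
        return future.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("6 x 7 = " + multiply(6, 7));
        System.out.println("Cancelled: " + cancelLongTask());
    }
}
```

Because get() already blocks until the task finishes, a loop over the Future list that calls get() on each element waits for all tasks without any explicit sleep.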
The ExecutorService has the invokeAny(tasks)
method, which receives a Collection of tasks, executes them and
returns the result of the first task that finishes without
throwing an Exception. Tasks that have not completed are cancelled.
An example where this method could be used is a look-up service:
an application wants to look up a database connection on
different servers, the first task that finds the service available is
the one that is used, and the other tasks are ignored.
The previous example implemented with the invokeAny() method would
look like this:
.................
ExecutorService executor = Executors.newCachedThreadPool();
List<MultiplyCalculator> taskList = new ArrayList<MultiplyCalculator>();
for (int i = 0; i < 10; i++) {
    MultiplyCalculator calculator = new MultiplyCalculator((int) (Math.random() * 10), (int) (Math.random() * 10));
    taskList.add(calculator);
}
try {
    // Blocks until one task completes successfully
    Integer result = executor.invokeAny(taskList);
    System.out.println("The result from the first task to finish is:" + result);
} catch (Exception e) {
    e.printStackTrace();
}
// Shutdown the Executor
executor.shutdown();
.............
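The look-up scenario described earlier can be sketched in the same way. Everything about the servers here is simulated: the names server-a, server-b and server-c, the delays and the availability flags are assumptions made up for the illustration, and no real connection is opened.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LookupDemo {

    // Hypothetical look-up: pretends to contact a server, answers after
    // delayMillis, and fails if the server is flagged as unavailable.
    static Callable<String> lookup(String server, long delayMillis, boolean available) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(delayMillis);
            if (!available) {
                throw new IllegalStateException(server + " is unavailable");
            }
            return server;
        };
    }

    // Returns the name of the first simulated server that answers successfully.
    public static String firstAvailable() {
        ExecutorService executor = Executors.newCachedThreadPool();
        List<Callable<String>> tasks = Arrays.asList(
                lookup("server-a", 10, false),   // fails quickly, so it is skipped
                lookup("server-b", 100, true),   // first successful answer
                lookup("server-c", 3000, true)); // slower, cancelled by invokeAny
        try {
            return executor.invokeAny(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // Thrown only when every task failed
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Using: " + firstAvailable());
    }
}
```

Note that a failing task does not abort the call: invokeAny() keeps waiting for another task to succeed and throws an ExecutionException only if none does.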
The ExecutorService has another mechanism for running multiple tasks
and processing the results of all of them: the invokeAll(tasks)
method receives a Collection of tasks, executes them and returns a
List of Future objects.
Example:
ExecutorService executor = Executors.newCachedThreadPool();
List<MultiplyCalculator> taskList = new ArrayList<MultiplyCalculator>();
for (int i = 0; i < 10; i++) {
    MultiplyCalculator calculator = new MultiplyCalculator((int) (Math.random() * 10), (int) (Math.random() * 10));
    taskList.add(calculator);
}
// Starts empty so the loop below is safe even if invokeAll() fails
List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();
try {
    // Blocks until all the tasks have completed
    resultList = executor.invokeAll(taskList);
} catch (Exception e) {
    e.printStackTrace();
}
executor.shutdown();
for (int i = 0; i < resultList.size(); i++) {
    Future<Integer> result = resultList.get(i);
    try {
        System.out.println("The result from the task " + i + " is:" + result.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
In the example, instead of sending each task to the executor with the
submit() method, all the tasks are grouped in a list and sent for
execution through the invokeAll() method.
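The list returned by invokeAll() keeps the order of the submitted tasks, which makes it easy to collect the results. A minimal sketch, assuming Java 8 or later and using the invented names InvokeAllDemo and squares():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllDemo {

    // Computes the squares of 1..n in a pool and returns them in task order.
    public static List<Integer> squares(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            final int value = i;
            tasks.add(() -> value * value);
        }
        List<Integer> results = new ArrayList<>();
        try {
            // invokeAll() returns only after every task has completed
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(squares(4));
    }
}
```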
Schedule Tasks
The Executors utility class can create a pool that schedules tasks after a given delay, or executes them periodically. This pool
implements the java.util.concurrent.ScheduledExecutorService
interface.
Example:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();
for (int i = 0; i < 5; i++) {
    MultiplyCalculator calculator = new MultiplyCalculator((int) (Math.random() * 10), (int) (Math.random() * 10));
    // Runs the task once, i + 1 seconds from now
    Future<Integer> future = executor.schedule(calculator, i + 1, TimeUnit.SECONDS);
    resultList.add(future);
}
executor.shutdown();
// Waits for the finalization of the executor
try {
    executor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
    e.printStackTrace();
}
..............
In the example a scheduled pool is created with a pool size of 1,
then each task is scheduled using the schedule()
method. This method receives as parameters the task to execute, the
delay to wait before the execution and the unit of time of that delay.
After shutdown(), the program calls the awaitTermination() method,
which blocks until all tasks have completed or the timeout elapses.
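The example above only covers delayed execution. For periodic execution the ScheduledExecutorService offers scheduleAtFixedRate(). The sketch below uses hypothetical names (PeriodicDemo, runPeriodically()) and an arbitrary period of 100 ms; it lets the task run a few times and then cancels it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicDemo {

    // Runs a counter task every 100 ms until it has executed at least
    // `times` times, then cancels it and returns the number of runs.
    public static int runPeriodically(int times) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(times);
        // Arguments: task, initial delay, period, unit of time
        ScheduledFuture<?> handle = executor.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            latch.countDown();
        }, 0, 100, TimeUnit.MILLISECONDS);
        try {
            latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        handle.cancel(false); // stops future runs without interrupting a running one
        executor.shutdown();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("Runs: " + runPeriodically(3));
    }
}
```

A periodic task never completes on its own, so it has to be cancelled through its ScheduledFuture or stopped by shutting down the executor.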
Rejected tasks
If a task is sent to the executor after shutdown() has been called,
while it is still finishing its pending tasks, the task is rejected.
The executor provides a mechanism to manage this: it just requires an
instance of an object that implements the
java.util.concurrent.RejectedExecutionHandler interface.
Example:
public class RejectedTaskController implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("The task has been rejected");
    }
}
........
RejectedTaskController controller = new RejectedTaskController();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.setRejectedExecutionHandler(controller);
........
When a task is rejected, the rejectedExecution() method of the
RejectedExecutionHandler instance is called.
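To see the handler in action, the following sketch sends tasks to an executor that has already been shut down and counts the rejections. RejectedDemo and rejectedAfterShutdown() are names made up for this illustration, and the handler is written as a lambda, which assumes Java 8 or later.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectedDemo {

    // Sends two tasks after shutdown() and returns how many were rejected.
    public static int rejectedAfterShutdown() {
        AtomicInteger rejected = new AtomicInteger();
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        // The handler only counts; a real one might log or retry elsewhere
        executor.setRejectedExecutionHandler((task, pool) -> rejected.incrementAndGet());

        executor.execute(() -> { }); // accepted: the executor is still running
        executor.shutdown();
        executor.execute(() -> { }); // rejected: sent after shutdown()
        executor.execute(() -> { }); // rejected: sent after shutdown()
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("Rejected tasks: " + rejectedAfterShutdown());
    }
}
```

Without a custom handler, a ThreadPoolExecutor uses its default policy, which throws a RejectedExecutionException from execute() instead.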
Go to part 6