Sunday, June 2, 2013

Java Concurrency Part 6

This is the sixth part of the tutorial; the previous post is Java Concurrency Part 5.

Fork/Join Framework

The fork/join framework is an implementation of the ExecutorService interface that takes a different approach: it solves tasks by recursively splitting them into smaller tasks.

The fork/join framework implements the work-stealing algorithm: worker threads that run out of things to do can steal tasks from other threads that are still busy. Put another way, a thread that is waiting for other tasks to finish looks for tasks that have not been executed yet and executes them.

Basically there are two operations: one to divide a task into smaller tasks (“fork”), and another to wait for those tasks to finish (“join”).

The java.util.concurrent.ForkJoinPool class implements the ExecutorService interface and also implements the work-stealing algorithm.
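
Because ForkJoinPool implements ExecutorService, it can also run ordinary Callable jobs, not only fork/join tasks. The following is a minimal sketch of that idea, assuming Java 8 or later for the lambda; the class name PoolAsExecutorDemo and the method answer() are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

// Hypothetical demo class; the names here are illustrative only.
class PoolAsExecutorDemo {

    // Submits a plain Callable through the ExecutorService view of the pool.
    static int answer() {
        ExecutorService executor = new ForkJoinPool(2); // 2 worker threads, an arbitrary choice
        try {
            Callable<Integer> job = () -> 6 * 7;
            Future<Integer> future = executor.submit(job);
            return future.get(); // blocks until the job has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Job failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Result: " + answer());
    }
}
```

The pool is used here only through the ExecutorService type, so the same code would work with any other executor implementation.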

The java.util.concurrent.ForkJoinTask class is an abstract class for the tasks that execute within a ForkJoinPool. It implements the Future interface and provides the fork() method to arrange asynchronous execution and the join() method to wait until the task's result has been computed.

Example:
 import java.util.concurrent.RecursiveAction;

 public class IncrementTask extends RecursiveAction {
   private int [] array;  
   private int first;  
   private int last;  

   public IncrementTask(int [] array, int first, int last) {  
     this.array = array;  
     this.first = first;  
     this.last = last;  
   }  
   protected void compute() {  
     if (last - first < 10) {  
       increment();  
       System.out.println("Completed from "+first+", to: "+last);  
     } else {  
       int middle = (last + first) / 2;  
       System.out.println("Pending tasks: " + getQueuedTaskCount());  
       IncrementTask it1 = new IncrementTask(array, first, middle + 1);  
       IncrementTask it2 = new IncrementTask(array, middle + 1, last);  
       invokeAll(it1, it2); // Runs both subtasks and waits for them to finish
     }  
   }  
   private void increment() {  
     for (int i = first; i < last; i++) {  
       array[i]++;
     }      
   }  
 }  
 ...............  
     int array [] = new int [100];  
     Random random = new Random();  
     for (int i=0; i<array.length; i++) {  
       array[i] = random.nextInt(10000);  
     }      
     System.out.print("Array: {");   
     for (int i=0; i<array.length; i++) {      
       System.out.print(array[i] + ", ");  
     }  
     System.out.print(" }");   
     IncrementTask task = new IncrementTask(array, 0, array.length);      
     ForkJoinPool pool = new ForkJoinPool();      
     pool.execute(task);      
     do {  
       System.out.println("Threads active: " + pool.getActiveThreadCount());        
       try {  
         TimeUnit.MILLISECONDS.sleep(5);  
       } catch (InterruptedException e) {  
         e.printStackTrace();  
       }  
     } while (!task.isDone());      
     pool.shutdown();  
     if (task.isCompletedNormally()) {  
       System.out.println("The process has completed normally");  
     }  
     System.out.print("Array incremented: {");   
     for (int i=0; i<array.length; i++) {      
       System.out.print(array[i] + ", ");  
     }  
     System.out.print(" }");   
 ...............  

In the example, the IncrementTask class extends the java.util.concurrent.RecursiveAction class and implements its logic in the compute() method, which increments the elements of an array of integers. A task only increments its range directly when the range holds fewer than 10 elements; otherwise it creates two subtasks and executes them through the invokeAll() method inherited from ForkJoinTask. In this way the tasks are created recursively.

The RecursiveAction class extends the ForkJoinTask class, which provides the invokeAll() method. This method executes the subtasks and waits for them to finish before continuing; while it is waiting for the subtasks to end, the worker thread can take another task and execute it.
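
invokeAll() also has an overload that accepts a collection of tasks, which helps when a task splits its work into more than two parts. The sketch below is only an illustration under assumed names (InvokeAllCollectionDemo, DoubleChunk, DoubleAll and the chunk size are all hypothetical): it doubles the elements of an array in fixed-size chunks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical demo class; the names and chunk size are illustrative only.
class InvokeAllCollectionDemo {

    // Doubles every element in the half-open range [first, last).
    static class DoubleChunk extends RecursiveAction {
        private final int[] array;
        private final int first;
        private final int last;

        DoubleChunk(int[] array, int first, int last) {
            this.array = array;
            this.first = first;
            this.last = last;
        }

        @Override
        protected void compute() {
            for (int i = first; i < last; i++) {
                array[i] *= 2;
            }
        }
    }

    // Splits the whole array into chunks and runs them with invokeAll(Collection).
    static class DoubleAll extends RecursiveAction {
        private final int[] array;
        private final int chunkSize; // assumed to be positive

        DoubleAll(int[] array, int chunkSize) {
            this.array = array;
            this.chunkSize = chunkSize;
        }

        @Override
        protected void compute() {
            List<DoubleChunk> chunks = new ArrayList<>();
            for (int start = 0; start < array.length; start += chunkSize) {
                int end = Math.min(start + chunkSize, array.length);
                chunks.add(new DoubleChunk(array, start, end));
            }
            invokeAll(chunks); // returns once every chunk has completed
        }
    }

    // Returns a doubled copy of the input, leaving the input untouched.
    static int[] doubled(int[] input, int chunkSize) {
        int[] copy = input.clone();
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleAll(copy, chunkSize));
        } finally {
            pool.shutdown();
        }
        return copy;
    }

    public static void main(String[] args) {
        int[] result = doubled(new int[] {1, 2, 3, 4, 5}, 2);
        System.out.println(java.util.Arrays.toString(result));
    }
}
```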

The fork/join framework provides another class for tasks that return a value: java.util.concurrent.RecursiveTask. It behaves just like the RecursiveAction class, but its compute() method returns a value.

Example:
 import java.util.concurrent.RecursiveTask;

 public class SumTask extends RecursiveTask<Integer> {
   private int [] array;  
   private int first;  
   private int last;  
   public SumTask(int[] array, int first, int last) {  
     this.array = array;  
     this.first = first;  
     this.last = last;  
   }  
   protected Integer compute() {  
     int sum = 0;  
     if (last - first < 10) {  
       sum = add();  
       System.out.println("Completed from "+first+", to: "+last);  
     } else {  
       int middle = (last + first) / 2;  
       System.out.println("Pending tasks: " + getQueuedTaskCount());  
       SumTask st1 = new SumTask(array, first, middle + 1);  
       SumTask st2 = new SumTask(array, middle + 1, last);  
       invokeAll(st1, st2);        
       try {  
         sum += st1.get();  
         sum += st2.get();  
       } catch (Exception ex) {  
         ex.printStackTrace();  
       }         
     }      
     return sum;  
   }  
   private Integer add() {  
     int sum = 0;  
     for (int i = first; i < last; i++) {  
       sum += array[i];        
     }      
     return sum;  
   }  
 }  
 ......................  
     int array [] = new int [100];  
     Random random = new Random();  
     for (int i=0; i<array.length; i++) {  
       array[i] = random.nextInt(100);  
     }      
     System.out.print("Array: {");   
     for (int i=0; i<array.length; i++) {      
       System.out.print(array[i] + ", ");  
     }  
     System.out.print(" }");   
     SumTask task = new SumTask(array, 0, array.length);      
     ForkJoinPool pool = new ForkJoinPool();      
     pool.execute(task);      
     do {  
       System.out.println("Threads active: " + pool.getActiveThreadCount());        
       try {  
         TimeUnit.MILLISECONDS.sleep(5);  
       } catch (InterruptedException e) {  
         e.printStackTrace();  
       }  
     } while (!task.isDone());      
     pool.shutdown();  
     if (task.isCompletedNormally()) {  
       System.out.println("The process has completed normally");  
     }  
     try {   
       System.out.print("Array sum is:" + task.get());  
     } catch (Exception ex) {  
       ex.printStackTrace();
     }  
 ......................  

In the example, the SumTask class extends the RecursiveTask class with Integer as its type parameter; a SumTask adds all the elements of an array. A task only adds its range directly when the range holds fewer than 10 elements; otherwise it creates two subtasks and executes them through the invokeAll() method, just as in the previous example. In contrast to the previous example, each task returns the sum of its range, and a task that created subtasks adds up their results.
The value from the subtasks is obtained with the get() method from the Future interface.
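
Besides get(), a ForkJoinTask result can also be read with join(). The main practical difference is error handling: get() declares the checked InterruptedException and ExecutionException, while join() rethrows a failure as an unchecked exception. The sketch below illustrates this with a hypothetical FailingTask that always throws; the class and method names are assumptions made for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; none of these names come from the post.
class GetVersusJoinDemo {

    // A task that always fails, used only to show how errors are reported.
    static class FailingTask extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalArgumentException("boom");
        }
    }

    // Describes what a caller of get() observes when the task fails.
    static String failureSeenByGet() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            FailingTask task = new FailingTask();
            pool.submit(task);
            task.get(); // checked exceptions must be handled
            return "no failure";
        } catch (ExecutionException e) {
            return "ExecutionException wrapping " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    // Describes what a caller of join() observes when the task fails.
    static String failureSeenByJoin() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            FailingTask task = new FailingTask();
            pool.submit(task);
            task.join(); // no checked exceptions; failures surface as unchecked ones
            return "no failure";
        } catch (IllegalArgumentException e) {
            return "IllegalArgumentException";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("get():  " + failureSeenByGet());
        System.out.println("join(): " + failureSeenByJoin());
    }
}
```

Inside compute(), join() therefore avoids the try/catch block that get() requires, at the cost of having no checked signal that a subtask failed.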

So far all the subtasks have been executed synchronously: when a task calls the invokeAll() method, it waits until the subtasks passed to that method finish. This is where the work-stealing algorithm comes in: while the task is waiting, its worker thread picks up another pending task.
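
One way to get a rough view of work stealing is the pool's getStealCount() method, which reports an estimate of how many tasks were taken from another thread's queue. The following is a sketch under assumptions: the class StealCountDemo, the RangeSum task, the threshold and the parallelism level are all invented for this illustration, and the reported count varies from run to run (it may even be 0).

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; the names and the threshold are illustrative only.
class StealCountDemo {

    // Sums the integers in the half-open range [from, to) by splitting it recursively.
    static class RangeSum extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // assumed cut-off for direct computation
        private final int from;
        private final int to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int middle = from + (to - from) / 2;
            RangeSum left = new RangeSum(from, middle);
            RangeSum right = new RangeSum(middle, to);
            invokeAll(left, right); // a queued subtask may be stolen by an idle worker
            return left.join() + right.join();
        }
    }

    // Runs the sum on a dedicated pool and prints the pool's steal estimate.
    static long sumRange(int from, int to, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            long result = pool.invoke(new RangeSum(from, to));
            // getStealCount() is only an estimate and differs between runs.
            System.out.println("Steal count: " + pool.getStealCount());
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + sumRange(0, 1_000_000, 4));
    }
}
```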

Tasks can also be executed asynchronously with the fork() and join() methods of the ForkJoinTask class. The fork() method schedules a subtask for asynchronous execution and returns immediately, so the calling task continues running; the join() method waits for the result of the subtask.

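With this mechanism the parent task is not blocked when it creates a subtask: it keeps running until it calls join(). The forked subtasks are placed in the worker thread's queue, so work stealing still applies and idle worker threads can take them.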

Example:
 import java.util.concurrent.RecursiveTask;

 public class SumTaskAsync extends RecursiveTask<Integer> {
   private int [] array;  
   private int first;  
   private int last;  
   public SumTaskAsync(int[] array, int first, int last) {  
     this.array = array;  
     this.first = first;  
     this.last = last;  
   }  
   protected Integer compute() {  
     int sum = 0;  
     if (last - first < 10) {  
       sum = add();  
       System.out.println("Completed from "+first+", to: "+last);  
     } else {  
       int middle = (last + first) / 2;  
       System.out.println("Pending tasks: " + getQueuedTaskCount());  
       SumTaskAsync st1 = new SumTaskAsync(array, first, middle + 1);
       SumTaskAsync st2 = new SumTaskAsync(array, middle + 1, last);
       st1.fork();  
       st2.fork();//Continues with execution        
       sum += st1.join();//Waits for the result  
       sum += st2.join();  
     }  
     return sum;  
   }  
   private Integer add() {  
     int sum = 0;  
     for (int i = first; i < last; i++) {  
       sum += array[i];        
     }      
     return sum;  
   }  
 }  

In the example, SumTaskAsync executes its subtasks asynchronously using the fork() and join() methods.
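
A common variation on this pattern forks only one of the two subtasks and computes the other directly in the current thread, so the thread keeps doing useful work while the forked half is queued for another worker. The sketch below shows this variation under assumed names (ForkComputeJoinDemo and MaxTask are hypothetical, and it finds a maximum instead of a sum); it is not the code from the example above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; the names are illustrative only.
class ForkComputeJoinDemo {

    // Finds the largest value in the half-open range [first, last) of an array.
    static class MaxTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 10; // assumed cut-off for direct computation
        private final int[] array;
        private final int first;
        private final int last;

        MaxTask(int[] array, int first, int last) {
            this.array = array;
            this.first = first;
            this.last = last;
        }

        @Override
        protected Integer compute() {
            if (last - first < THRESHOLD) {
                int max = Integer.MIN_VALUE;
                for (int i = first; i < last; i++) {
                    max = Math.max(max, array[i]);
                }
                return max;
            }
            int middle = first + (last - first) / 2;
            MaxTask left = new MaxTask(array, first, middle);
            MaxTask right = new MaxTask(array, middle, last);
            left.fork();                    // queue the left half for asynchronous execution
            int rightMax = right.compute(); // compute the right half in the current thread
            int leftMax = left.join();      // wait for (or run) the left half
            return Math.max(leftMax, rightMax);
        }
    }

    // Returns Integer.MIN_VALUE for an empty array.
    static int max(int[] array) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new MaxTask(array, 0, array.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[50];
        for (int i = 0; i < data.length; i++) {
            data[i] = (i * 7) % 50;
        }
        System.out.println("Max: " + max(data));
    }
}
```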

Go to part 7
