Sunday, June 9, 2013

Java Concurrency Part 7

This is the seventh and final part of the tutorial, and this link goes to the previous post java concurrency part 6

Utility Classes For Synchronization

A semaphore has a counter that allows the access to a shared resource, it is similar to the Lock interface but when a thread wants to acquire a semaphore it checks its counter and if it is greater than zero, then the thread acquires the semaphore and it decrements the counter. But if the counter is zero the thread waits until the counter is incremented.

Example:
 public class Resource {  
      Semaphore semaphore = new Semaphore(2);//Indicates two threads can access the resource at the same time.  
      public void lock() {  
           semaphore.acquire();//If the counter is zero the thread sleeps otherwise it decrements it and gets the access  
           System.out.println(“Begining thread ” + Thread.currentThread().getName() + “has the lock”);  
           try {                           
                Thread.sleep(10000);  
           } catch (InterruptedException e) {  
                e.printStackTrace();  
           }  
           System.out.println(“Ending thread ” + Thread.currentThread().getName() + “has the lock”);  
           semaphore.release();//Release the semaphore and increments the counter  
      }  
 }  
 .......  

In the example the acquire() method obtains the semaphore if the counter is greater than zero, otherwise it waits until it increments and decrements the counter, then the thread that obtains the semaphore execute the resource and finally it executes the release() method and increments the counter.

If the semaphore initializes its counter with a value of one then it is called a binary semaphore and it behaves just like the java.util.concurrent.locks.Lock interface.

CountDownLatch

The CountDownLatch can be used to make one thread wait until N threads have completed some action, or some action has been completed N times. It has a count and it is initialized with an integer value, this count is the number actions to wait for.

The CountDownLatch has the method countdown() method which decreases the internal count, and the await() method blocks or puts to sleep a thread until the count reaches zero.

Example:
 public class Test implements Runnable {  
   //CountDownLatch controls the answer of 10 questions  
   private final CountDownLatch controller = new CountDownLatch(10);  
   public void setQuestionAnswered(){  
     controller.countDown();// Decrements the counter   
   }  

   public void run() {  
      try {  
       controller.await();// Wait for all the question to be answered  
       // Starts the test evaluation  
     } catch (InterruptedException e) {  
       e.printStackTrace();  
     }  
   }    
 }  

In the example the task Test will wait to evaluate until all the questions of the test are answered.
Other tasks will call the setQuestionAnswered() which calls the countDown(), decreasing the counter, since the CountDownLatch is initialized with a value of 10, the countDown() method has to be called 10 times in order to proceed after the await() method to evaluate the test.

CyclicBarrier

The CyclicBarrier allows to a set of threads to all wait for each other or to reach a common barrier point, it is simlar to the CountDownLatch class, but it has some diferences. It is also initialized with an integer value known as parties or the number of threads involve, but it has an optional parameter which is a Runnable that will be executed when the threads arrive the common point.

The CyclicBarrier has the await() method which blocks until all the parties invoke this method.

Example:
 public class Test implements Runnable {    
   private CountDownLatch controller = new CountDownLatch(10);  
   private CyclicBarrier cyclicBarrier;  
   public Test(CyclicBarrier cyclicBarrier) {  
      this.cyclicBarrier = cyclicBarrier;  
   }  
   public void setQuestionAnswered(){  
     controller.countDown();  
   }  
   @Override  
   public void run() {  
      try {  
       controller.await();  
       // Starts the test evaluation  
       cyclicBarrier.await();//WAITS FOR ALL THREADS OR PARTIES  
     } catch (InterruptedException e) {  
       e.printStackTrace();  
     }  
   }    
   public double getTestResult() {  
   }  
 }  
 ..........  
 final Test [] tests = new Test[10];  
 CyclicBarrier cyclicBarrier = new CyclicBarrier(10,  
       new Runnable(){  
           public void run() {  
                //Get the media or average of the tests  
                ..........  
           }                                
      });  
 for(int i=0; i<tests.length; i++)  
      tests[i] = new Test(cyclicBarrier);  
 ..........  

In the example the same Test class used as in the previous example, but here the CyclicBarrier will get the average or media result from all the tests. When all the Test thread finish the exam the CyclicBarrier will execute the Runnable object that is passed as parameter and will get the average of the tests.

Phaser

The Phaser class is similar to CyclicBarrier and CountDownLatch but more flexible, the tasks or threads synchronize in steps or phases. The parties registered to synchronize on a phaser may vary over time, it can dynamically change with the methods register(), bulkRegister().

The arriveAndDeregister() mehtod notifies the phasar that a task has finished and will not participate in future phases decreasing the number of parties.

The arriveAndAwaitAdvance() method makes a task wait for all the participants to finish.

Example:
 public class Task implements Runnable {      
   private Phaser phaser;  
   public Task(Phaser phaser) {  
      this. phaser = phaser;  
  }  
   @Override  
   public void run() {  
      try {  
       phaser.arriveAndAwaitAdvance();  
      //Start phase1       
      phaser.arriveAndAwaitAdvance();  
      //Start phase2  
      phaser.arriveAndAwaitAdvance();  
      //Start phase3            
     } catch (InterruptedException e) {  
       e.printStackTrace();  
     }  
   }    
 }  
 ..........  
 final Task [] tasks = new Task[10];  
 Phaser phaser = new Phaser(10);  
 for(int i=0; i<tasks .length; i++)  
      tasks[i] = new Task(phaser);       
 ..........  

In the example the phaser is set to 10 parties, when all the tasks reach the first arriveAndAwaitAdvance() they will start the phase 1 and then they will have to wait to all threads again before they start phase 2, and so on.

Exchanger

The Exchanger class synchronizes two threads in a point, when both get to this point they swap or interchange an object. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue.

The Exchanger class has the exchange() method which interchanges data between threads.

The producer consumer example can be implemented with this mechanism.

Example:
 public class Consumer extends Thread {  
      private Exchanger <String> exchanger;  
      public Consumer(Exchanger exchanger) {  
           this.exchanger = exchanger;  
      }  
      public void run() {  
           String message = "";  
           while (!message.equalsIgnoreCase("end")) {  
                //message = mb.take();  
                try {  
                     message = exchanger.exchange(message);//Waits and exchanges messages with the producer;  
                     System.out.print(message + " ");  
                } catch (InterruptedException e) {  
                     e.printStackTrace();  
                }  
           }            
      }       
 }  
 public class Producer extends Thread {  
      private Exchanger exchanger;  
      public Producer(Exchanger exchanger) {  
           this.exchanger = exchanger;  
      }  
      public void run() {  
           String [] messages = {"Hello", "world", "end"};  
           for (String msg:messages) {  
                try {  
                     exchanger.exchange(msg);//Waits and exchanges messages with the consumer;                 
                } catch (InterruptedException e) {  
                     e.printStackTrace();  
                }  
           }  
      }            
 }  
 .......  
      Exchanger <String> exchanger = new Exchanger<String>();   
           Consumer consumer = new Consumer(exchanger);  
           Producer producer = new Producer(exchanger);  
           consumer.start();  
           producer.start();  
 .......  


This example is similar to the one in the part 4, but instead of having a class that acts as a monitor, here the Exchanger does that job and synchronizes the messages between the threads.


5 comments:

  1. nice one, perfect article
    why don't you use SHJS(high light your code)? I'm using it, cool
    http://shjs.sourceforge.net

    ReplyDelete
    Replies
    1. Well, I am new to blogging, but thanks for the tip.

      Delete
    2. I've used in the past syntax highlighting tools like http://alexgorbatchev.com/SyntaxHighlighter/ or https://code.google.com/p/google-code-prettify/ but I ended up embedding gists from Github. It's quite easy to manage them and look pretty neat :-)

      If you want to give it a try, here's a tutorial:

      http://developertips.blogspot.ro/2010/07/syntax-highlighting-on-blogger-gist.html

      Delete
    3. Great, thanks I've definitely will try it in one of my next posts.

      Delete
  2. This comment has been removed by the author.

    ReplyDelete