Utility Classes For Synchronization
Semaphore
A semaphore maintains a counter that controls access to a shared
resource. It is similar to the Lock interface, but when a thread wants
to acquire a semaphore it checks the counter: if it is greater than
zero, the thread acquires the semaphore and decrements the counter.
If the counter is zero, the thread waits until the counter is
incremented.
Example:
import java.util.concurrent.Semaphore;

public class Resource {
    // Two permits: two threads can access the resource at the same time.
    private final Semaphore semaphore = new Semaphore(2);

    public void lock() {
        try {
            // If the counter is zero the thread waits; otherwise it decrements it and gets access
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return;
        }
        try {
            System.out.println("Beginning: thread " + Thread.currentThread().getName() + " has the lock");
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("Ending: thread " + Thread.currentThread().getName() + " releases the lock");
            semaphore.release(); // Releases the semaphore and increments the counter
        }
    }
}
.......
In the example, the acquire() method obtains the semaphore if
the counter is greater than zero; otherwise it waits until the counter
is incremented and then decrements it. The thread that obtains the
semaphore then uses the resource, and finally it calls the release()
method, which increments the counter.
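To make the effect of the counter visible, here is a minimal sketch that starts several threads against one semaphore and records how many of them were inside the guarded section at the same time. The class name SemaphoreLimitDemo and the method maxConcurrent are made up for this illustration; they are not part of the example above.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, only for illustration.
class SemaphoreLimitDemo {

    // Starts `threads` workers guarded by a semaphore with `permits` permits and
    // returns the highest number of workers seen inside the guarded section at once.
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();

        Runnable worker = () -> {
            try {
                semaphore.acquire(); // waits while no permit is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            try {
                int now = inside.incrementAndGet();
                max.accumulateAndGet(now, Math::max); // remember the peak
                Thread.sleep(50); // simulate work on the shared resource
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inside.decrementAndGet();
                semaphore.release(); // always give the permit back
            }
        };

        Thread[] pool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            pool[i] = new Thread(worker);
            pool[i].start();
        }
        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("Max concurrent threads: " + maxConcurrent(2, 6));
    }
}
```

With two permits the reported peak should never exceed 2, no matter how many threads are started.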
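If the semaphore is initialized with a counter of one, it is
called a binary semaphore and it behaves much like the
java.util.concurrent.locks.Lock interface. One difference is that a
semaphore has no notion of ownership: the permit can be released by a
thread other than the one that acquired it.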
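One place where a binary semaphore and a lock differ is ownership. The following sketch, using ReentrantLock as the Lock implementation (an assumption, the text above only names the interface), checks that a semaphore permit can be released from another thread while unlocking a ReentrantLock from a non-owner thread fails. The class and method names are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, only for illustration.
class BinarySemaphoreDemo {

    // Runs the task in a new thread and waits for it to finish.
    private static void runInOtherThread(Runnable task) {
        Thread other = new Thread(task);
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // True if a permit taken by this thread can be released by another thread.
    static boolean semaphoreReleasedByOtherThread() {
        Semaphore binary = new Semaphore(1);
        if (!binary.tryAcquire()) {
            return false; // not expected: the single permit is free
        }
        runInOtherThread(binary::release);
        return binary.tryAcquire(); // the permit is available again
    }

    // True if unlocking from a thread that does not hold the lock is rejected.
    static boolean lockRejectsForeignUnlock() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean rejected = new AtomicBoolean(false);
        lock.lock();
        try {
            runInOtherThread(() -> {
                try {
                    lock.unlock();
                } catch (IllegalMonitorStateException e) {
                    rejected.set(true);
                }
            });
        } finally {
            lock.unlock(); // the owner releases the lock
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("Semaphore released by other thread: " + semaphoreReleasedByOtherThread());
        System.out.println("Lock rejects foreign unlock: " + lockRejectsForeignUnlock());
    }
}
```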
CountDownLatch
A CountDownLatch can be used to make one thread wait until N
threads have completed some action, or until some action has been
completed N times. It is initialized with an integer count, which is
the number of actions to wait for.
The CountDownLatch has the countDown() method, which decreases
the internal count, and the await() method, which blocks the
calling thread until the count reaches zero.
Example:
import java.util.concurrent.CountDownLatch;

public class Test implements Runnable {
    // The CountDownLatch tracks the answers to 10 questions
    private final CountDownLatch controller = new CountDownLatch(10);

    public void setQuestionAnswered() {
        controller.countDown(); // Decrements the counter
    }

    @Override
    public void run() {
        try {
            controller.await(); // Waits for all the questions to be answered
            // Starts the test evaluation
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }
}
In the example, the Test task waits until all the questions of the
test are answered before it starts the evaluation.
Other tasks call setQuestionAnswered(), which calls countDown() and
decreases the counter. Since the CountDownLatch is initialized with a
value of 10, countDown() has to be called 10 times before the thread
blocked in await() can proceed to evaluate the test.
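The waiting behaviour can be checked with a small self-contained sketch. It assumes one thread per question, which is only one of the ways the latch could be counted down; the class LatchDemo and its method are hypothetical names for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, only for illustration.
class LatchDemo {

    // Starts one thread per question, waits on the latch, and returns how many
    // answers had been recorded at the moment await() returned.
    static int answeredWhenReleased(int questions) {
        CountDownLatch controller = new CountDownLatch(questions);
        AtomicInteger answered = new AtomicInteger();

        for (int i = 0; i < questions; i++) {
            new Thread(() -> {
                answered.incrementAndGet(); // "answer" the question
                controller.countDown();     // then decrement the latch
            }).start();
        }

        try {
            controller.await(); // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return answered.get();
    }

    public static void main(String[] args) {
        System.out.println("Answers seen after await(): " + answeredWhenReleased(10));
    }
}
```

Because each thread records its answer before calling countDown(), the value read after await() returns normally equals the initial count.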
CyclicBarrier
A CyclicBarrier allows a set of threads to wait for each other
to reach a common barrier point. It is similar to the
CountDownLatch class, but with some differences. It is also
initialized with an integer value, known as the parties or the number
of threads involved, and it accepts an optional Runnable parameter
that is executed when all the threads arrive at the common point.
The CyclicBarrier has the await() method, which blocks until
all the parties have invoked it.
Example:
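import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

public class Test implements Runnable {
    private final CountDownLatch controller = new CountDownLatch(10);
    private final CyclicBarrier cyclicBarrier;
    // Placeholder field: the evaluation step (not shown) is assumed to store its result here
    private double result;

    public Test(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    public void setQuestionAnswered() {
        controller.countDown();
    }

    @Override
    public void run() {
        try {
            controller.await(); // Waits for all the questions to be answered
            // Starts the test evaluation
            cyclicBarrier.await(); // Waits for all the threads (parties)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        } catch (BrokenBarrierException e) {
            e.printStackTrace(); // another party was interrupted or timed out
        }
    }

    public double getTestResult() {
        return result;
    }
}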
..........
final Test [] tests = new Test[10];
CyclicBarrier cyclicBarrier = new CyclicBarrier(10,
new Runnable(){
public void run() {
// Compute the mean (average) of the tests
..........
}
});
for(int i=0; i<tests.length; i++)
tests[i] = new Test(cyclicBarrier);
..........
This example uses the same Test class as the previous example,
but here the CyclicBarrier is used to compute the average result of
all the tests. When all the Test threads finish the exam, the
CyclicBarrier executes the Runnable object that was passed as a
parameter, and that Runnable computes the average of the tests.
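As a runnable illustration of the barrier action, the sketch below lets each thread store one score and then wait at the barrier; the barrier's Runnable computes the average once every party has arrived. The scores are passed in directly as a stand-in for the real test evaluation, and the class and method names are hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, only for illustration.
class BarrierAverageDemo {

    // Starts one thread per score (scores must not be empty). Each thread publishes
    // its score and waits at the barrier; the barrier action computes the average.
    static double averageOf(double[] scores) {
        double[] results = new double[scores.length];
        double[] average = new double[1];

        CyclicBarrier barrier = new CyclicBarrier(scores.length, () -> {
            // Runs once, after the last party has called await()
            double sum = 0;
            for (double r : results) {
                sum += r;
            }
            average[0] = sum / results.length;
        });

        Thread[] threads = new Thread[scores.length];
        for (int i = 0; i < scores.length; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                results[id] = scores[id]; // stand-in for evaluating a test
                try {
                    barrier.await(); // wait for all the other parties
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace(); // another party was interrupted or timed out
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return average[0];
    }

    public static void main(String[] args) {
        System.out.println("Average: " + averageOf(new double[] {6.0, 8.0, 10.0}));
    }
}
```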
Phaser
The Phaser class is similar to CyclicBarrier and CountDownLatch but
more flexible: the tasks or threads synchronize in steps or phases.
The number of parties registered to synchronize on a phaser may vary
over time; it can be changed dynamically with the register() and
bulkRegister() methods.
The arriveAndDeregister() method notifies the phaser that a task has
finished and will not participate in future phases, decreasing the
number of parties.
The arriveAndAwaitAdvance() method makes a task wait until all the
participants have arrived at the current phase.
Example:
import java.util.concurrent.Phaser;

public class Task implements Runnable {
    private final Phaser phaser;

    public Task(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        // arriveAndAwaitAdvance() does not throw InterruptedException,
        // so no try/catch is needed (catching it here would not compile)
        phaser.arriveAndAwaitAdvance();
        // Start phase 1
        phaser.arriveAndAwaitAdvance();
        // Start phase 2
        phaser.arriveAndAwaitAdvance();
        // Start phase 3
    }
}
..........
final Task[] tasks = new Task[10];
Phaser phaser = new Phaser(10);
for (int i = 0; i < tasks.length; i++) {
    tasks[i] = new Task(phaser);
}
..........
In the example the phaser is set to 10 parties. When all the tasks
reach the first arriveAndAwaitAdvance() they start phase 1; then
they have to wait for all the threads again before they start
phase 2, and so on.
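The dynamic side of the Phaser (register() and arriveAndDeregister()) can be sketched as follows. The scenario, a main thread that registers itself plus a few workers that take part in one phase and then leave, is an assumption chosen to keep the numbers easy to follow; PhaserDemo and run are hypothetical names.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class, only for illustration.
class PhaserDemo {

    // Returns { parties after registration, phase after the first barrier,
    //           parties after the workers deregistered }.
    static int[] run(int workers) {
        Phaser phaser = new Phaser(1); // the calling thread counts as one party
        Thread[] threads = new Thread[workers];

        for (int i = 0; i < workers; i++) {
            phaser.register(); // add one party per worker before it starts
            threads[i] = new Thread(() -> {
                phaser.arriveAndAwaitAdvance(); // finish phase 0 together
                phaser.arriveAndDeregister();   // leave: no more phases for this worker
            });
            threads[i].start();
        }

        int partiesBefore = phaser.getRegisteredParties();
        phaser.arriveAndAwaitAdvance(); // the calling thread also finishes phase 0
        int phaseAfterBarrier = phaser.getPhase();

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        int partiesAfter = phaser.getRegisteredParties();
        return new int[] {partiesBefore, phaseAfterBarrier, partiesAfter};
    }

    public static void main(String[] args) {
        int[] r = run(3);
        System.out.println("Parties before: " + r[0]
                + ", phase after barrier: " + r[1]
                + ", parties after: " + r[2]);
    }
}
```

The phase cannot move past 1 in this sketch because the calling thread stays registered and never arrives a second time.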
Exchanger
The Exchanger class synchronizes two threads at a single point: when
both reach this point, they swap an object with each other. An
Exchanger may be viewed as a bidirectional form of a SynchronousQueue.
The Exchanger class has the exchange() method, which
interchanges data between the two threads.
The producer-consumer example can be implemented with this mechanism.
Example:
import java.util.concurrent.Exchanger;

public class Consumer extends Thread {
    private final Exchanger<String> exchanger;

    public Consumer(Exchanger<String> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        String message = "";
        while (!message.equalsIgnoreCase("end")) {
            try {
                // Waits and exchanges messages with the producer
                message = exchanger.exchange(message);
                System.out.print(message + " ");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                return; // stop consuming instead of retrying forever
            }
        }
    }
}
import java.util.concurrent.Exchanger;

public class Producer extends Thread {
    private final Exchanger<String> exchanger;

    public Producer(Exchanger<String> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        String[] messages = {"Hello", "world", "end"};
        for (String msg : messages) {
            try {
                // Waits and exchanges messages with the consumer
                exchanger.exchange(msg);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                return; // stop producing
            }
        }
    }
}
.......
Exchanger<String> exchanger = new Exchanger<String>();
Consumer consumer = new Consumer(exchanger);
Producer producer = new Producer(exchanger);
consumer.start();
producer.start();
.......
This example is similar to the one in part 4, but instead of
having a class that acts as a monitor, here the Exchanger does that
job and synchronizes the messages between the threads.
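The swap itself can be seen in isolation with a minimal sketch in which two threads each hand over one string and receive the other's. The class ExchangeDemo and the method swap are hypothetical names used only here.

```java
import java.util.concurrent.Exchanger;

// Hypothetical demo class, only for illustration.
class ExchangeDemo {

    // The calling thread offers `fromCaller`, a second thread offers `fromWorker`.
    // Returns { what the caller received, what the worker received }.
    static String[] swap(String fromCaller, String fromWorker) {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];

        Thread worker = new Thread(() -> {
            try {
                received[1] = exchanger.exchange(fromWorker); // waits for the caller
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        try {
            received[0] = exchanger.exchange(fromCaller); // waits for the worker
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        String[] r = swap("ping", "pong");
        System.out.println("Caller received: " + r[0] + ", worker received: " + r[1]);
    }
}
```

Each side blocks in exchange() until the other side arrives, so neither thread can run ahead of its partner.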