2.6 Datensynchronisation durch besondere Concurrency-Klassen *
Arbeiten mehrere Threads zusammen, so wollen sie in der Regel Daten austauschen und sich an bestimmten Bedingungen synchronisieren. Die Java API bietet für diese Zusammenarbeit eine Reihe von Klassen:
- Semaphore: Erlaubt eine maximale Anzahl Threads in einem Programmblock.
- CyclicBarrier: Eine Menge von Threads wartet aufeinander, um zu einem gemeinsamen Punkt zu kommen.
- CountDownLatch: Ein oder mehrere Threads warten auf eine Bedingung. Ist sie erfüllt, können die Threads fortfahren.
- Exchanger: Zwei Threads treffen sich und tauschen Daten aus.
2.6.1 Semaphor
Ein Semaphor[10](Semaphoren wurden 1968 vom niederländischen Informatiker Edsger Wybe Dijkstra eingeführt, also zehn Jahre vor Hoares Monitoren.) stellt sicher, dass nur eine bestimmte Anzahl von Threads auf ein Programmstück zugreift, und zählt damit zur Technik der Sperrmechanismen. Es lassen sich zwei Typen von Semaphoren unterscheiden:
- Binäre Semaphoren lassen höchstens einen Thread auf ein Programmstück zu. Das bekannte Paar await()/signal() bei Condition beziehungsweise wait()/notify() von Object bietet sich für binäre Semaphoren an.
- Allgemeine Semaphoren erlauben eine begrenzte Anzahl an Threads in einem kritischen Abschnitt. Das Semaphor verwaltet intern eine Menge sogenannter Erlaubnisse (engl. permits).
Die Klasse Semaphore
Für allgemeine Semaphoren mit einer maximalen Anzahl Threads im Programmstück deklariert die Java-Bibliothek die Klasse java.util.concurrent.Semaphore.
Abbildung 2.11: UML-Diagramm für Semaphore
Die wichtigen Eigenschaften der Semaphore-Klasse sind der Konstruktor und die Methoden zum Betreten und Verlassen des kritischen Abschnitts. Intern vermerkt das Semaphor jedes Betreten und lässt Threads warten, wenn das gesetzte Maximum erreicht ist, bis ein anderer Thread das Programmsegment verlässt.
class java.util.concurrent.Semaphore |
- Semaphore(int permits)
Das neue Semaphor, das bestimmt, wie viele Threads in einem Block sein dürfen. - void acquire()
Versucht, in den kritischen Block einzutreten. Wenn der gerade belegt ist, wird gewartet. Vermindert die Menge der Erlaubnisse um eins. - void release()
Verlässt den kritischen Abschnitt und legt eine Erlaubnis zurück.
Allgemeine Semaphoren vereinfachen das Konsumenten-Produzenten-Problem, da eine bestimmte Anzahl von Threads in einem Block erlaubt ist. Die verbleibende Größe des Puffers ist somit automatisch die maximale Anzahl von Produzenten, die sich parallel im Einfügeblock befinden können.
Unser Beispiel soll mit einem Semaphor arbeiten, das nur zwei Threads gleichzeitig in den kritischen Abschnitt lässt:
Listing 2.31: com/tutego/insel/thread/concurrent/SemaphoreDemo.java, Teil 1
package com.tutego.insel.thread.concurrent;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo
{
static Semaphore semaphore = new Semaphore( 2 );
Der kritische Abschnitt besteht aus zwei Operationen: einer Ausgabe auf dem Bildschirm und einer Wartezeit von zwei Sekunden. Er ist in einem Runnable eingebettet:
Listing 2.32: com/tutego/insel/thread/concurrent/SemaphoreDemo.java, Teil 2
static Runnable r = new Runnable() {
@Override public void run() {
while ( true ) {
try
{
semaphore.acquire();
try
{
System.out.println( "Thread=" + Thread.currentThread().getName() +
", Available Permits=" + semaphore.availablePermits() );
TimeUnit.SECONDS.sleep( 2 );
}
finally
{
semaphore.release();
}
}
catch ( InterruptedException e )
{
e.printStackTrace();
Thread.currentThread().interrupt();
break;
}
}
}
};
Der kritische Abschnitt beginnt mit dem acquire() und endet mit release(). Wichtig ist die richtige Ausnahmebehandlung. Fünf Dinge müssen beachtet werden und formen den Quellcode:
- Das acquire() kann eine InterruptedException auslösen, was eine geprüfte Ausnahme ist. Wir müssen sie folglich behandeln.
- Da acquire() sowie sleep() eine InterruptedException auslösen können, ist die Frage, wie damit umzugehen ist. Im Prinzip signalisiert die Ausnahme die Bitte, das Warten zu beenden. Das wollen wir berücksichtigen, und wir brechen daher aus der Endlosschleife aus. Details zu der Ausnahme und zur interrupt()-Methode enthält Abschnitt 2.2.8.
- Immer dann, wenn ein acquire() erfolgreich war, muss auch ein release() folgen. Das release() ist im finally sehr gut aufgehoben, denn wir wollen in jedem Fall die Semaphoren wieder freigeben, auch wenn irgendwie eine andere RuntimeException auftauchen sollte.
- Ein release() darf nur dann erfolgen, wenn es ein zugehöriges acquire() gibt. Eine Programmierung wie try { semaphore.acquire(); ... } finally { semaphore.release(); } ist unsicher, denn wenn ein acquire() wirklich eine Ausnahme erzeugt, wird fälschlicherweise ein release() ausgelöst.
- Ein release() erzeugt keine Ausnahme, daher muss auch nichts behandelt werden.
Drei Threads sollen sich koordinieren:
Listing 2.33: com/tutego/insel/thread/concurrent/SemaphoreDemo.java, Teil 3
public static void main( String[] args )
{
new Thread( r ).start();
new Thread( r ).start();
new Thread( r ).start();
}
}
Nach dem Starten ist gut zu beobachten, wie jeweils zwei Threads im Abschnitt sind (eine Leerzeile symbolisiert die Wartezeit):
Thread=Thread-0, Available Permits=1
Thread=Thread-1, Available Permits=0
Thread=Thread-2, Available Permits=0
Thread=Thread-0, Available Permits=0
Thread=Thread-2, Available Permits=0
Thread=Thread-0, Available Permits=0
Fair und unfair
In der Ausgabe ist zu sehen, dass Thread 0, 1 und 2 zwar ihre Aufgaben ausführen können, aber plötzlich eine Sequenz 0, 2, 0 entsteht. Unser Gerechtigkeitssinn sagt uns jedoch, dass Thread 1 wieder an die Reihe kommen müsste. Wie ist das möglich? Die Antwort lautet, dass das acquire() nicht berücksichtigt, wer am längsten wartet, sondern dass es sich aus der Liste der Wartenden einen beliebigen Thread auswählt (wir kennen das von notify() her und dem Betreten eines synchronized-Blocks). Um ein faires Verhalten zu realisieren, wird die Fairness einfach über den Konstruktor von Semaphore angegeben. Ändern wir im Programm folgende Zeile:
static Semaphore semaphore = new Semaphore( 2, true );
Nun bekommen wir eine Ausgabe wie die folgende:
Thread=Thread-0, Available Permits=1
Thread=Thread-1, Available Permits=0
Thread=Thread-2, Available Permits=0
Thread=Thread-0, Available Permits=0
Thread=Thread-1, Available Permits=0
Thread=Thread-2, Available Permits=0
Thread=Thread-0, Available Permits=0
Thread=Thread-1, Available Permits=0
2.6.2 Barrier und Austausch
Der Punkt, an dem alle Threads zusammenkommen und sich die Ergebnisse zusammenlegen lassen, heißt auf Englisch barrier. Seit Java 5 gibt es die Klasse CyclicBarrier im Paket java.util.concurrent, die eine solche Barriere realisiert. Der Vorteil gegenüber join() besteht in der Tatsache, dass der für die Abarbeitung verwendete Thread nicht enden muss – und ein Thread im Thread-Pool endet eigentlich nicht –, sondern dass er mit await() sein »Bin-fertig«-Signal geben kann. Das folgende Beispiel zeigt anhand eines parallelen Summierers die Funktionsweise:
Listing 2.34: com/tutego/insel/thread/concurrent/ArraySummer.java, ArraySummer
public class ArraySummer
{
public static void main( String[] args )
{
int[] array = new int[ 1000 ];
Random r = new Random();
for ( int i = 0; i < array.length; i++ )
array[ i ] = Math.abs( r.nextInt() / 2 );
parallSummer( array );
}
public static void parallSummer( int[] array )
{
int prozessors = 2; // Runtime.getRuntime().availableProcessors();
final List<Long> longs = new ArrayList<Long>();
Runnable merger = new Runnable() {
@Override public void run()
{
long sum = 0;
for ( long i : longs )
sum += i;
System.out.println( sum );
}
};
CyclicBarrier barrier = new CyclicBarrier( prozessors, merger );
for ( int part = 0; part < prozessors; part++ )
new Thread( new AtomarSummer( barrier, array, prozessors, part,
longs)).start();
}
}
Listing 2.35: com/tutego/insel/thread/concurrent/ArraySummer.java, AtomarSummer
class AtomarSummer implements Runnable
{
private final CyclicBarrier barrier;
private final int[] array;
private final List<Long> longs;
private int start, end;
public AtomarSummer( CyclicBarrier barrier, int[] array, int maxPart,
int currentPart, List<Long> longs )
{
this.barrier = barrier;
this.array = array;
this.longs = longs;
start = (int) ((double) array.length / maxPart * currentPart);
end = (int) ((double) array.length / maxPart * (currentPart + 1) – 1);
}
@Override public void run()
{
long sum = 0;
for ( int i = start; i < end; i++ )
sum += array[ i ];
longs.add( sum );
try
{
barrier.await();
}
catch ( InterruptedException e )
{
e.printStackTrace();
}
catch ( BrokenBarrierException e )
{
e.printStackTrace();
}
}
}
2.6.3 Stop and go mit Exchanger
Die generische Klasse java.util.concurrent.Exchanger dient ebenfalls dem Zusammenkommen von Threads, die jedoch bei ihrem Rendezvous Daten austauschen können. Ein üblicher Fall betrifft das Füllen von Puffern, etwa wenn ein Thread Daten vom Datensystem liest und ein anderer Thread die Daten über das Netzwerk weiterschickt. Ein Dateisystem-Thread füllt den Puffer, und wenn er komplett gefüllt ist, trifft er sich mit einem anderen Netzwerk-Thread, dem er den vollen Puffer gibt und von dem er wieder einen leeren Puffer empfängt. Der Netzwerk-Thread kann dann den Inhalt des Puffers wieder »verbrauchen« und der erste Thread den Puffer wieder mit Daten vom Dateisystem füllen.
Ihr Kommentar
Wie hat Ihnen das <openbook> gefallen? Wir freuen uns immer über Ihre freundlichen und kritischen Rückmeldungen.