3.10 Spezielle thread-sichere Datenstrukturen
Ein großer Unterschied zwischen den klassischen Datenstrukturen wie Vector oder Hashtable und denen aus der Collection-API besteht darin, dass alle Methoden durch synchronisierte Blöcke vor parallelen Änderungen geschützt waren. Bei den neuen Klassen wie ArrayList und HashMap sind Einfüge- und Löschoperationen nicht mehr automatisch synchronized. Sollen allerdings Listen, Mengen oder Assoziativspeicher vor nebenläufigen Änderungen sicher sein, gibt es zwei Möglichkeiten: einmal über spezielle sogenannte Wait-free- bzw. Lock-free-Algorithmen, die tatsächlich parallele Zugriffe – wenn möglich – ohne Lock erlauben, und einmal über synchronisierende Wrapper.
3.10.1 Wait-free-Algorithmen
Wenn zum Beispiel bei einer verketteten Liste ein Thread vorn etwas anhängt und der andere hinten etwas entfernt, ist das tatsächlich nebenläufig möglich, und es muss nicht die ganze Datenstruktur gelockt werden. Auch bei anderen Datenstrukturen kann direkt ohne Lock eine Operation durchgeführt werden, und es muss nur im Spezialfall eine besondere Absicherung vorgenommen werden.
3.10.2 Nebenläufiger Assoziativspeicher und die Schnittstelle ConcurrentMap
Die Schnittstelle java.util.concurrent.ConcurrentMap erweitert die Schnittstelle java.util.Map um vier Operationen, die atomar ausgeführt werden müssen:
- V putIfAbsent(K key, V value)
- boolean remove(Object key, Object value)
- V replace(K key, V value)
- boolean replace(K key, V oldValue, V newValue)
Die Implementierungen der Schnittstelle sind:
- ConcurrentHashMap: Ein Assoziativspeicher nach dem Hashing-Verfahren. Die Klasse implementiert ConcurrentNavigableMap (die wiederum NavigableMap erweitert), eine Schnittstelle, die Methoden zur Teilmengenbildung vorgibt.
- ConcurrentSkipListMap: Ein Assoziativspeicher, der automatisch sortiert ist, also vergleichbar mit TreeMap.
Beide sind sehr schnelle Datenstrukturen für gleichzeitig operierende Threads, die dann auch keine ConcurrentModificationException auslösen, wenn es parallele Veränderungen über einen Iterator gibt.
3.10.3 ConcurrentLinkedQueue
Obwohl es keine Schnittstellen ConcurrentSet und ConcurrentList gibt, existiert zumindest eine Klasse ConcurrentLinkedQueue, die eine thread-sichere und wartefreie Liste (genauer: Queue) ist. Wie der Name schon andeutet, beruht die Realisierung auf verketteten Listen und nicht auf Arrays. Ein eigenes ConcurrentSet könnte auf der Basis von ConcurrentHashMap implementiert werden, so wie auch HashSet intern mit einer HashMap realisiert ist. In Java 6 ist die Klasse ConcurrentSkipListSet hinzugekommen.
3.10.4 CopyOnWriteArrayList und CopyOnWriteArraySet
Ist die Anzahl der Leseoperationen hoch, kann es sich lohnen, bei jedem Schreibzugriff erst die Daten zu kopieren und dann das Element hinzuzufügen, damit im Hintergrund andere Threads ohne einen Lock, der für das Schreiben nötig ist, Daten lesen können. Zwei dieser Datenstrukturen bietet Java an: CopyOnWriteArrayList für Listen und CopyOnWriteArraySet für Mengen. Die Klassen sind genau dann optimal, wenn wenig verändert – das ist teuer – und fast ausschließlich gelesen wird. Auch führen die Klassen zu keiner ConcurrentModificationException beim Ändern und gleichzeitigen Ablauf über Iteratoren. Denn falls die CopyOnWriteXXX-Datenstruktur verändert wird, arbeiten die »alten« Interessenten ja mit der herkömmlichen Version. Wenn zum Beispiel ein Iterator durch die Daten läuft und es dann eine Änderung gibt, so führt die Änderung zu einer Kopie der Daten und zur anschließenden Veränderung. Kommt eine Anfrage an die Datenstruktur nach der Veränderung, so bekommt der Anfrager Daten aus der neuen veränderten Datenstruktur. Kommt eine Anfrage während der Veränderung, also zu dem Zeitpunkt, an dem die Veränderung noch nicht abgeschlossen ist, so ist weiterhin die alte Version die einzig korrekte, und Daten kommen aus dieser CopyOnWriteXXX-Datenstruktur.
3.10.5 Wrapper zur Synchronisation
Können zur Absicherung nebenläufiger Operationen die oben aufgeführten Datenstrukturen aus java.util.concurrent nicht verwendet werden, etwa bei Java 1.4 oder bei eigenen nicht-nebenläufig implementierten Versionen von Set, Map, List und Queue, lassen sich Zugriffe auf die Datenstrukturen extern synchronisieren. Dazu fordern statische Methoden wie synchronizedXXX() einen Wrapper an, der sich um die existierende Datenstruktur legt. Die Wrapper arbeiten mit einem Lock, und Parallelität in den Datenstrukturen ist nicht gegeben.
Beispiel |
Eine synchronisierte Liste: List<Byte> syncList = Collections.synchronizedList( new ArrayList<Byte>() ); Der generische Typ der Datenstruktur geht auch weiter an den Wrapper. |
Die statischen synchronizedXXX()-Methoden liefern eine neue Sammlung, die sich wie eine Hülle um die existierende Datenstruktur legt und alle Methodenaufrufe synchronisiert. Paralleler Zugriff darf natürlich dann nur noch über den Wrapper laufen, wobei nicht-paralleler Zugriff weiterhin über die Original-Datenstruktur möglich ist.
class java.util.Collections |
- static <T> Collection<T> synchronizedCollection(Collection<T> c)
- static <T> List<T> synchronizedList(List<T> list)
- static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)
- static <T> Set<T> synchronizedSet(Set<T> s)
- static <K,V> SortedMap<K,V> synchronizedSortedMap(SortedMap<K,V> m)
- static <T> SortedSet<T> synchronizedSortedSet(SortedSet<T> s)
Liefert synchronisierte, also thread-sichere Datenstrukturen.
Iteratoren von synchronisierten Wrappern
Mit dem Wrapper ist der nebenläufige Zugriff über die Methoden gesichert, aber nicht der Zugriff über den Iterator. Ist syncList eine synchronisierte Datenstruktur, die ein Iterator ablaufen möchte, und soll während des Ablaufens jeder andere Zugriff gesperrt sein, so ist Folgendes zu schreiben:
List<Byte> syncList = Collections.synchronizedList( new ArrayList<Byte>() );
synchronized ( syncList )
{
Iterator iter = syncList.iterator();
}
Die Synchronisation ist immer auf dem Wrapper und nicht auf dem »Gewrappten«.
3.10.6 Blockierende Warteschlangen
Die Schnittstelle BlockingQueue steht für besondere Queues, die blockieren können. Das kann aus zwei Gründen geschehen: Entweder sind keine Daten zu entnehmen, da die Queue leer ist, oder eine maximale Anzahl von zu haltenden Elementen ist erreicht. Besonders in Produzenten/Konsumenten-Szenarien sind blockierende Warteschlangen sehr nützlich.
Eine performante thread-sichere Implementierung ist in der Praxis sehr wichtig, und die Java SE-Bibliothek hat zwei besondere Realisierungen auf Lager:
- ArrayBlockingQueue: Queue immer mit einer maximalen Kapazität, intern realisiert mit einem Feld
- LinkedBlockingQueue: Queue unbeschränkt oder mit maximaler Kapazität, intern realisiert durch eine verkettete Liste
- PriorityBlockingQueue: eine blockierende PriorityQueue
Die anderen Realisierungen wie DelayQueue sind für uns jetzt nicht relevant.
Das Schöne an blockierenden Warteschlangen ist ihr Verhalten in Produzenten/Konsumenten-Verhältnissen; ein Thread (oder beliebig viele Threads) setzt (viele setzen) Daten in die Queue, ein Thread (oder beliebig viele Threads) holt (viele Threads holen) die Daten wieder raus. Bei einer Queue ist es ja so, dass sie nach dem FIFO-Verfahren arbeitet, das heißt, die Daten, die zuerst hineingelegt wurden, werden auch zuerst verarbeitet. Bei einer Prioritätswarteschlange ist das etwas anders, aber dazu gleich mehr.
3.10.7 ArrayBlockingQueue und LinkedBlockingQueue
Bei den normalen blockierenden Datenstrukturen geht es darum, sich zwischen ArrayBlockingQueue und LinkedBlockingQueue zu entscheiden. Der wesentliche Unterschied ist – bis auf die interne Realisierung – dass die ArrayBlockingQueue immer mit einer maximalen Schranke von Elementen konfiguriert werden muss; ist diese Schranke erreicht, blockiert die Queue und nimmt keine neuen Elemente mehr an. Die LinkedBlockingQueue kann unbegrenzt wachsen (also bis Integer.MAX_VALUE). Ist die Queue dann offen, wird nur geblockt, wenn kein Element vorhanden ist, ein Thread aber eines entnehmen möchte.
Beispiel mit unbeschränkter LinkedBlockingQueue
Dazu ein einfaches Beispiel. Wir haben zwei Produzenten für Meldungen und einen Konsumenten, der sie nach Eingang einfach auf der Konsole ausgibt.
Listing 3.36: com/tutego/insel/thread/concurrent/LoggingInQueue.java, LoggingInQueue
public class LoggingInQueue
{
private static final BlockingQueue<String> messages =
new LinkedBlockingQueue<String>();
private static class MessageOutputter extends Thread
{
@Override public void run()
{
while ( true )
try
{
long startTime = System.currentTimeMillis();
System.out.printf( "%s (Wartete %d ms)%n",
messages.take(),
System.currentTimeMillis() – startTime ); }
catch ( InterruptedException e ) { }
}
}
private static class UserMessageProducer extends Thread
{
@Override public void run()
{
for( int i = 0; ; i++ )
messages.add( "msg " + i + " " +
JOptionPane.showInputDialog( "Meldung eingeben" ) );
}
}
private static class DiskspaceMessageProducer extends Thread
{
@Override public void run()
{
for( int i = 0; ; i++ )
{
String dir = System.getProperty( "user.dir" );
messages.add( "spc " + i + " " + new File( dir ).getFreeSpace() );
try { TimeUnit.SECONDS.sleep( 1 ); }
catch ( InterruptedException e ) { }
}
}
}
public static void main( String[] args )
{
new MessageOutputter().start();
new UserMessageProducer().start();
new DiskspaceMessageProducer().start();
}
}
Auf der einen Seite steht der Thread, der die blockiere Queue abhorcht. Wir fragen die Daten mit take() ab, damit die Methode – und somit der Thread – blockiert, falls keine Daten in der Queue sind. Die BlockingQueue-Schnittstelle definiert auch noch andere Methoden wie poll() oder peek(), aber die blockieren nicht, sondern liefern null, wenn keine Daten in der Queue sind – eigentlich sind die Methoden unpassend in einer blockierenden Datenstruktur, aber BlockingQueue erbt diese Methoden von Queue. (Leser sollten zum Test take() durch poll() ersetzen und das Ergebnis testen.) Um eine Vorstellung zu bekommen, wie lange take() warten muss, holen wir vor dem Aufruf von take() über System.currentTimeMillis() die Systemzeit in Millisekunden relativ zum 1.1.1970 sowie nach dem Aufruf von take() – die Differenz der Zeiten gibt uns eine Idee von der Dauer, während der der Thread wartet und auch keine Rechenzeit verbraucht.
Die Produzenten sind in unserem Fall zwei Threads. Der eine holt permanent die Anzahl freier Bytes auf der Festplatte des Benutzers, der andere Thread offeriert einen Benutzerdialog, und die Benutzereingabe kommt auf den Schirm. Gestartet mit ein paar Eingaben ist die Ausgabe dann auch recht unspektakulär:
lstspc 0 55943192576 (Wartete 11 ms)
spc 1 55942631424 (Wartete 943 ms)
spc 2 55943192576 (Wartete 1005 ms)
spc 3 55943192576 (Wartete 1007 ms)
msg 0 eingabe (Wartete 716 ms)
spc 4 55943192576 (Wartete 290 ms)
msg 1 eingabe (Wartete 822 ms)
spc 5 55943192576 (Wartete 180 ms)
msg 2 eingabe (Wartete 112 ms)
3.10.8 PriorityBlockingQueue
Eine blockierende Prioritätswarteschlange ist eine besondere Prioritätswarteschlange, die thread-sicher ist und den Entnehmer-Thread blockiert, wenn kein Element vorhanden ist, also die Queue leer ist. Für blockierende Prioritätswarteschlangen bietet Java eine Klasse: PriorityBlockingQueue; sie implementiert PriorityQueue. Die Sortierung ist entweder natürlich oder über einen externen Comparator geregelt, bei Letzterem muss das Vergleichsobjekt im Konstruktor übergeben werden.
Tickets mit Priorität
Unser nächstes Beispiel legt Tickets in eine blockierende Prioritätswarteschlange. Tickets mit der höchsten Priorität sollen nach vorne wandern. Gleichzeitig messen wir die Zeit beim Anlegen des Tickets, um bei Tickets mit gleicher Priorität dem Ticket den Vorzug zu geben, das als erstes angelegt wurde.
Listing 3.37: com/tutego/insel/thread/concurrent/Ticket.java
package com.tutego.insel.util.concurrent;
import java.util.Date;
class Ticket implements Comparable<Ticket>
{
static enum Priority { SEVERE, NORMAL }
private final Priority priority;
private final String message;
private final Date arrivalDate;
Ticket( Priority priority, String message )
{
this.priority = priority;
this.message = message;
arrivalDate = new Date();
}
@Override public int compareTo( Ticket that )
{
int ticketPriority = this.priority.compareTo( that.priority );
// Wenn Ticket-Priorität ungleich 0, dann ist ein Ticket wichtiger als das
// andere und die Zeit spielt keine Rolle.
if ( ticketPriority != 0 )
return ticketPriority;
// Wenn Ticket-Priorität gleich 0, dann sind beide Tickets
// gleich wichtig. Die Zeit kommt dann als Kriterium hinzu.
return this.arrivalDate.compareTo( that.arrivalDate );
}
@Override public String toString()
{
return String.format( "%tT.%1$TL (%s) kam '%s'",
arrivalDate, priority, message );
}
}
Die Ticket-Klasse implementiert Comparable, sodass es eine natürliche Ordnung der Elemente gibt.
Ein Test, der die Ordnung bei der Sortierung zeigt, ist schnell geschrieben:
Listing 3.38: com/tutego/insel/thread/concurrent/TicketDemo.java, main()
List<Ticket> tickets = Arrays.asList(
new Ticket( Priority.NORMAL, "Kein Senf" ),
new Ticket( Priority.SEVERE, "Feuer" ),
new Ticket( Priority.NORMAL, "Bier warm" ),
new Ticket( Priority.SEVERE, "Erdbeben" ) );
Collections.sort( tickets );
System.out.println( tickets );
Die Ausgabe ist wie:
13:45:20.777 (SEVERE) kam 'Feuer'
13:45:20.777 (SEVERE) kam 'Erdbeben'
13:45:20.777 (NORMAL) kam 'Kein Senf'
13:45:20.777 (NORMAL) kam 'Bier warm'
Die Ausgabe macht zwei Dinge deutlich: Zunächst, dass die wichtigen Meldungen vorne liegen, und dann als zweites, dass zuerst eingefügte Meldungen auch zeitlich vor den nachfolgenden Meldungen liegen (wobei das Programm so schnell ist, dass die Zeit gar nicht als Komponente berücksichtigt werden kann).
Tickets generieren und mit der blockierenden Warteschlange verarbeiten
Im letzten Schritt können wir zwei Threads starten, wobei ein Thread neue Tickets (mit zufälliger Wichtigkeit) in die Queue legt und ein anderer Thread die Nachrichten permament entnimmt.
Listing 3.39: com/tutego/insel/thread/concurrent/PrioritizedTicketsQueue.java
package com.tutego.insel.util.concurrent;
import java.util.concurrent.*;
import com.tutego.insel.util.concurrent.Ticket.Priority;
public class PrioritizedTicketsQueue
{
private static final BlockingQueue<Ticket> tickets =
new PriorityBlockingQueue<Ticket>();
private static class TicketProducer extends Thread
{
@Override public void run()
{
while ( true )
{
Ticket ticket = new Ticket(
Priority.values()[ (int)(Math.random() * 2) ], "Hilfe!" );
tickets.add( ticket );
System.out.println( "Neues Ticket: " + ticket );
try { TimeUnit.MILLISECONDS.sleep( (int)(Math.random() * 2000) ); }
catch ( InterruptedException e ) { /* Empty */ }
}
}
}
private static class TicketSolver extends Thread
{
@Override public void run()
{
while ( true )
{
try
{
System.out.println( tickets.take() );
TimeUnit.SECONDS.sleep( 1 );
}
catch ( InterruptedException e ) { /* Empty */ }
}
}
}
public static void main( String[] args )
{
new TicketProducer().start();
new TicketSolver().start();
}
}
Lassen wir das Programm starten, kann die Ausgabe wie folgt sein:
Neues Ticket: 14:28:55.422 (SEVERE) kam 'Hilfe!'
14:28:55.422 (SEVERE) kam 'Hilfe!'
Neues Ticket: 14:28:56.038 (NORMAL) kam 'Hilfe!'
Neues Ticket: 14:28:56.445 (NORMAL) kam 'Hilfe!'
14:28:56.038 (NORMAL) kam 'Hilfe!'
Neues Ticket: 14:28:57.213 (NORMAL) kam 'Hilfe!'
Neues Ticket: 14:28:57.264 (NORMAL) kam 'Hilfe!'
Neues Ticket: 14:28:57.443 (SEVERE) kam 'Hilfe!'
14:28:57.443 (SEVERE) kam 'Hilfe!'
14:28:56.445 (NORMAL) kam 'Hilfe!'
Neues Ticket: 14:28:58.618 (SEVERE) kam 'Hilfe!'
Neues Ticket: 14:28:59.025 (SEVERE) kam 'Hilfe!'
14:28:58.618 (SEVERE) kam 'Hilfe!'
Neues Ticket: 14:29:00.418 (SEVERE) kam 'Hilfe!'
14:28:59.025 (SEVERE) kam 'Hilfe!'
Interessant ist die fett gedruckte Zeile, denn sie zeigt deutlich, dass – obwohl zwei Tickets vorher generiert wurden – diese durch die spätere Meldung verdrängt werden, da die spätere Meldung eine höhere Wichtigkeit hat als die beiden Meldungen zuvor.
Ihr Kommentar
Wie hat Ihnen das <openbook> gefallen? Wir freuen uns immer über Ihre freundlichen und kritischen Rückmeldungen.