6.7 Kommunikation zwischen Threads mit Pipes *
Die Kommunikation zwischen Programmteilen kann auf vielfältige Weise geschehen. Einige Möglichkeiten haben wir bei Threads kennengelernt. Bei getrennten Programmen lässt sich die Kommunikation über Dateien realisieren. Auch Datenströme können von einem Teil geschrieben und vom anderen gelesen werden. Wenn wir jedoch mit Threads arbeiten, wäre eine Kommunikation über Dateien zwar denkbar, aber zu aufwändig. Ein anderes Stromkonzept ist praktisch.
6.7.1 PipedOutputStream und PipedInputStream
Einfacher ist der Austausch der Daten über eine sogenannte Pipe. Sie wird über Paare spezieller Stromklassen gebildet:
Die PipedXXX-Klassen sind übliche Unterklassen von OutputStream/InputStream und Writer/Reader (im nächsten Beispiel verfolgen wir die Byte-Variante). Wenn dann Threads Daten austauschen wollen, kann ein Produzent sie über write() in den Ausgabestrom schreiben, und der andere Thread wird sie dort über read() empfangen können.
Natürlich muss ein schreibender Pipe-Strom wissen, wer der Empfänger ist. Daher müssen die Schreib/Lese-Pipes miteinander verbunden werden. Eine Möglichkeit bietet connect().
Beispiel |
Ein PipedOutputStream soll mit einem PipedInputStream verbunden werden: PipedOutputStream pos = new PipedOutputStream(); Werden jetzt Daten produziert und in den pos geschrieben, kommen sie über den pis wieder an und können dort konsumiert werden. |
Ob wir nun vom PipedOutputStream die Methode connect(PipedInputStream) nehmen oder vom PipedInputStream die Methode connect(PipedOutputStream), ist dabei egal.
Anstatt nach dem Aufbau der Ströme über den Standard-Konstruktor beide mit connect() zu verbinden, gibt es eine alternative Lösung: Entweder lässt sich nach dem Erzeugen des PipedOutputStream über den Standard-Konstruktor das frische Strom-Objekt in den parametrisierten Konstruktor von PipedInputStream übergeben oder eben umgekehrt ein neues PipedInputStream-Objekt in den parametrisierten Konstruktor von PipedOutputStream legen.
Abbildung 6.13: UML-Diagramm mit dem Beziehungen zwischen den PipedXXX-Klassen
Beispiel |
Verbinde den Eingabe-Stream pis mit dem Ausgabe-Stream pos: PipedInputStream pis = new PipedInputStream(); |
Interna
Der Austausch der Daten geschieht über einen internen Puffer, den PipedInputStream anlegt. Die Daten, die PipedOutputStream über write() schreiben soll, gelangen direkt zum Puffer des Eingabestroms. Werfen wir einen kurzen Blick auf die relevanten Teile der Implementierung:
class PipedOutputStream extends OutputStream
{
private PipedInputStream sink;
public PipedOutputStream( PipedInputStream snk )
throws IOException
{
/* Auskommentierte Fehlerbehandlung */
sink = snk;
snk.in = –1;
snk.out = 0;
snk.connected = true;
}
public void write( int b ) throws IOException
{
if ( sink == null )
throw new IOException( "Pipe not connected" );
sink.receive( b );
}
}
Der PipedInputStream nutzt intern einen Puffer von standardmäßig 1.024 Elementen. Das bedeutet: Der Schreibende kann standardmäßig bis zu 1.024 Byte (oder Zeichen bei PipedReader) produzieren, bis die Kommunikation stoppen muss. Denn mit dieser Größe ist der Puffer voll und der Produzent blockiert; der Lesende muss den Puffer erst leeren, damit der Konsument weiterarbeiten darf. Umgekehrt bedeutet dies, dass der lesende Thread bei ungenügend vielen Zeichen warten muss, bis der Schreiber die nötige Anzahl hinterlegt hat. Dafür wird intern mittels Thread-Synchronisation gearbeitet. Lebt die andere Seite nicht mehr, gibt es eine IOException.
Seit Java 6 lässt sich die Größe über einen Konstruktor wie PipedInputStream(int pipeSize), PipedInputStream(PipedOutputStream src, int pipeSize), PipedReader(int pipeSize) oder PipedReader(PipedWriter src, int pipeSize) setzen.
6.7.2 PipedWriter und PipedReader
Die Klassen PipedWriter und PipedReader sind die char-Varianten für die sonst byte-orientierten Klassen PipedOutputStream und PipedInputStream. Diese sollen uns als Beispiel dienen. Zwei Threads arbeiten miteinander und tauschen Daten aus. Der eine Thread produziert Zufallszahlen, die ein anderer Thread auf dem Bildschirm darstellt:
Listing 6.21: com/tutego/insel/io/stream/PipeDemo.java, PipeRandomWriter
package com.tutego.insel.io.stream;
import java.io.*;
class PipeRandomWriter extends PipedWriter implements Runnable
{
@Override public void run()
{
while ( true ) {
try
{
write( String.format("%f%n", Math.random()) );
Thread.sleep( 200 );
}
catch ( Exception e ) { e.printStackTrace(); }
}
}
}
Die Klasse ist eine Spezialisierung von PipedWriter und produziert in run() endlos Zufallszahlen, die in den Ausgabestrom vom PipedWriter geschoben werden. Der PipeRandomReader wiederum ist ein PipedReader, der über einen BufferedReader alle geschriebenen Zeilen ausliest:
Listing 6.22: com/tutego/insel/io/stream/PipeDemo.java, PipeRandomReader
class PipeRandomReader extends PipedReader implements Runnable
{
@Override public void run()
{
BufferedReader br = new BufferedReader( this );
while ( true )
try
{
System.out.println( br.readLine() );
}
catch ( IOException e ) { e.printStackTrace(); }
}
}
Das Hauptprogramm erzeugt die beiden spezialisierten Pipes und verbindet sie. Danach werden die Threads gestartet:
Listing 6.23: com/tutego/insel/io/stream/PipeDemo.java, PipeDemo
public class PipeDemo
{
public static void main( String[] args ) throws Exception
{
PipeRandomWriter out = new PipeRandomWriter();
PipeRandomReader in = new PipeRandomReader();
in.connect( out );
new Thread( out ).start();
new Thread( in ).start();
}
}
Ihr Kommentar
Wie hat Ihnen das <openbook> gefallen? Wir freuen uns immer über Ihre freundlichen und kritischen Rückmeldungen.