Die invokeAll(…)-Methoden aus dem ExecutorService sind praktisch, wenn es darum geht, mehrere Aufgaben nebenläufig abzusenden, und später die Ergebnisse einzusammeln. Allerdings ist die Rückgabe vom Typ List<Future<T>> und wir werden nicht informiert, wenn ein Ergebnis vorliegt. Wir können zwar die Liste immer wieder ablaufen und jedes Future-Objekte mit isDone() fragen ob es fertig ist, aber das ist keine ideale Lösung.
Mit java.util.concurrent.CompletionService gibt es eine weitere Java-Schnittstelle – die keinen Basistyp erweitert – mit der wir ein Callable oder Runnable arbeiteten lassen können und später nacheinander die Ergebnisse einsammeln können, die fertig sind. Die Java-Bibliothek bringt mit ExecutorCompletionService eine Implementierung der Schnittstelle mit, die intern die fertigen Ergebnisse in einer Queue sammelt, und wir können die Queue abfragen. Schauen wir uns das in einem Beispiel an.
ExecutorService executor = Executors.newCachedThreadPool(); CompletionService<Integer> completionService = new ExecutorCompletionService<>( executor ); List.of( 4, 3, 2, 1 ).forEach( duration -> completionService.submit( () -> { TimeUnit.SECONDS.sleep( duration ); return duration; } ) ); for ( int i = 0; i < 4; i++ ) { try { System.out.println( completionService.take().get() ); } catch ( InterruptedException | ExecutionException e ) { e.printStackTrace(); } } executor.shutdown();
Der Typ ExecutorCompletionService erwartet im Konstruktor einen Executor, der den Code ausführen soll; wir setzen einen Thread-Pool ein. CompletionService hat zwei submit(…)-Methoden:
- Future<V> submit(Runnable task, V result)
- Future<V> submit(Callable<V> task)
Abgesendet werden vier Callable-Exemplare, die 4, 3, 2, 1 Sekunden warten und ihre Wartezeit am Ende zurückgeben. Natürlich wird als erstes das Callable mit der Rückgabe 1 fertig, dann 2, usw.
Für die Rückgaben interessiert sich unser Programm nicht, denn es nutzt die take()-Methode. Insgesamt hat CompletionService drei Entnahme-Methoden:
- Future<V> take()
Liefert das Ergebnis von der ersten abgeschlossenen Aufgabe und entfernt es von der internen Queue. Liegt kein Ergebnis an, wartet die Methode. - Future<V> poll()
Liefert das Ergebnis von der ersten abgeschlossenen Aufgabe und entfernt es von der internen Queue. Liegt kein Ergebnis ist, wartet poll() nicht, sondern liefert null. - Future<V> poll(long timeout, TimeUnit unit)
Wartet wir take() auf ein Ergebnis, doch kommt es nach dem Ablauf von timeout nicht, liefert die Methode wie poll() als Rückgabe null.
Was der Schnittstelle fehlt ist eine Methode, die die verblendende Anzahl liefert. Wir müssen in unserem Code daher einen Zähler als extra Variable einführen.