17.5 Terminale Operationen
Wir haben gesehen, dass sich bei Operationen intermediäre und terminale unterscheiden lassen. Die Schnittstelle Stream bietet insgesamt 18 terminale Operationen, und die Rückgaben der Methoden sind etwa void oder ein Array. Bei den intermediären Operationen sind die Rückgaben allesamt neue Stream-Exemplare. Wir schauen uns nacheinander die terminalen Operationen an.
[»] Hinweis
Wenn der Stream einmal konsumiert wurde, kann er nicht wieder verwendet werden!
17.5.1 Die Anzahl der Elemente
Die vielleicht einfachste terminale Operation ist count(): Sie liefert ein long mit der Anzahl der Elemente im Strom.
interface java.util.stream.Stream<T>
extends BaseStream<T,Stream<T>>
long count()
[zB] Beispiel
Wie viele Elemente hat ein Array von Referenzen, von null einmal abgesehen?
Object[] array = { null, 1, null, 2, 3 };
long size = Stream.of( array )
.filter( Objects::nonNull ).count();
System.out.println( size ); // 3
17.5.2 Und jetzt alle – forEachXXX(…)
Die Stream-API bietet zwei forEachXXX(…)-Methoden zum Ablaufen der Ergebnisse:
interface java.util.stream.Stream<T>
extends BaseStream<T,Stream<T>>
Übergeben wird immer ein Codeblock vom Typ Consumer. Die funktionale Schnittstelle Consumer hat einen Parameter, und forEachXXX(…) übergibt beim Ablaufen Element für Element an den Consumer.
Das Ablaufen mit forEach(…) ist ein Implementierungsdetail. Die Reihenfolge ist nicht zwingend deterministisch, was inbesondere bei parallelen Streams auffällt. Anders verhält sich forEachOrdered(…), was die Reihenfolge der Stream-Quelle respektiert, selbst wenn zwischendurch der Stream parallel verarbeitet wird. Es ist aber gut möglich, dass das Stream-Framework keine interne Parallelisierung nutzt, wenn forEachOrdered(…) die Elemente abfragt.
[zB] Beispiel
Im ersten Fall ist die Ausgabe der Werte ungeordnet (zum Beispiel rfAni), im zweiten Fall geordnet (Afrin).
"Afrin".chars().parallel().forEach( c -> System.out.print( (char) c ) );
System.out.println();
"Afrin".chars().parallel().forEachOrdered( c -> System.out.print( (char) c ) );
17.5.3 Einzelne Elemente aus dem Strom holen
Produziert der Strom mehrere Daten in Reihenfolge, so liefert findFirst() das erste Element im Strom, wohingegen findAny() irgendein Element vom Strom liefern kann. Letztere Methode ist insbesondere bei parallelen Operationen interessant, wobei es völlig offen ist, welches Element das ist. Beide Methoden haben den Vorteil, dass sie abkürzende Operationen sind, das heißt, sie ersparen einem einiges an Arbeit.
Da ein Strom von Elementen leer sein kann, ist die Rückgabe der Methoden immer Optional:
interface java.util.stream.Stream<T>
extends BaseStream<T,Stream<T>>
Optional<T> findFirst()
Optional<T> findAny()
[zB] Beispiel
Consumer<Character> print = System.out::println;
List<Character> chars = List.of( '1', 'a', '2', 'b', '3', 'c' );
chars.parallelStream().findFirst().ifPresent( print ); // 1
chars.parallelStream().findAny().ifPresent( print ); // b
Es liefert findFirst() immer das erste Element 1, aber findAny() könnte jedes liefern, in der Ausgabe etwa b.
Hängen andere zustandsbehaftete Operationen dazwischen, ist eine Optimierung mit findAny() mitunter hinfällig. So wird stream.sorted().findAny() nicht die Sortierung umgehen können.
17.5.4 Existenz-Tests mit Prädikaten
Ob Elemente eines Stroms eine Bedingung erfüllen, zeigen drei Methoden:
interface java.util.stream.Stream<T>
extends BaseStream<T,Stream<T>>
boolean anyMatch(Predicate<? super T> predicate)
boolean allMatch(Predicate<? super T> predicate)
boolean noneMatch(Predicate<? super T> predicate)
Die Bedingung wird immer als Predicate formuliert.
[zB] Beispiel
Teste in einem Stream mit zwei Strings, ob entweder alle, irgendein oder kein Element leer ist:
System.out.println( Stream.of("", "").allMatch( String::isEmpty ) ); // true
System.out.println( Stream.of("", "a").anyMatch( String::isEmpty ) ); // true
System.out.println( Stream.of("", "a").noneMatch( String::isEmpty ) ); // false
17.5.5 Einen Strom auf sein kleinstes bzw. größtes Element reduzieren
Ein Strom kann aus beliebig vielen Elementen bestehen, die durch Reduktionsfunktionen auf einen Wert reduziert werden können. Ein bekanntes Beispiel ist die Math.max(a, b)-Methode, die zwei Werte auf das Maximum abbildet. Diese Maximum/Minimum-Methoden gibt es auch im Stream:
interface java.util.stream.Stream<T>
extends BaseStream<T,Stream<T>>
Optional<T> min(Comparator<? super T> comparator)
Optional<T> max(Comparator<? super T> comparator)
Interessant ist, dass immer ein Comparator übergeben werden muss und die Methoden nie auf die natürliche Ordnung zurückgreifen. Hilfreich ist hier eine Comparator-Utility-Methode, die genau so einen natürlichen Comparator liefert. Die Methoden liefern ein Optional.empty(), wenn der Stream leer ist.
[zB] Beispiel
Was ist die größte Zahl im Stream?
System.out.println( Stream.of( 9, 3, 4, 11 ).max( Comparator.naturalOrder() ).
get() );
Die Bestimmung des Minimums und des Maximums sind nur zwei Beispiele für eine Reduktion. Allgemein kann jedes Paar von Werten auf einen Wert reduziert werden. Das sieht im Prinzip für die Minimum-Funktion wie folgt aus:
Stream (4, 2, 3, 1) | Funktionsanwendung | Resultat (Minimum) |
---|---|---|
4 | – | 4 |
2 | min(4, 2) | 2 |
3 | min(2, 3) | 2 |
1 | min(2, 1) | 1 |
Im ersten Fall wird keine Funktion angewendet und es wird nicht reduziert. Das einzelne Element bleibt dadurch vorhanden.
17.5.6 Einen Strom mit eigenen Funktionen reduzieren
Java greift für Reduktionen auf die funktionalen Schnittstellen BiFunction<T,U,R> bzw. BinaryOperator<T> zurück (BinaryOperator<T> ist durch Vererbung eine BiFunction<T,T,T>).
Der Stream-Typ deklariert drei reduce(…)-Methoden, die mit diesen funktionalen Schnittstellen arbeiten. Wichtig ist, dass diese Reduktionsfunktion assoziativ ist, also f(a, f(b, c)) = f(f(a, b), c) gilt, denn insbesondere bei nebenläufiger Verarbeitung können beliebige Paare gebildet und reduziert werden.
interface java.util.stream.Stream<T>
extends BaseStream<T,Stream<T>>
Optional<T> reduce(BinaryOperator<T> accumulator)
Reduziert alle Paare von Werten schrittweise mit dem accumulator auf einen Wert. Ist der Stream leer, liefert die Methode Optional.empty(). Gibt es nur ein Element, bildet das die Rückgabe.T reduce(T identity, BinaryOperator<T> accumulator)
Reduziert die Werte, wobei das erste Element mit identity über den accumulator reduziert wird. Ist der Strom leer, bildet identity die Rückgabe.<U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U>
combiner)
Die ersten beiden Methoden liefern als Ergebnistyp immer den Elementtyp T des Streams. Mit dieser Methode ist der Ergebnistyp nicht T, sondern U, da der accumulator die Typen (U, T) auf U abbildet. Der combiner ist nur dann nötig, wenn in der parallelen Verarbeitung zwei Ergebnisse zusammengelegt werden, sonst ist der combiner überflüssig. Da er aber nicht null sein darf, haben wir es mit dem unschönen Fall zu tun, irgendetwas übergeben zu müssen, auch wenn das bei der sequenziellen Verarbeitung unnötig ist. Der Combiner kommt erst immer nach der Funktion.
[zB] Beispiel
Was ist das größte Element im Stream von positiven Zahlen?
System.out.println( Stream.of( 9, 3, 4, 11 ).reduce( 0, Math::max ) ); // 11
Anders als die max(…)-Methode von Stream liefert reduce(…) direkt ein Integer und kein Optional, weil bei leerem Stream 0 (die Identität) zurückgegeben wird. Das ist problematisch, denn 0 ist nicht das größte Element eines leeren Streams.
[zB] Beispiel
Reduziere einen Strom von Dimension-Objekten auf die Summe der Flächen:
Dimension[] dims = { new Dimension( 10, 10 ), new Dimension( 100, 100 ) };
BiFunction<Integer, Dimension, Integer> accumulator =
(area, dim) -> area + dim.height * dim.width;
BinaryOperator<Integer> combiner = Integer::sum;
System.out.println(
Arrays.stream( dims ).reduce( 0, accumulator, combiner )
); // 10100
Das Ergebnis einer Reduktion ist immer ein neuer Wert, wobei »Wert« natürlich alles sein kann: eine Zahl, ein String, ein Datum etc.
17.5.7 Ergebnisse in einen Container schreiben, Teil 1: collect(…)
Während die reduce(…)-Methoden durch den Akkumulator immer neue Werte in jedem Verarbeitungsschritt produzieren, können die Stream-Methoden collect(…) etwas anders arbeiten. Vereinfacht gesagt erlauben sie es, Daten eines Streams in einen Container (Datenstruktur oder auch String) zu setzen und diesen Container zurückzugeben.
interface java.util.stream.Stream<T>
extends BaseStream<T,Stream<T>>
<R> R collect(Supplier<R> resultFactory, BiConsumer<R,? super T> accumulator,
BiConsumer<R,R> combiner)
Sammelt die Elemente in einem Container. Die drei Parameter haben alle unterschiedliche Aufgaben:resultFactory: Der Supplier baut den Container auf. Das ist auch der Rückgabetyp.
Accumulator: Der BiConsumer bekommt zwei Argumente, den Container und das Element, und muss dann das Element dem Container hinzufügen.
combiner: Er ist nur bei der Parallelverarbeitung nötig und ist dafür verantwortlich, zwei Container zusammenzulegen.
[zB] Beispiel
Vier Zahlen eines Streams sollen im LinkedHashSet landen:
LinkedHashSet<Integer> set =
Stream.of( 2, 3, 1, 4 )
.collect( LinkedHashSet::new, LinkedHashSet::add, LinkedHashSet::addAll );
System.out.println( set ); // [2, 3, 1, 4]
Zur allgemeinen Initialisierung eines LinkedHashSet ist das natürlich noch etwas zu viel Code. Hier geht es einfacher, mit Collection.addAll(…) die Elemente hinzuzufügen.
Wie bei reduce(…) bekommt die collect(…)-Methode jedes Element, doch verbindet die Methode dieses Element nicht zu einer Kombination, sondern setzt es in Container. Daher sind auch die verwendeten Typen nicht BinaryOperator/BiFunction (bekommen etwas und liefern etwas zurück), sondern BiConsumer (bekommt etwas, liefert aber nichts zurück). Der Zustand sitzt damit im Container und nicht in den Zwischenverarbeitungsschritten.
[zB] Beispiel
Eine eigene Klasse macht diese Zustandshaltung nötig, wenn zum Beispiel bei einem Strom von Ganzzahlen am Ende die Frage steht, wie viele Zahlen positiv, negativ oder null waren. Der Kollektor sieht dann so aus:
class NegZeroPosCollector {
public long neg, zero, pos;
public void accept( int i ) {
if ( i > 0 ) pos++; else if ( i < 0 ) neg++; else zero++;
}
public void combine( NegZeroPosCollector other ) {
neg += other.neg; zero += other.zero; pos += other.pos;
}
}
Bei der Nutzung muss der Kollektor aufgebaut und müssen Akkumulator und Kombinierer angegeben werden:
NegZeroPosCollector col = Stream.of( 1, -2, 4, 0, 4 )
.collect( NegZeroPosCollector::new,
NegZeroPosCollector::accept,
NegZeroPosCollector::combine );
System.out.printf( "-:%d, 0:%d, +:%d", col.neg, col.zero, col.pos ); // -:1, 0:1, +:3
17.5.8 Ergebnisse in einen Container schreiben, Teil 2: Collector und Collectors
Es gibt zwei Varianten der collect(…)-Methode, und die zweite ist:
interface java.util.stream.Stream<T>
extends BaseStream<T,Stream<T>>
<R> R collect(Collector<? super T,R> collector)
Ein Collector fasst Supplier, Akkumulator und Kombinierer zusammen. Die statische Collector.of(…)-Methode baut einen Collector auf, den wir ebenso an collect(Collector) übergeben können – unser Beispiel von eben ist also mit folgendem identisch:
.collect( Collector.of( LinkedHashSet::new,
LinkedHashSet::add, LinkedHashSet::addAll) );
Der Vorteil von Collector ist also, dass dieser drei Objekte zusammenfasst (genau genommen kommt noch ein viertes, Collector.Characteristics hinzu). Die Schnittstelle deklariert zwei statische of(…)-Methoden und weitere Methoden, die jeweils die Funktionen vom Collector erfragen, also accumulator(), characteristics(), combiner(), finisher() und supplier().
Collector.Characteristics ist ein Aufzählungstyp mit den drei Elementen CONCURRENT, IDENTITY_FINISH und UNORDERED, die Eigenschaften eines Collector beschreiben.
Die Utility-Klasse Collectors
Für Standardfälle muss nicht extra mit Collector.of(…) ein Collector zusammengebaut werden, sondern es gibt eine Utility-Klasse Collectors mit knapp 40 statischen Methoden, die jeweils Collector-Exemplare liefern. Diese gruppieren sich grob wie folgt:
toSet(), toList(), toXXXMap(…) usw. für Sammlungen
toUnmodifiableXXX() für unveränderbare Sammlungen, bei denen Elemente nicht hinzugefügt, gelöscht oder ersetzt werden können (seit Java 10)
groupingByXXX(…) und partitioningBy(…) zum Bilden von Gruppen
einfache Rückgaben wie wie bei averagingLong(…)
Zeichenketten-Collectoren wie im Fall von joining(…), die Strings eines Streams verketten
[zB] Beispiel
Füge Strings in einem Strom zu einer Zeichenkette zusammen:
String s = Stream.of( "192", "0", "0", "1" )
.collect( Collectors.joining(".") );
System.out.println( s ); // 192.0.0.1
Baue ein Set<String> mit zwei Startwerten auf:
Set<String> set = Stream.of( "a", "b" ).collect( Collectors.toSet() );
Baue eine Map<Integer, String> auf, die aus zwei Schlüssel-Wert-Paaren besteht:
Map<Integer,String> map =
Stream.of( "1=one", "2=two" )
.collect( Collectors.toMap( k -> Integer.parseInt(k.split("=")[0]),
v -> v.split("=")[1] ) );
final class java.util.stream.Collectors
static <T> Collector<T,?,List<T>> toList()
static <T> Collector<T,?,Set<T>> toSet()
Die weiteren Methoden sollten Leser in der Javadoc studieren.
[»] Hinweis
Ein Collector ist das einzig vernünftige Mittel, um die Stream-Elemente in einen Container zu transferieren. Von einem Seiteneffekt aus irgendeiner intermediären oder terminalen Operation sollten Entwickler absehen, da sonst erstens der Vorteil der funktionalen Programmierung dahin ist (Operationen haben dort keine Seiteneffekte) und zweitens bei paralleler Verarbeitung bei nichtsynchronisierten Datenstrukturen Chaos ausbrechen würde. Folgendes ist also keine Alternative zum charmanten Collectors.joining("."):
StringBuilder res = new StringBuilder();
Stream.of( "192", "0", "0", "1" )
.forEach( s ->
res.append( res.length() == 0 ? "" : "." ).append( s ) );
System.out.print( res );
Wenn ein Lambda-Ausdruck Schreibzugriffe auf äußere Variablen macht, sollten die Alarmglocken schrillen. Das Gleiche gilt im Übrigen auch für Reduktionen. Zwar kann im Prinzip in einem forEach(…) alles zum Beispiel in einen threadsicheren Atomic-Datentyp geschrieben werden, aber wenn es auch funktional geht, dann richtig mit reduce(…).
17.5.9 Ergebnisse in einen Container schreiben, Teil 3: Gruppierungen
Oftmals lassen sich Objekte dadurch in Gruppen einteilen, dass referenzierte Unterobjekte eine Kategorie bestimmen. Da das furchtbar abstrakt klingt, folgen nun ein paar Beispiele:
Konten haben einen Kontentyp, und gesucht ist eine Aufstellung aller Konten nach Kontotyp. Mit dem Kontotyp sind also n Konten verbunden, die alle den gleichen Kontotyp haben.
Menschen haben einen Beruf, und gesucht ist eine Liste aller Menschen nach Berufen.
Threads können in unterschiedlichen Zuständen (wartend, schlafend, …) sein; gesucht ist eine Assoziation zwischen Zustand und Threads in diesem Zustand.
Um dies mit der Stream-API zu lösen, lässt sich collect(…) mit einem besonderen Collector nutzen, den die groupingByXXX(…)-Methoden von Collectors liefern. Wir wollen uns nur mit der einfachsten Variante beschäftigen.
[zB] Beispiel
Gib alle laufenden Threads aus:
Map<Thread.State, List<Thread>> map =
Thread.getAllStackTraces().keySet().stream()
.collect( Collectors.groupingBy( Thread::getState ) );
System.out.println( map.get( Thread.State.RUNNABLE ) );
Zunächst liefert Thread.getAllStackTraces().keySet() ein Set<Threads> mit allen aktiven Threads. Aus der Menge leiten wir einen Strom ab und nutzen einen Kollektor, der nach Thread-Status gruppiert. Das Ergebnis ist eine Assoziation zwischen Thread.State und einer List<Threads>.
Wenn es Methoden in der Java-API gibt, die Entwicklern Generics nur so um die Ohren hauen, dann sind es die gruppierenden Collector-Exemplare – hier eine Übersicht:
final class java.util.stream.Collectors
static <T,K> Collector<T,?,Map<K,List<T>>> groupingBy(Function<? super T,? extends K> classifier)
static <T,K,A,D> Collector<T,?,Map<K,D>> groupingBy(Function<? super T,? extends K> classifier, Collector<? super T,A,D> downstream)
static <T,K,D,A,M extends Map<K,D>> Collector<T,?,M> groupingBy(Function<? super T,?
extends K> classifier, Supplier<M> mapFactory, Collector<? super T,A,D> downstream)static <T,K> Collector<T,?,ConcurrentMap<K,List<T>>> groupingByConcurrent(Function<? super T,? extends K> classifier)
static <T,K,A,D> Collector<T,?,ConcurrentMap<K,D>> groupingByConcurrent(Function<? super T,? extends K> classifier, Collector<? super T,A,D> downstream)
static <T,K,A,D,M extends ConcurrentMap<K,D>> Collector<T,?,M>
groupingByConcurrent(Function<? super T,? extends K> classifier, Supplier<M>
mapFactory, Collector<? super T,A,D> downstream)
Die Funktion, die den Schlüssel für den Assoziativspeicher extrahiert, ist in der API der classifier. Er muss immer angegeben werden. Wir haben uns die einfachste Methode – die erste – an einem Beispiel angeschaut; sie liefert immer eine Map von Listen. Die anderen Methoden können auch eine Map mit anderen Datenstrukturen liefern (bzw. statt Listen einfache akkumulierte Werte). Dafür ist der downstream gedacht. Auch können die konkreten Map-Implementierungen bestimmt werden (etwa TreeMap). Das gibt die mapFactory an – sonst wählt groupingBy(Function) für uns eine Map-Klasse aus.
[zB] Beispiel
Sammle die Namen aller Threads nach Status ein:
Map<Thread.State, List<String>> map =
Thread.getAllStackTraces().keySet().stream()
.collect( Collectors.groupingBy(
Thread::getState,
Collectors.mapping( Thread::getName, Collectors.toList() ) ) );
System.out.println( map.get( Thread.State.RUNNABLE ) );
Baue einen Assoziativspeicher auf, der den Thread-Status nicht mit den Threads selbst verbindet, sondern einfach nur mit der Anzahl an Threads:
Map<Thread.State, Long> map =
Thread.getAllStackTraces().keySet().stream()
.collect( Collectors.groupingBy( Thread::getState,
Collectors.counting() ) );
System.out.println( map.get( Thread.State.RUNNABLE ) ); // 3
Assoziiere jeden Thread-Status mit einem Durchschnittswert der Prioritäten derjenigen Threads mit demselben Status:
Map<Thread.State,Double> map =
Thread.getAllStackTraces().keySet().stream().collect(
Collectors.groupingBy( Thread::getState,
Collectors.averagingInt(Thread::getPriority) ) );
System.out.println( map.get( Thread.State.RUNNABLE ) ); // 7.25
System.out.println( map.get( Thread.State.WAITING ) ); // 8.0
17.5.10 Stream-Elemente in ein Array oder einen Iterator übertragen
Kümmern wir uns abschließend um die letzten drei aggregierenden Methoden. Zwei toArray(…)-Methoden von Stream übertragen die Daten des Stroms in ein Array. iterator() auf dem Stream wiederum liefert einen Iterator mit all den Daten (die Methode stammt aus dem Obertyp BaseStream).
interface java.util.stream.Stream<T>
extends BaseStream<T,Stream<T>>
Object[] toArray()
<A> A[] toArray(IntFunction<A[]> generator)
Iterator<T> iterator()
Die zwei toArray(…)-Methoden sind nötig, da im ersten Fall unklar ist, was für einen Typ das Array hat – es kommt standardmäßig nur Object[] heraus. Der IntFunction wird die Größe des Streams übergeben, und das Ergebnis ist in der Regel ein Array dieser Größe. Konstruktor-Referenzen kommen hier sehr schön zum Zuge. Primitive Arrays können nie die Rückgabe bilden, hierfür müssen wir spezielle primitive Stream-Klassen nutzen.
[zB] Beispiel
Setze alle Elemente eines Streams in ein String-Array:
String[] strings = Stream.of( "der", "die", "das" ).toArray( String[]::new );