Veiledning til Java Parallel Collectors Library

1. Introduksjon

Parallel-collectors er et lite bibliotek som gir et sett med Java Stream API-samlere som muliggjør parallell behandling - samtidig som de omgår hovedfeilene ved standard Parallel Streams.

2. Maven-avhengigheter

Hvis vi vil begynne å bruke biblioteket, må vi legge til en enkelt oppføring i Maven's pom.xml fil:

 com.pivovarit parallellsamlere 1.1.0 

Eller en enkelt linje i Gradles build-fil:

kompilere 'com.pivovarit: parallelle samlere: 1.1.0'

Den nyeste versjonen finner du på Maven Central.

3. Parallelle strømmer forbehold

Parallelle strømmer var et av høydepunktene til Java 8, men de viste seg å være anvendelige utelukkende for tung CPU-prosessering.

Årsaken til dette var det faktum at Parallelle strømmer ble internt støttet av en delt JVM-bred ForkJoinPool, som ga begrenset parallellitet og ble brukt av alle Parallel Streams som kjører på en enkelt JVM-forekomst.

Tenk deg for eksempel at vi har en liste over ID-er, og vi vil bruke dem til å hente en liste over brukere, og at denne operasjonen er dyr.

Vi kan bruke Parallel Streams for det:

Liste-ID = Arrays.asList (1, 2, 3); Listeresultater = ids.parallelStream () .map (i -> fetchById (i)) // hver operasjon tar ett sekund .collect (Collectors.toList ()); System.out.println (resultater); // [bruker-1, bruker-2, bruker-3]

Og faktisk kan vi se at det er en merkbar hastighet. Men det blir problematisk hvis vi begynner å kjøre flere parallelle blokkeringsoperasjoner ... parallelt. Dette kan raskt mette bassenget og føre til potensielt store ventetider. Derfor er det viktig å bygge skott ved å lage separate trådbassenger - for å forhindre at urelaterte oppgaver påvirker hverandres utførelse.

For å gi en tilpasset ForkJoinPool For eksempel kunne vi utnytte trikset som er beskrevet her, men denne tilnærmingen var avhengig av et papirløst hack og var feil frem til JDK10. Vi kan lese mer i selve utgaven - [JDK8190974].

4. Parallelle samlere i aksjon

Parallelle samlere, som navnet antyder, er bare standard Stream API Collectors som tillater å utføre flere operasjoner parallelt kl samle inn() fase.

ParallelCollectors (som speiler Samlere klasse) klasse er en fasade som gir tilgang til hele bibliotekets funksjonalitet.

Hvis vi ønsket å gjøre om eksemplet ovenfor, kan vi bare skrive:

ExecutorService executor = Executors.newFixedThreadPool (10); Liste-ID = Arrays.asList (1, 2, 3); Fullførbar fremtid results = ids.stream () .collect (ParallelCollectors.parallelToList (i -> fetchById (i), executor, 4)); System.out.println (results.join ()); // [bruker-1, bruker-2, bruker-3]

Resultatet er det samme, men vi var i stand til å tilby vårt tilpassede trådområde, spesifisere vårt tilpassede parallellitetsnivå, og resultatet kom pakket inn i en Fullførbar fremtid forekomst uten å blokkere den gjeldende tråden.

Standard Parallel Streams kunne derimot ikke oppnå noen av disse.

4.1. ParallelCollectors.parallelToList / ToSet ()

Så intuitivt som det blir, hvis vi ønsker å behandle en Strøm parallelt og samle resultater i en Liste eller Sett, kan vi ganske enkelt bruke ParallelCollectors.parallelToList eller parallelToSet:

Liste-ID = Arrays.asList (1, 2, 3); Listeresultater = ids.stream () .collect (parallelToList (i -> fetchById (i), executor, 4)) .join ();

4.2. ParallelCollectors.parallelToMap ()

Hvis vi vil samle inn Strøm elementer til en Kart For eksempel, akkurat som med Stream API, må vi tilby to kartleggere:

Liste-ID = Arrays.asList (1, 2, 3); Kartresultater = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), executor, 4)) .join (); // {1 = bruker-1, 2 = bruker-2, 3 = bruker-3}

Vi kan også tilby en tilpasset Kart forekomst Leverandør:

Kartresultater = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, executor, 4)) .join (); 

Og en tilpasset konfliktløsningsstrategi:

Liste-ID = Arrays.asList (1, 2, 3); Kartresultater = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, (s1, s2) -> s1, executor, 4)) .join ();

4.3. ParallelCollectors.parallelToCollection ()

På samme måte som ovenfor, kan vi passere vår skikk Samlingsleverandør hvis vi ønsker å oppnå resultater pakket i vår tilpassede container:

Listeresultater = ids.stream () .collect (parallelToCollection (i -> fetchById (i), LinkedList :: new, executor, 4)) .join ();

4.4. ParallelCollectors.parallelToStream ()

Hvis ovennevnte ikke er nok, kan vi faktisk få en Strøm forekomst og fortsett tilpasset behandling der:

Kart results = ids.stream () .collect (parallelToStream (i -> fetchById (i), executor, 4)) .thenApply (stream -> stream.collect (Collectors.groupingBy (i -> i.length ()))) .bli med();

4.5. ParallelCollectors.parallel ()

Denne lar oss streame resultatene i fullført rekkefølge:

ids.stream () .collect (parallell (i -> fetchByIdWithRandomDelay (i), executor, 4)) .forEach (System.out :: println); // bruker-1 // bruker-3 // bruker-2 

I dette tilfellet kan vi forvente at samleren returnerer forskjellige resultater hver gang siden vi introduserte en tilfeldig behandlingsforsinkelse.

4.6. ParallelCollectors.parallelOrdered ()

Dette anlegget tillater strømmingsresultater akkurat som ovenfor, men opprettholder original ordre:

ids.stream () .collect (parallelOrdered (i -> fetchByIdWithRandomDelay (i), executor, 4)) .forEach (System.out :: println); // bruker-1 // bruker-2 // bruker-3 

I dette tilfellet vil samleren alltid opprettholde ordren, men kan være tregere enn ovennevnte.

5. Begrensninger

I skrivende stund parallelle samlere fungerer ikke med uendelige strømmer selv om kortslutningsoperasjoner brukes - det er en designbegrensning pålagt av API-er fra Stream API. For å si det enkelt, Strøms behandler samlere som ikke-kortslutningsoperasjoner, slik at strømmen må behandle alle oppstrømselementer før den blir avsluttet.

Den andre begrensningen er at kortslutningsoperasjoner forstyrrer ikke de gjenværende oppgavene etter kortslutning.

6. Konklusjon

Vi så hvordan parallell-samlerbiblioteket lar oss utføre parallell behandling ved å bruke tilpasset Java Stream API Samlere og CompletableFutures å bruke tilpassede trådbassenger, parallellitet og ikke-blokkerende stil av CompletableFutures.

Som alltid er kodebiter tilgjengelig på GitHub.

For ytterligere lesing, se parallellsamlerbiblioteket på GitHub, forfatterens blogg og forfatterens Twitter-konto.