Veiledning til Fork / Join Framework i Java

1. Oversikt

Gaffel / sammenføyningsrammeverket ble presentert i Java 7. Det gir verktøy for å øke hastigheten på parallell behandling ved å prøve å bruke alle tilgjengelige prosessorkjerner - noe som oppnås gjennom en splitt og erobre tilnærming.

I praksis betyr dette at rammeverket først “gafler”, rekursivt bryte oppgaven i mindre uavhengige deloppgaver til de er enkle nok til å utføres asynkront.

Etter det, “join” -delen begynner, hvor resultatene av alle deloppgaver blir rekursivt koblet til et enkelt resultat, eller i tilfelle en oppgave som returnerer ugyldig, venter programmet ganske enkelt til hver deloppgave blir utført.

For å gi effektiv parallell kjøring, bruker gaffel / sammenføyningsrammeverket en gruppe tråder kalt ForkJoinPool, som administrerer arbeidertråder av typen ForkJoinWorkerThread.

2. ForkJoinPool

De ForkJoinPool er hjertet i rammeverket. Det er en implementering av ExecutorService som administrerer arbeidstakertråder og gir oss verktøy for å få informasjon om trådgruppens tilstand og ytelse.

Arbeidstråder kan bare utføre en oppgave om gangen, men ForkJoinPool oppretter ikke en egen tråd for hver enkelt deloppgave. I stedet har hver tråd i bassenget sin egen kø med dobbelt ende (eller deque, uttalt Dekk) som lagrer oppgaver.

Denne arkitekturen er viktig for å balansere trådens arbeidsmengde ved hjelp av arbeids stjele algoritme.

2.1. Arbeids stjele algoritme

Enkelt sagt - gratis tråder prøver å “stjele” arbeid fra deques av travle tråder.

Som standard får en arbeidertråd oppgaver fra hodet til sin egen deque. Når den er tom, tar tråden en oppgave fra halen til deken til en annen opptatt tråd eller fra den globale inngangskøen, siden det er her de største arbeidene sannsynligvis vil bli plassert.

Denne tilnærmingen minimerer muligheten for at tråder vil konkurrere om oppgaver. Det reduserer også antall ganger tråden må gå på jakt etter arbeid, da den fungerer på de største tilgjengelige biter av arbeid først.

2.2. ForkJoinPool Instantiering

I Java 8, den mest praktiske måten å få tilgang til forekomsten av ForkJoinPool er å bruke sin statiske metode commonPool (). Som navnet antyder, vil dette gi en referanse til den felles poolen, som er en standard trådpool for alle ForkJoinTask.

I følge Oracles dokumentasjon reduserer bruk av den forhåndsdefinerte fellesgruppen ressursforbruket, siden dette motvirker etableringen av en egen trådgruppe per oppgave.

ForkJoinPool commonPool = ForkJoinPool.commonPool ();

Den samme oppførselen kan oppnås i Java 7 ved å opprette en ForkJoinPool og tilordne den til en offentlig statisk felt i en nytteklasse:

offentlig statisk ForkJoinPool forkJoinPool = ny ForkJoinPool (2);

Nå er det lett tilgjengelig:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

Med ForkJoinPool’s konstruktører, er det mulig å lage et tilpasset trådbasseng med et spesifikt nivå av parallellitet, trådfabrikk og unntakshåndterer. I eksemplet ovenfor har bassenget et parallellitetsnivå på 2. Dette betyr at bassenget vil bruke to prosessorkjerner.

3. ForkJoinTask

ForkJoinTask er basistypen for oppgaver utført inne ForkJoinPool. I praksis bør en av de to underklassene utvides: Rekursiv handling til tomrom oppgaver og Rekursiv oppgave for oppgaver som returnerer en verdi.De har begge en abstrakt metode beregne () der oppgavens logikk er definert.

3.1. RecursiveAction - et eksempel

I eksemplet nedenfor er arbeidsenheten som skal behandles representert med a String kalt arbeidsmengde. For demonstrasjonsformål er oppgaven en meningsløs oppgave: den bokstaver bare innspillene og logger den.

For å demonstrere rammeverkets gaffeloppførsel, eksemplet deler oppgaven hvis arbeidsmengde.lengde() er større enn en spesifisert terskelbruker createSubtask () metode.

Strengen er rekursivt delt inn i understrenger, og skaper CustomRecursiveTask tilfeller som er basert på disse underlagene.

Som et resultat returnerer metoden a Liste.

Listen sendes til ForkJoinPool bruker påkalleAll () metode:

offentlig klasse CustomRecursiveAction utvider RecursiveAction {private String workload = ""; privat statisk slutt int TRESHOLD = 4; privat statisk loggerlogger = Logger.getAnonymousLogger (); public CustomRecursiveAction (String workload) {this.workload = workload; } @ Override-beskyttet ugyldig beregning () {if (workload.length ()> THRESHOLD) {ForkJoinTask.invokeAll (createSubtasks ()); } annet {prosessering (arbeidsmengde); }} privat liste createSubtasks () {List subtasks = new ArrayList (); String partOne = workload.substring (0, workload.length () / 2); String partTwo = workload.substring (workload.length () / 2, workload.length ()); subtasks.add (ny CustomRecursiveAction (partOne)); subtasks.add (ny CustomRecursiveAction (partTwo)); returnere deloppgaver; } privat ugyldig behandling (String arbeid) {String resultat = work.toUpperCase (); logger.info ("Dette resultatet - (" + resultat + ") - ble behandlet av" + Thread.currentThread (). getName ()); }}

Dette mønsteret kan brukes til å utvikle ditt eget Rekursiv handling klasser. For å gjøre dette, lag et objekt som representerer den totale arbeidsmengden, velg en passende terskel, definer en metode for å dele opp arbeidet, og definer en metode for å gjøre arbeidet.

3.2. Rekursiv oppgave

For oppgaver som returnerer en verdi, er logikken her lik, bortsett fra at resultatet for hver deloppgave er samlet i et enkelt resultat:

offentlig klasse CustomRecursiveTask utvider RecursiveTask {private int [] arr; privat statisk slutt int TRESHOLD = 20; offentlig CustomRecursiveTask (int [] arr) {this.arr = arr; } @ Override-beskyttet heltal beregne () {if (arr.length> THRESHOLD) {return ForkJoinTask.invokeAll (createSubtasks ()) .stream () .mapToInt (ForkJoinTask :: join) .sum (); } annet {returbehandling (arr); }} privat samling createSubtasks () {List dividTasks = new ArrayList (); divisionTasks.add (ny CustomRecursiveTask (Arrays.copyOfRange (arr, 0, arr. lengde / 2))); delt oppgaver.add (ny CustomRecursiveTask (Arrays.copyOfRange (arr, arr. lengde / 2, arr. lengde))); returnerte delte oppgaver; } privat Heltallbehandling (int [] arr) {return Arrays.stream (arr) .filter (a -> a> 10 && a a * 10) .sum (); }}

I dette eksemplet er verket representert av en matrise lagret i arr felt av CustomRecursiveTask klasse. De createSubtasks () metode deler rekursivt oppgaven i mindre arbeidsstykker til hvert stykke er mindre enn terskelen. Og så påkalleAll () metoden sender underoppgavene til det felles bassenget og returnerer en liste over Framtid.

For å utløse henrettelse, er bli med() metoden kalles for hver deloppgave.

I dette eksemplet oppnås dette ved hjelp av Java 8-er Stream API; de sum() metoden brukes som en representasjon av å kombinere delresultater i det endelige resultatet.

4. Sende oppgaver til ForkJoinPool

For å sende oppgaver til trådgruppen kan få tilnærminger brukes.

De sende inn() eller henrette()metode (brukssakene deres er de samme):

forkJoinPool.execute (customRecursiveTask); int resultat = customRecursiveTask.join ();

De påkalle ()metoden forkaster oppgaven og venter på resultatet, og trenger ingen manuell sammenkobling:

int resultat = forkJoinPool.invoke (customRecursiveTask);

De påkalleAll () metoden er den mest praktiske måten å sende inn en sekvens av ForkJoinTasks til ForkJoinPool. Det tar oppgaver som parametere (to oppgaver, var args eller en samling), gafler returnerer deretter en samling av Framtid gjenstander i rekkefølgen de ble produsert i.

Alternativt kan du bruke separat gaffel() og bli med() metoder. De gaffel() metoden sender en oppgave til et basseng, men den utløser ikke kjøringen. De bli med() metoden må brukes til dette formålet. I tilfelle av Rekursiv handling, den bli med() returnerer ikke annet enn null; til Rekursiv oppgave, det returnerer resultatet av oppgavens utførelse:

customRecursiveTaskFirst.fork (); resultat = customRecursiveTaskLast.join ();

I vår Rekursiv oppgave eksempel brukte vi påkalleAll () metode for å sende en sekvens av underoppgaver til bassenget. Den samme jobben kan gjøres med gaffel() og bli med(), selv om dette har konsekvenser for rekkefølgen av resultatene.

For å unngå forvirring er det generelt en god ide å bruke påkalleAll () metode for å sende inn mer enn én oppgave til ForkJoinPool.

5. Konklusjoner

Bruk av gaffel / sammenføyningsrammeverket kan øke behandlingen av store oppgaver, men for å oppnå dette resultatet, bør noen retningslinjer følges:

  • Bruk så få trådbassenger som mulig - i de fleste tilfeller er den beste avgjørelsen å bruke en trådgruppe per applikasjon eller system
  • Bruk standard felles trådgruppe, hvis ingen spesifikk innstilling er nødvendig
  • Bruk en rimelig terskel for splitting ForkJoinTask i deloppgaver
  • Unngå blokkering iForkJoinTasks

Eksemplene som brukes i denne artikkelen er tilgjengelige i det koblede GitHub-depotet.


$config[zx-auto] not found$config[zx-overlay] not found