Guide til Akka Streams

1. Oversikt

I denne artikkelen vil vi se på akka-streams bibliotek som er bygget oppå Akka-aktørens rammeverk, som overholder manifestet for reaktive strømmer. Akka Streams API lar oss enkelt komponere datatransformasjonsflyter fra uavhengige trinn.

Videre gjøres all behandling på en reaktiv, ikke-blokkerende og asynkron måte.

2. Maven-avhengigheter

For å komme i gang, må vi legge til akka-stream og akka-stream-testkit biblioteker i vår pom.xml:

 com.typesafe.akka akka-stream_2.11 2.5.2 com.typesafe.akka akka-stream-testkit_2.11 2.5.2 

3. Akka Streams API

For å jobbe med Akka Streams, må vi være klar over de viktigste API-konseptene:

  • Kilde - inngangspunktet for behandling i akka-stream bibliotek - vi kan opprette en forekomst av denne klassen fra flere kilder; for eksempel kan vi bruke enkelt() metode hvis vi ønsker å lage en Kilde fra en singel String, eller vi kan opprette en Kilde fra en Iterabel av elementer
  • Strømme - hovedbehandlingsblokken - hver Strømme forekomst har en inngang og en utgangsverdi
  • Materializer - we kan bruke en hvis vi vil ha vår Strømme å ha noen bivirkninger som å logge eller lagre resultater; oftest vil vi passere Ikke brukt alias som en Materializer å betegne at vår Strømme burde ikke ha noen bivirkninger
  • Synke drift - når vi bygger en Strømme, den blir ikke utført før vi registrerer a Synke operasjon på den - det er en terminaloperasjon som utløser alle beregninger i hele Strømme

4. Å skape Strømmer i Akka Streams

La oss begynne med å bygge et enkelt eksempel, hvor vi skal vise hvordan du gjør det lage og kombinere flere Strømmes - for å behandle en strøm av heltall og beregne det gjennomsnittlige bevegelige vinduet for heltallspar fra strømmen.

Vi analyserer et semikolonavgrenset String av heltall som input for å skape vårt akka-stream Kilde for eksemplet.

4.1. Bruker en Strømme til Parse Input

La oss først lage en DataImporter klasse som tar en forekomst av ActorSystem som vi senere vil bruke til å lage våre Strømme:

offentlig klasse DataImporter {private ActorSystem actorSystem; // standard konstruktører, getters ...}

La oss deretter lage en parseLine metode som vil generere en Liste av Heltall fra vår avgrensede innspill String. Husk at vi bare bruker Java Stream API her for parsing:

private List parseLine (String line) {String [] fields = line.split (";"); returnere Arrays.stream (felt) .map (Heltall :: parseInt) .collect (Collectors.toList ()); }

Vår første Strømme vil gjelde parseLine til våre innspill for å lage en Strømme med inngangstype String og utgangstype Heltall:

private Flow parseContent () {return Flow.of (String.class) .mapConcat (this :: parseLine); }

Når vi kaller parseLine () metode, vet kompilatoren at argumentet til den lambda-funksjonen vil være a String - samme som inngangstypen til vår Strømme.

Merk at vi bruker mapConcat () metode - tilsvarer Java 8 flatMap () metode - fordi vi ønsker å flate Liste av Heltall returnert av parseLine () inn i en Strømme av Heltall slik at påfølgende trinn i behandlingen ikke trenger å håndtere Liste.

4.2. Bruker en Strømme å utføre beregninger

På dette punktet har vi vår Strømme av analyserte heltall. Nå må vi implementere logikk som vil gruppere alle inngangselementene i par og beregne et gjennomsnitt av disse parene.

Nå skal vi lage en Strømme av Heltalls og gruppere dem ved hjelp av gruppert () metode.

Deretter ønsker vi å beregne et gjennomsnitt.

Siden vi ikke er interessert i rekkefølgen som gjennomsnittet vil bli behandlet i, kan vi har gjennomsnitt beregnet parallelt ved hjelp av flere tråder ved å bruke mapAsyncUnordered () metode, sender antall tråder som et argument til denne metoden.

Handlingen som vil bli gitt som lambda til Strømme trenger å returnere a Fullførbar fremtid fordi den handlingen blir beregnet asynkront i den separate tråden:

private Flow computeAverage () {return Flow.of (Integer.class) .grouped (2) .mapAsyncUnordered (8, integers -> CompletableFuture.supplyAsync (() -> integers.stream () .mapToDouble (v -> v). gjennomsnittlig () .ellerElse (-1,0))); }

Vi beregner gjennomsnitt i åtte parallelle tråder. Merk at vi bruker Java 8 Stream API for å beregne et gjennomsnitt.

4.3. Komponere flere Strømmer inn i en singel Strømme

De Strømme API er en flytende abstraksjon som lar oss komponere flere Strømme tilfeller for å oppnå vårt endelige behandlingsmål. Vi kan ha granulære strømmer der man for eksempel analyserer JSON, en annen gjør litt transformasjon, og en annen samler litt statistikk.

Slik granularitet vil hjelpe oss med å lage mer testbar kode fordi vi kan teste hvert behandlingstrinn uavhengig.

Vi opprettet to strømmer over som kan fungere uavhengig av hverandre. Nå vil vi komponere dem sammen.

Først vil vi analysere innspillene våre String, og deretter vil vi beregne et gjennomsnitt på en strøm av elementer.

Vi kan komponere våre strømmer ved hjelp av via () metode:

Flow calcAAverage () {return Flow.of (String.class) .via (parseContent ()) .via (computeAverage ()); }

Vi opprettet en Strømme har inngangstype String og to andre flyter etter den. De parseContent ()Strømme tar en String input og returnerer en Heltall som utgang. De computeAverage () Flow tar det Heltall og beregner en gjennomsnittlig retur Dobbelt som utgangstype.

5. Legge til Synke til Strømme

Som vi nevnte, til dette punktet hele Strømme er ennå ikke henrettet fordi den er lat. For å starte utførelse av Strømme vi må definere en Synke. De Synke operasjonen kan for eksempel lagre data i en database, eller sende resultater til en ekstern webtjeneste.

Anta at vi har en Gjennomsnittlig depot klasse med følgende lagre() metode som skriver resultater til databasen vår:

CompletionStage save (Dobbelt gjennomsnitt) {return CompletableFuture.supplyAsync (() -> {// skriv til databaseresultat gjennomsnitt;}); }

Nå vil vi lage en Synke operasjon som bruker denne metoden for å lagre resultatene av vår Strømme behandling. Å skape vår Synke, vi trenger det først lage en Strømme som tar et resultat av behandlingen vår som inngangstype. Deretter vil vi lagre alle resultatene våre i databasen.

Igjen bryr vi oss ikke om bestilling av elementene, så vi kan utføre lagre() operasjoner parallelt bruker mapAsyncUnordered () metode.

Å lage en Synke fra Strømme vi må ringe toMat () med Sink.ignore () som et første argument og Hold til høyre() som det andre fordi vi ønsker å returnere en status for behandlingen:

privat vask storeAverages () {return Flow.of (Double.class) .mapAsyncUnordered (4, averageRepository :: save) .toMat (Sink.ignore (), Keep.right ()); }

6. Definere en kilde for Strømme

Det siste vi må gjøre er å lage en Kilde fra inngangen String. Vi kan bruke en beregne gjennomsnitt ()Strømme til denne kilden ved hjelp av via () metode.

For å legge til Synke til behandlingen, må vi ringe runWith () metode og passere storeAverages () Vask som vi nettopp opprettet:

CompletionStage calcAAverForContent (String content) {return Source.single (content) .via (calcalAverage ()) .runWith (storeAverages (), ActorMaterializer.create (actorSystem)) .whenComplete ((d, e) -> {if (d! = null) {System.out.println ("Import ferdig");} annet {e.printStackTrace ();}}); }

Merk at når behandlingen er ferdig, legger vi til nårFullstendig () tilbakeringing, der vi kan utføre noen handlinger avhengig av resultatet av behandlingen.

7. Testing Akka Streams

Vi kan teste behandlingen vår ved hjelp av akka-stream-testkit.

Den beste måten å teste den faktiske logikken i behandlingen er å teste alt Strømme logikk og bruk TestSink for å utløse beregningen og hevde resultatene.

I testen vår skaper vi Strømme som vi ønsker å teste, og deretter lager vi en Kilde fra testinnholdet:

@Test offentlig ugyldig gittStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults () {// gitt Flow testet = ny DataImporter (actorSystem) .calculateAverage (); Strenginngang = "1; 9; 11; 0"; // når Kilde flyt = Source.single (input) .via (testet); // deretter flyt .runWith (TestSink.probe (actorSystem), ActorMaterializer.create (actorSystem)) .request (4) .expectNextUnordered (5d, 5.5); }

Vi sjekker at vi forventer fire inngangsargumenter, og to resultater som er gjennomsnitt kan komme i hvilken som helst rekkefølge fordi behandlingen vår blir gjort på den asynkrone og parallelle måten.

8. Konklusjon

I denne artikkelen så vi på akka-stream bibliotek.

Vi definerte en prosess som kombinerer flere Strømmer for å beregne glidende gjennomsnitt av elementer. Deretter definerte vi a Kilde det er et inngangspunkt for strømbehandling og a Synke som utløser selve behandlingen.

Til slutt skrev vi en test for behandlingen vår med akka-stream-testkit.

Implementeringen av alle disse eksemplene og kodebitene finnes i GitHub-prosjektet - dette er et Maven-prosjekt, så det skal være enkelt å importere og kjøre som det er.


$config[zx-auto] not found$config[zx-overlay] not found