Introduksjon til reaktorkjerne

1. Introduksjon

Reactor Core er et Java 8-bibliotek som implementerer den reaktive programmeringsmodellen. Den er bygget på toppen av Reactive Streams Specification, en standard for å bygge reaktive applikasjoner.

Fra bakgrunnen av ikke-reaktiv Java-utvikling kan det å gå reaktivt være en ganske bratt læringskurve. Dette blir mer utfordrende når man sammenligner det med Java 8 Strøm API, da de kan forveksles med å være de samme abstraksjonene på høyt nivå.

I denne artikkelen vil vi forsøke å avmystifisere dette paradigmet. Vi tar små skritt gjennom Reactor til vi har bygget et bilde av hvordan vi kan komponere reaktiv kode, og legger grunnlaget for at mer avanserte artikler kommer i en senere serie.

2. Spesifikasjoner for reaktive strømmer

Før vi ser på Reactor, bør vi se på Reactive Streams Specification. Dette er hva Reactor implementerer, og det legger grunnlaget for biblioteket.

I hovedsak er Reactive Streams en spesifikasjon for asynkron prosessering.

Med andre ord et system der mange arrangementer blir produsert og konsumert asynkront. Tenk på en strøm av tusenvis av aksjeoppdateringer per sekund som kommer inn i en økonomisk applikasjon, og at den må svare på disse oppdateringene i tide.

Et av hovedmålene med dette er å løse problemet med mottrykk. Hvis vi har en produsent som sender ut hendelser til en forbruker raskere enn den kan behandle dem, vil forbrukeren til slutt bli overveldet av hendelser og gå tom for systemressurser.

Mottrykk betyr at forbrukeren vår skal kunne fortelle produsenten hvor mye data som skal sendes for å forhindre dette, og det er det som er beskrevet i spesifikasjonen.

3. Maven-avhengigheter

Før vi begynner, la oss legge til våre Maven-avhengigheter:

 io.projectreactor reactor-core 3.3.9.RELEASE ch.qos.logback logback-classic 1.1.3 

Vi legger også til Logback som en avhengighet. Dette er fordi vi logger utdataene fra Reactor for bedre å forstå datastrømmen.

4. Å produsere en datastrøm

For at et program skal være reaktivt, må det første å produsere en datastrøm.

Dette kan være noe som lageroppdateringseksemplet vi ga tidligere. Uten disse dataene ville vi ikke ha noe å reagere på, og det er derfor dette er et logisk første skritt.

Reactive Core gir oss to datatyper som gjør at vi kan gjøre dette.

4.1. Flux

Den første måten å gjøre dette på er med en Flux. Det er en strøm som kan avgi 0..n elementer. La oss prøve å lage en enkel:

Flux just = Flux.just (1, 2, 3, 4);

I dette tilfellet har vi en statisk strøm av fire elementer.

4.2. Mono

Den andre måten å gjøre dette på er med en Mono, som er en strøm av 0..1 elementer. La oss prøve å starte en:

Mono bare = Mono.just (1);

Dette ser ut og oppfører seg nesten nøyaktig det samme som Flux, bare denne gangen er vi begrenset til ikke mer enn ett element.

4.3. Hvorfor ikke bare strømme?

Før du eksperimenterer videre, er det verdt å markere hvorfor vi har disse to datatypene.

Først skal det bemerkes at begge a Flux og Mono er implementeringer av Reactive Streams Forlegger grensesnitt. Begge klassene er i samsvar med spesifikasjonen, og vi kan bruke dette grensesnittet i stedet:

Publisher bare = Mono.just ("foo");

Men egentlig, det å vite denne kardinaliteten er nyttig. Dette er fordi noen få operasjoner bare gir mening for en av de to typene, og fordi det kan være mer uttrykksfullt (forestill deg Finn én() i et depot).

5. Abonnere på en strøm

Nå har vi et høyt nivå oversikt over hvordan vi kan produsere en datastrøm, vi må abonnere på den for at den skal kunne sende ut elementene.

5.1. Samleelementer

La oss bruke abonnere() metode for å samle alle elementene i en strøm:

Listeelementer = ny ArrayList (); Flux.just (1, 2, 3, 4) .log (). Abonner (elementer :: legg til); assertThat (elementer). inneholder nøyaktig (1, 2, 3, 4);

Dataene begynner ikke å strømme før vi abonnerer. Legg merke til at vi også har lagt til noen loggføringer, dette vil være nyttig når vi ser på hva som skjer bak kulissene.

5.2. Strømmen av elementer

Med pålogging kan vi bruke den til å visualisere hvordan dataene strømmer gjennom strømmen vår:

20: 25: 19.550 [hoved] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | forespørsel (ubegrenset) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | onNext (1) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | onNext (2) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | onNext (3) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | onNext (4) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | onComplete ()

Først og fremst kjører alt på hovedtråden. La oss ikke gå inn på noen detaljer om dette, da vi tar en nærmere titt på samtidighet senere i denne artikkelen. Det gjør imidlertid ting enkle, ettersom vi kan takle alt i orden.

La oss nå gå gjennom sekvensen som vi har logget en etter en:

  1. onSubscribe () - Dette kalles når vi abonnerer på strømmen vår
  2. forespørsel (ubegrenset) - Når vi ringer abonnere, bak kulissene lager vi en Abonnement. Dette abonnementet ber om elementer fra strømmen. I dette tilfellet er det som standard ubegrenset, noe som betyr at den ber om hvert eneste tilgjengelige element
  3. onNext () - Dette kalles på hvert enkelt element
  4. onComplete () - Dette kalles siste, etter å ha mottatt det siste elementet. Det er faktisk en onError () også, som vil bli kalt hvis det er et unntak, men i dette tilfellet er det ikke

Dette er strømmen som er lagt ut i Abonnent grensesnitt som en del av Reactive Streams Specification, og i virkeligheten er det det som ble instantiert bak kulissene i vår oppfordring til onSubscribe (). Det er en nyttig metode, men la oss gi en for å bedre forstå hva som skjer Abonnent grensesnitt direkte:

Flux.just (1, 2, 3, 4) .log (). Abonner (ny abonnent () {@ Override offentlig ugyldighet onSubscribe (Abonnement s) {s.request (Long.MAX_VALUE);} @ Overstyr offentlig tomrom på Neste ( Heltall heltall) {elements.add (heltall);} @ Override public void onError (Throwable t) {} @ Override public void onComplete () {}});

Vi kan se at hvert mulig trinn i de ovennevnte flyter kart til en metode i Abonnent gjennomføring. Det hender bare at Flux har gitt oss en hjelpemetode for å redusere denne ordlighetsgraden.

5.3. Sammenligning med Java 8 Strømmer

Det ser fremdeles ut til at vi har noe synonymt med en Java 8 Strøm gjør samle:

Liste samlet = Stream.of (1, 2, 3, 4) .collect (toList ());

Bare vi ikke.

Kjerneforskjellen er at Reactive er en push-modell, mens Java 8 Strømmer er en pull-modell. I en reaktiv tilnærming er det hendelser dyttet til abonnentene når de kommer inn.

Den neste tingen å legge merke til er en Strømmer terminaloperatør er nettopp det, terminal, trekker alle dataene og returnerer et resultat. Med Reactive kan vi ha en uendelig strøm som kommer inn fra en ekstern ressurs, med flere abonnenter tilknyttet og fjernet på ad hoc-basis. Vi kan også gjøre ting som å kombinere strømmer, gassstrømmer og bruke mottrykk, som vi vil dekke videre.

6. Mottrykk

Den neste tingen vi bør vurdere er mottrykk. I vårt eksempel ber abonnenten produsenten om å skyve hvert eneste element på en gang. Dette kan ende opp med å bli overveldende for abonnenten og forbruker alle ressursene.

Mottrykk er når en nedstrøm kan fortelle en oppstrøms å sende færre data for å forhindre at den blir overveldet.

Vi kan endre vår Abonnent implementering for å bruke mottrykk. La oss fortelle oppstrøms å bare sende to elementer om gangen ved å bruke be om():

Flux.just (1, 2, 3, 4) .log (). Abonner (ny abonnent () {privat abonnement s; int onNextAmount; @ overstyr offentlig tomrom på abonnement (abonnement s) {this.s = s; s.request (2);} @Override public void onNext (Integer integer) {elements.add (integer); onNextAmount ++; if (onNextAmount% 2 == 0) {s.request (2);}} @ Override public void onError (Throwable t) {} @Override public void onComplete () {}});

Nå hvis vi kjører koden vår igjen, får vi se forespørsel (2) kalles, etterfulgt av to påNeste () ringer, da forespørsel (2) en gang til.

23: 31: 15.395 [hoved] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 23: 31: 15.397 [main] INFO reactor.Flux.Array.1 - | forespørsel (2) 23: 31: 15.397 [main] INFO reactor.Flux.Array.1 - | onNext (1) 23: 31: 15.398 [main] INFO reactor.Flux.Array.1 - | onNext (2) 23: 31: 15.398 [main] INFO reactor.Flux.Array.1 - | forespørsel (2) 23: 31: 15.398 [main] INFO reactor.Flux.Array.1 - | onNext (3) 23: 31: 15.398 [main] INFO reactor.Flux.Array.1 - | onNext (4) 23: 31: 15.398 [main] INFO reactor.Flux.Array.1 - | forespørsel (2) 23: 31: 15.398 [main] INFO reactor.Flux.Array.1 - | onComplete ()

I hovedsak er dette reaktivt mottrykk. Vi ber oppstrøms om å bare skyve en viss mengde elementer, og bare når vi er klare.

Hvis vi forestiller oss at vi ble streamet tweets fra twitter, ville det være opp til oppstrøms å bestemme hva vi skulle gjøre. Hvis tweets kom inn, men det ikke er noen forespørsler fra nedstrøms, kan oppstrøms slippe ting, lagre dem i en buffer eller en annen strategi.

7. Drift på en strøm

Vi kan også utføre operasjoner på dataene i strømmen vår og svare på hendelser slik vi ser det.

7.1. Kartlegging av data i en strøm

En enkel operasjon som vi kan utføre er å bruke en transformasjon. I dette tilfellet, la oss bare doble alle tallene i strømmen vår:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2). Abonner (element :: add);

kart() vil bli brukt når påNeste () er kalt.

7.2. Kombinere to strømmer

Vi kan da gjøre ting mer interessante ved å kombinere en annen strøm med denne. La oss prøve dette ved å bruke glidelås() funksjon:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .zipWith (Flux.range (0, Integer.MAX_VALUE), (one, two) -> String.format ("First Flux:% d, Second Flux:% d", one, two)). Abonner (elementer :: add); assertThat (elementer). inneholder nøyaktig ("First Flux: 2, Second Flux: 0", "First Flux: 4, Second Flux: 1", "First Flux: 6, Second Flux: 2", "First Flux: 8, Second Flux: 3 ");

Her skaper vi en annen Flux som fortsetter å øke med en og strømme den sammen med vår originale. Vi kan se hvordan disse fungerer sammen ved å inspisere loggene:

20: 04: 38.064 [hoved] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 04: 38.065 [main] INFO reactor.Flux.Array.1 - | onNext (1) 20: 04: 38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe ([Synchronous Fuseable] FluxRange.RangeSubscription) 20: 04: 38.066 [main] INFO reactor.Flux.Range.2 - | onNext (0) 20: 04: 38.067 [main] INFO reactor.Flux.Array.1 - | onNext (2) 20: 04: 38.067 [main] INFO reactor.Flux.Range.2 - | onNext (1) 20: 04: 38.067 [main] INFO reactor.Flux.Array.1 - | onNext (3) 20: 04: 38.067 [main] INFO reactor.Flux.Range.2 - | onNext (2) 20: 04: 38.067 [main] INFO reactor.Flux.Array.1 - | onNext (4) 20: 04: 38.067 [main] INFO reactor.Flux.Range.2 - | onNext (3) 20: 04: 38.067 [main] INFO reactor.Flux.Array.1 - | onComplete () 20: 04: 38.067 [main] INFO reactor.Flux.Array.1 - | avbryt () 20: 04: 38.067 [main] INFO reactor.Flux.Range.2 - | Avbryt()

Legg merke til hvordan vi nå har ett abonnement pr Flux. De påNeste () samtaler er også vekslet, så indeksen til hvert element i strømmen vil matche når vi bruker glidelås() funksjon.

8. Hot Streams

Foreløpig har vi primært fokusert på kalde strømmer. Dette er statiske strømmer med fast lengde som er enkle å håndtere. Et mer realistisk brukstilfelle for reaktivt kan være noe som skjer uendelig.

For eksempel kan vi ha en strøm av musebevegelser som hele tiden må reageres på eller en twitter-feed. Disse typer strømmer kalles hot streams, ettersom de alltid kjører og kan abonneres på når som helst, og mangler starten på dataene.

8.1. Opprette en ConnectableFlux

En måte å lage en varm strøm på er å konvertere en kald strøm til en. La oss lage en Flux som varer evig, med resultatene til konsollen, som vil simulere en uendelig strøm av data som kommer fra en ekstern ressurs:

ConnectableFlux publish = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}) .publish ();

Ved å ringe publisere() vi får en ConnectableFlux. Dette betyr at det ringer abonnere() vil ikke føre til at den begynner å sende, slik at vi kan legge til flere abonnementer:

publish.subscribe (System.out :: println); publish.subscribe (System.out :: println);

Hvis vi prøver å kjøre denne koden, vil ingenting skje. Det er ikke før vi ringer koble(), at Flux begynner å sende ut:

publish.connect ();

8.2. Struping

Hvis vi kjører koden vår, vil konsollen vår bli overveldet av loggføring. Dette simulerer en situasjon der for mye data blir sendt til forbrukerne våre. La oss prøve å omgå dette med struping:

ConnectableFlux publish = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}). Sample (ofSeconds (2)) .publish ();

Her har vi introdusert en prøve() metode med et intervall på to sekunder. Nå blir verdiene bare presset til abonnenten annen hvert sekund, noe som betyr at konsollen vil være mye mindre hektisk.

Selvfølgelig er det flere strategier for å redusere mengden data som sendes nedstrøms, for eksempel vinduering og buffering, men de vil bli utenfor rekkevidden for denne artikkelen.

9. Samtidighet

Alle eksemplene ovenfor har for tiden kjørt på hovedtråden. Vi kan imidlertid kontrollere hvilken tråd koden vår kjører på hvis vi vil. De Planlegger grensesnittet gir en abstraksjon rundt asynkron kode, som mange implementeringer er gitt for oss. La oss prøve å abonnere på en annen tråd til hoved:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .subscribeOn (Schedulers.parallel ()) .subscribe (elements :: add);

De Parallell scheduler vil føre til at abonnementet vårt kjøres på en annen tråd, som vi kan bevise ved å se på loggene. Vi ser at den første oppføringen kommer fra hoved- og Flux kjører i en annen tråd som heter parallell-1.

20:03:27.505 [hoved] DEBUG reactor.util.Loggers $ LoggerFactory - Bruk av Slf4j logging framework 20: 03: 27.529 [parallell-1] INFO reactor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 03: 27.531 [parallell-1] INFO reactor.Flux.Array.1 - | forespørsel (ubegrenset) 20: 03: 27.531 [parallell-1] INFO reactor.Flux.Array.1 - | påNeste (1) 20: 03: 27.531 [parallell-1] INFO reactor.Flux.Array.1 - | påNeste (2) 20: 03: 27.531 [parallell-1] INFO reactor.Flux.Array.1 - | onNext (3) 20: 03: 27.531 [parallell-1] INFO reactor.Flux.Array.1 - | onNext (4) 20: 03: 27.531 [parallell-1] INFO reactor.Flux.Array.1 - | onComplete ()

Samtidighet er mer interessant enn dette, og det vil være verdt å utforske det i en annen artikkel.

10. Konklusjon

I denne artikkelen har vi gitt en oversikt over reaktive kjerne på høyt nivå. Vi har forklart hvordan vi kan publisere og abonnere på strømmer, bruke mottrykk, operere på strømmer og også håndtere data asynkront. Dette skulle forhåpentligvis legge et grunnlag for oss å skrive reaktive applikasjoner.

Senere artikler i denne serien vil dekke mer avansert samtidighet og andre reaktive konsepter. Det er også en annen artikkel som dekker Reactor with Spring.

Kildekoden for applikasjonen vår er tilgjengelig på GitHub; dette er et Maven-prosjekt som skal kunne kjøre som det er.


$config[zx-auto] not found$config[zx-overlay] not found