Introduksjon til RxJava

1. Oversikt

I denne artikkelen skal vi fokusere på å bruke Reactive Extensions (Rx) i Java til å komponere og konsumere datasekvenser.

Med et øyeblikk kan API-et ligne Java 8 Streams, men faktisk er det mye mer fleksibelt og flytende, noe som gjør det til et kraftig programmeringsparadigme.

Hvis du vil lese mer om RxJava, kan du sjekke ut denne skrivingen.

2. Oppsett

For å bruke RxJava i vårt Maven-prosjekt, må vi legge til følgende avhengighet til vår pom.xml:

 io.reactivex rxjava $ {rx.java.version} 

Eller for et Gradle-prosjekt:

kompilere 'io.reactivex.rxjava: rxjava: x.y.z'

3. Funksjonelle reaktive konsepter

På en side, funksjonell programmering er prosessen med å bygge programvare ved å komponere rene funksjoner, unngå delt tilstand, foranderlige data og bivirkninger.

På den andre siden, reaktiv programmering er et asynkront programmeringsparadigme som er opptatt av datastrømmer og forplantning av endring.

Sammen, funksjonell reaktiv programmering danner en kombinasjon av funksjonelle og reaktive teknikker som kan representere en elegant tilnærming til hendelsesdrevet programmering - med verdier som endres over tid og hvor forbrukeren reagerer på dataene når de kommer inn.

Denne teknologien samler forskjellige implementeringer av kjerneprinsippene, noen forfattere kom med et dokument som definerer det vanlige ordforrådet for å beskrive den nye typen applikasjoner.

3.1. Reaktivt manifest

The Reactive Manifesto er et elektronisk dokument som legger høy standard for applikasjoner innen programvareutviklingsindustrien. Enkelt sagt, reaktive systemer er:

  • Responsive - systemer skal svare på en riktig måte
  • Message Driven - systemer bør bruke asynkronisering av meldingsoverføring mellom komponenter for å sikre løs kobling
  • Elastiske - systemene skal være lydhøre under høy belastning
  • Motstandsdyktig - systemer skal være lydhøre når noen komponenter svikter

4. Observerbare

Det er to viktige typer å forstå når du jobber med Rx:

  • Observerbar representerer ethvert objekt som kan få data fra en datakilde og hvis tilstand kan være av interesse på en måte som andre objekter kan registrere en interesse
  • An observatør er ethvert objekt som ønsker å bli varslet når tilstanden til et annet objekt endres

An observatør abonnerer på en Observerbar sekvens. Sekvensen sender elementer til observatør en om gangen.

De observatør håndterer hver enkelt før du behandler den neste. Hvis mange hendelser kommer asynkront, må de lagres i kø eller slippes.

I Rx, en observatør vil aldri bli ringt med en vare ute av drift eller ringt før tilbakeringingen har returnert for den forrige varen.

4.1. Typer av Observerbar

Det er to typer:

  • Ikke-blokkering - asynkron kjøring støttes og har lov til å avslutte abonnementet når som helst i hendelsesstrømmen. På denne artikkelen vil vi mest fokusere på denne typen type
  • Blokkering - alle påNeste observatøranrop vil være synkron, og det er ikke mulig å avslutte abonnementet midt i en hendelsesstrøm. Vi kan alltid konvertere en Observerbar inn i en Blokkerende observerbar, ved hjelp av metoden toBlocking:
BlockingObservable blockingObservable = observerbar.toBlocking ();

4.2. Operatører

An operatør er en funksjon som tar en Observable (kilden) som sitt første argument og returnerer et annet Observerbar (målet). For hvert element som den observerbare kilden sender ut, vil den bruke en funksjon på det elementet, og deretter sende resultatet på destinasjonen Observerbar.

Operatører kan lenkes sammen for å skape komplekse datastrømmer som filtrerer hendelse basert på visse kriterier. Flere operatører kan brukes på det samme observerbar.

Det er ikke vanskelig å komme i en situasjon der en Observerbar sender ut gjenstander raskere enn en operatør eller observatør kan konsumere dem. Du kan lese mer om mottrykk her.

4.3. Lag Observable

Den grunnleggende operatøren bare produserer en Observerbar som avgir en enkelt generisk forekomst før den er fullført, String "Hallo". Når vi ønsker å få informasjon ut av en Observerbar, implementerer vi en observatør grensesnitt og ring abonner på ønsket Observerbar:

Observable observerable = Observable.just ("Hello"); observerbar. abonner (s -> resultat = s); assertTrue (result.equals ("Hello"));

4.4. OnNext, OnError, og PåFullført

Det er tre metoder på observatør grensesnitt som vi vil vite om:

  1. OnNext kalles på vår observatør hver gang et nytt arrangement blir publisert til vedlagte Observerbar. Dette er metoden der vi vil utføre noen handlinger på hver hendelse
  2. PåFullført kalles når sekvensen av hendelser assosiert med en Observerbar er komplett, noe som indikerer at vi ikke bør forvente mer påNeste oppfordrer vår observatør
  3. OnError kalles når et unhandled unntak kastes under RxJava rammekode eller vår hendelseshåndteringskode

Returverdien for Observerbareabonnere metoden er en abonnere grensesnitt:

Streng [] bokstaver = {"a", "b", "c", "d", "e", "f", "g"}; Observable observerable = Observable.from (letters); observerbar.abonnement (i -> resultat + = i, // OnNext Throwable :: printStackTrace, // OnError () -> result + = "_Completed" // OnCompleted); assertTrue (result.equals ("abcdefg_Completed"));

5. Observerbare transformasjoner og betingede operatører

5.1. Kart

Men operatør forvandler gjenstander som sendes ut av en Observerbar ved å bruke en funksjon på hvert element.

La oss anta at det er et erklært utvalg av strenger som inneholder noen bokstaver fra alfabetet, og vi ønsker å skrive dem ut i hovedmodus:

Observable.from (letters) .map (String :: toUpperCase) .abscribe (letter -> result + = letter); assertTrue (result.equals ("ABCDEFG"));

FlatMap kan brukes til å flate ut Observerbare når vi ender opp med nestede Observerbare.

Flere detaljer om forskjellen mellom kart og flatMap finner du her.

Forutsatt at vi har en metode som returnerer en Observerbar fra en liste over strenger. Nå skal vi skrive ut for hver streng fra en ny Observerbar listen over titler basert på hva Abonnent ser:

Observable getTitle () {return Observable.from (titleList); } Observable.just ("book1", "book2") .flatMap (s -> getTitle ()). Abonner (l -> resultat + = l); assertTrue (result.equals ("titletitle"));

5.2. Skann

De skanneoperatør autfører en funksjon på hvert element som sendes ut av et Observerbar sekvensielt og avgir hver suksessive verdi.

Det tillater oss å videreføre tilstand fra hendelse til hendelse:

Streng [] bokstaver = {"a", "b", "c"}; Observable.from (letters) .scan (new StringBuilder (), StringBuilder :: append.) Abonnement (total -> resultat + = total.toString ()); assertTrue (result.equals ("aababc"));

5.3. Gruppe av

Gruppe av operatør tillater oss å klassifisere hendelsene i inngangen Observerbar inn i utgangskategorier.

La oss anta at vi opprettet en rekke heltall fra 0 til 10, og deretter bruke gruppe av som vil dele dem inn i kategoriene til og med og merkelig:

Observable.from (numbers) .groupBy (i -> 0 == (i% 2)? "EVEN": "ODD"). Abonner (group -> group.subscribe ((number) -> {if (group.getKey) () .toString (). tilsvarer ("EVEN")) {EVEN [0] + = number;} else {ODD [0] + = number;}})); assertTrue (EVEN [0] .equals ("0246810")); assertTrue (ODD [0] .equals ("13579"));

5.4. Filter

Operatøren filter sender bare de elementene fra en observerbar som passerer en predikat test.

Så la oss filtrere i et heltall array for oddetallene:

Observable.from (numbers) .filter (i -> (i% 2 == 1)). Abonner (i -> result + = i); assertTrue (result.equals ("13579"));

5.5. Betingede operatører

StandardIfEmpty sender ut element fra kilden Observerbar, eller et standardelement hvis kilden Observerbar er tom:

Observable.empty () .defaultIfEmpty ("Observable is empty"). Abonner (s -> resultat + = s); assertTrue (result.equals ("Observable is empty"));

Følgende kode avgir den første bokstaven i alfabeteten' fordi matrisen bokstaver er ikke tom, og dette er hva den inneholder i første posisjon:

Observerbar.fra (bokstaver) .defaultIfEmpty ("Observerbar er tom"). Første (). Abonner (s -> resultat + = s); assertTrue (result.equals ("a"));

Ta en stund operatøren kaster gjenstander som sendes ut av Observerbar etter at en spesifisert tilstand blir falsk:

Observable.from (numbers) .takeWhile (i -> i sum [0] + = s); assertTrue (sum [0] == 10);

Selvfølgelig er det flere andre operatører som kan dekke våre behov som Inneholder, SkipWhile, SkipUntil, TakeUntil, etc.

6. Koblingsbare observerbare ting

EN Kan kobles til: Observerbar ligner en vanlig Observerbar, bortsett fra at den ikke begynner å sende ut varer når den abonnerer på, men bare når koble operatøren blir brukt på den.

På denne måten kan vi vente på at alle tiltenkte observatører skal abonnere på Observerbar før Observerbar begynner å sende ut ting:

Streng [] resultat = {""}; ConnectableObservable connectable = Observable.interval (200, TimeUnit.MILLISECONDS) .publish (); connectable.subscribe (i -> resultat [0] + = i); assertFalse (resultat [0] .equals ("01")); connectable.connect (); Tråd. Søvn (500); assertTrue (resultat [0] .equals ("01"));

7. Singel

Enkelt er som en Observerbar som i stedet for å sende ut en serie verdier, sender ut en verdi eller en feilmelding.

Med denne datakilden kan vi bare bruke to metoder for å abonnere:

  • OnSuccess returnerer a Enkelt som også kaller en metode vi spesifiserer
  • OnError returnerer også a Enkelt som umiddelbart underretter abonnenter om en feil
Streng [] resultat = {""}; Enkelt singel = Observable.just ("Hello") .toSingle () .doOnSuccess (i -> result [0] + = i) .doOnError (error -> {throw new RuntimeException (error.getMessage ());}); single.subscribe (); assertTrue (resultat [0] .equals ("Hello"));

8. Emner

EN Emne er samtidig to elementer, a abonnent og en observerbar. Som abonnent kan et emne brukes til å publisere hendelsene som kommer fra mer enn en observerbar.

Og fordi det også kan observeres, kan hendelsene fra flere abonnenter sendes ut som hendelser til alle som observerer det.

I neste eksempel ser vi på hvordan observatørene vil kunne se hendelsene som oppstår etter at de abonnerer:

Heltalsabonnent1 = 0; Heltall abonnent2 = 0; Observer getFirstObserver () {return new Observer () {@Override public void onNext (Integer value) {subscriber1 + = value; } @ Override public void onError (Throwable e) {System.out.println ("error"); } @ Override public void onCompleted () {System.out.println ("Abonnent1 fullført"); }}; } Observer getSecondObserver () {return new Observer () {@Override public void onNext (Integer value) {subscriber2 + = value; } @ Override public void onError (Throwable e) {System.out.println ("error"); } @ Override public void onCompleted () {System.out.println ("Abonnent2 fullført"); }}; } PublishSubject subject = PublishSubject.create (); subject.subscribe (getFirstObserver ()); subject.onNext (1); subject.onNext (2); subject.onNext (3); subject.subscribe (getSecondObserver ()); subject.onNext (4); subject.onCompleted (); assertTrue (subscriber1 + subscriber2 == 14)

9. Ressursledelse

Ved hjelp av operasjonen lar oss knytte ressurser, for eksempel en JDBC-databasetilkobling, en nettverkstilkobling eller åpne filer til våre observerbare.

Her presenterer vi i kommentarene trinnene vi trenger å gjøre for å nå dette målet, og også et eksempel på implementering:

Streng [] resultat = {""}; Observerbare verdier = Observable.using (() -> "MyResource", r -> {return Observable.create (o -> {for (Character c: r.toCharArray ()) {o.onNext (c);} o. onCompleted ();});}, r -> System.out.println ("Disposed:" + r)); verdier. abonner (v -> resultat [0] + = v, e -> resultat [0] + = e); assertTrue (resultat [0] .equals ("MyResource"));

10. Konklusjon

I denne artikkelen har vi snakket om hvordan du bruker RxJava-biblioteket, og hvordan vi kan utforske de viktigste funksjonene.

Den fulle kildekoden for prosjektet, inkludert alle kodeeksemplene som brukes her, finner du på Github.


$config[zx-auto] not found$config[zx-overlay] not found