RxJava 2 - Flytbar

1. Introduksjon

RxJava er en Java-implementering av reaktive utvidelser som lar oss skrive hendelsesdrevne og asynkrone applikasjoner. Mer informasjon om hvordan du bruker RxJava finner du i intro-artikkelen vår her.

RxJava 2 ble omskrevet fra bunnen av, noe som ga flere nye funksjoner; noen av dem ble opprettet som et svar på problemer som eksisterte i forrige versjon av rammeverket.

En av slike funksjoner er io.reactivex.Flowable.

2. Observerbar vs.. Flytbar

I den forrige versjonen av RxJava, var det bare en basisklasse for å håndtere kilder med mottrykk og ikke mottrykk - Observerbar.

RxJava 2 introduserte et klart skille mellom disse to typer kilder - mottrykkbevisste kilder er nå representert ved hjelp av en dedikert klasse - Flytbar.

Observerbar kilder støtter ikke mottrykk. På grunn av dette bør vi bruke den til kilder som vi bare bruker og ikke kan påvirke.

Også, hvis vi har å gjøre med et stort antall elementer, kan to mulige scenarier forbundet med mottrykk oppstå avhengig av typen av Observerbar.

Ved bruk av en såkalt kald Observerbar“, hendelser sendes ut lat, så vi er trygge på å overfylle en observatør.

Når du bruker en varmt Observerbardette vil imidlertid fortsette å sende ut hendelser, selv om forbrukeren ikke kan følge med.

3. Opprette en Flytbar

Det er forskjellige måter å lage en Flytbar. Praktisk for oss, disse metodene ligner metodene i Observerbar i den første versjonen av RxJava.

3.1. Enkel Flytbar

Vi kan lage en Flytbar bruker bare() metode på samme måte som vi kunne med Observerbar:

Flowable integerFlowable = Flowable.just (1, 2, 3, 4);

Selv om du bruker bare() er ganske enkelt, er det ikke veldig vanlig å lage en Flytbar fra statiske data, og den brukes til testformål.

3.2. Flytbar fra Observerbar

Når vi har en Observerbar vi kan enkelt forvandle det til Flytbar bruker toFlowable () metode:

Observable integerObservable = Observable.just (1, 2, 3); Flowable integerFlowable = integerObservable .toFlowable (BackpressureStrategy.BUFFER);

Legg merke til at for å kunne utføre konverteringen, må vi berike Observerbar med en Mottrykk Strategi. Vi beskriver tilgjengelige strategier i neste avsnitt.

3.3. Flytbar fra FlowableOnSubscribe

RxJava 2 introduserte et funksjonelt grensesnitt FlowableOnSubscribe, som representerer en Flytbar som begynner å sende ut hendelser etter at forbrukeren abonnerer på den.

På grunn av dette vil alle klienter motta det samme settet med hendelser, noe som gjør FlowableOnSubscribe mottrykk-trygt.

Når vi har FlowableOnSubscribe vi kan bruke den til å lage Flytbar:

FlowableOnSubscribe flowableOnSubscribe = flowable -> flowable.onNext (1); Flowable integerFlowable = Flowable .create (flowableOnSubscribe, BackpressureStrategy.BUFFER);

Dokumentasjonen beskriver mange flere metoder å lage Flytbar.

4. FlytbarMottrykk Strategi

Noen metoder som toFlowable () eller skape() ta en Mottrykk Strategi som argument.

De Mottrykk Strategi er en oppregning som definerer mottrykksatferden som vi skal bruke på vår Flytbar.

Det kan cache eller slippe hendelser eller ikke implementere noen oppførsel i det hele tatt, i det siste vil vi være ansvarlige for å definere det ved hjelp av mottrykksoperatorer.

Mottrykk Strategi den er lik Mottrykksmodus til stede i forrige versjon av RxJava.

Det er fem forskjellige strategier tilgjengelig i RxJava 2.

4.1. Buffer

Hvis vi bruker MottrykkStrategi.BUFFER, kilden vil buffere alle hendelsene til abonnenten kan konsumere dem:

offentlig ugyldig thenAllValuesAreBufferedAndReceived () {List testList = IntStream.range (0, 100000) .boxed () .collect (Collectors.toList ()); Observable observerable = Observable.fromIterable (testList); TestSubscriber testSubscriber = observerbar .toFlowable (BackpressureStrategy.BUFFER) .observeOn (Schedulers.computation ()). Test (); testSubscriber.awaitTerminalEvent (); Liste mottattInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (object -> (int) object) .boxed () .collect (Collectors.toList ()); assertEquals (testList, receivedInts); }

Det ligner på å påkalle onBackpressureBuffer () metode på Flytbar, men det tillater ikke å definere en bufferstørrelse eller onOverflow-handlingen eksplisitt.

4.2. Miste

Vi kan bruke MottrykkStrategy.DROP for å forkaste hendelsene som ikke kan konsumeres i stedet for å buffere dem.

Igjen ligner dette på bruk onBackpressureDrop()Flytbar:

offentlig ugyldig nårDropStrategyUsed_thenOnBackpressureDropped () {Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = observerbar .toFlowable (BackpressureStrategy.DROP) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); Liste mottattInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (object -> (int) object) .boxed () .collect (Collectors.toList ()); assertThat (receivedInts.size () <testList.size ()); assertThat (! receivedInts.contains (100000)); }

4.3. Siste

Bruker MottrykkStrategi.LATEST vil tvinge kilden til å beholde bare de siste hendelsene, og dermed overskrive tidligere verdier hvis forbrukeren ikke kan følge med:

offentlig ugyldig nårLatestStrategyUsed_thenTheLastElementReceived () {Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = observerbar .toFlowable (BackpressureStrategy.LATEST) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); Liste mottattInter = testSubscriber.getEvents () .get (0) .stream () .mapToInt (object -> (int) object) .boxed () .collect (Collectors.toList ()); assertThat (receivedInts.size () <testList.size ()); assertThat (receivedInts.contains (100000)); }

BackpressureStrategy.LATEST og BackpressureStrategy.DROP ser veldig like ut når vi ser på koden.

Derimot, MottrykkStrategi.LATEST vil overskrive elementer som abonnenten ikke kan håndtere og beholde bare de nyeste, derav navnet.

MottrykkStrategy.DROP, på den annen side, vil forkaste elementer som ikke kan håndteres. Dette betyr at de nyeste elementene ikke nødvendigvis sendes ut.

4.4. Feil

Når vi bruker MottrykkStrategy.ERROR, vi sier ganske enkelt det vi forventer ikke mottrykk. Følgelig a Mangler BackpressureException skal kastes hvis forbrukeren ikke kan følge med på kilden:

offentlig ugyldig nårErrorStrategyUsed_thenExceptionIsThrown () {Observable observable = Observable.range (1, 100000); TestSubscriber subscriber = observerbar .toFlowable (BackpressureStrategy.ERROR) .observeOn (Schedulers.computation ()) .test (); subscriber.awaitTerminalEvent (); subscriber.assertError (MissingBackpressureException.class); }

4.5. Savnet

Hvis vi bruker MottrykkStrategi.MISSING, kilden vil skyve elementer uten å forkaste eller buffere.

Nedstrøms vil måtte håndtere overløp i dette tilfellet:

offentlig ugyldig nårMissingStrategyUsed_thenException () {Observable observerable = Observable.range (1, 100000); TestSubscriber subscriber = observerbar .toFlowable (BackpressureStrategy.MISSING) .observeOn (Schedulers.computation ()) .test (); subscriber.awaitTerminalEvent (); subscriber.assertError (MissingBackpressureException.class); }

I våre tester unntar vi det Mangler baktrykk unntak for begge FEIL og SAVNET strategier. Da begge to vil kaste slikt unntak når kildens interne buffer blir overfylt.

Det er imidlertid verdt å merke seg at begge har et annet formål.

Vi bør bruke den tidligere når vi ikke forventer mottrykk i det hele tatt, og vi vil at kilden skal kaste et unntak i tilfelle det oppstår.

Sistnevnte kan brukes hvis vi ikke vil spesifisere en standard atferd ved opprettelsen av Flytbar. Og vi skal bruke mottrykksoperatorer for å definere det senere.

5. Sammendrag

I denne veiledningen har vi presentert den nye klassen som ble introdusert i RxJava 2 kalt Flytbar.

For å finne mer informasjon om Flytbar selve og det er API kan vi henvise til dokumentasjonen.

Som alltid kan alle kodeeksemplene finnes på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found