Kombinere Observables i RxJava

1. Introduksjon

I denne raske opplæringen diskuterer vi forskjellige måter å kombinere på Observerbare i RxJava.

Hvis du er ny i RxJava, bør du definitivt sjekke ut denne introveiledningen først.

La oss hoppe rett inn.

2. Observerbare

Observerbar sekvenser, eller rett og slett Observerbare, er representasjoner av asynkrone datastrømmer.

Disse er basert på observatørmønsteret der et objekt som kalles en Observatørabonnerer på varer som sendes ut av en Observerbar.

Abonnementet er ikke-blokkerende som Observatør står for å reagere på hva som helst Observerbar vil slippe ut i fremtiden. Dette letter i sin tur samtidighet.

Her er en enkel demonstrasjon i RxJava:

Observerbar .fra (ny streng [] {"John", "Doe"}). Abonner (navn -> System.out.println ("Hei" + navn))

3. Kombinere observerbare ting

Når du programmerer ved hjelp av et reaktivt rammeverk, er det vanlig å kombinere forskjellige Observerbare.

I en webapplikasjon, for eksempel, kan det hende vi trenger å få to sett med asynkrone datastrømmer som er uavhengige av hverandre.

I stedet for å vente på at forrige strøm skal fullføres før vi ber om neste strøm, kan vi ringe begge samtidig og abonnere på de kombinerte strømmer.

I denne delen vil vi diskutere noen av de forskjellige måtene vi kan kombinere flere på Observerbare i RxJava og de forskjellige brukssakene som hver metode gjelder for.

3.1. Slå sammen

Vi kan bruke slå sammen operatør for å kombinere produksjonen fra flere Observerbare slik at de oppfører seg som en:

@Test offentlig ugyldighet gittTwoObservables_whenMerged_shouldEmitCombinedResults () {TestSubscriber testSubscriber = new TestSubscriber (); Observable.merge (Observable.from (new String [] {"Hello", "World"}), Observable.from (new String [] {"Jeg elsker", "RxJava"})). Abonner (testSubscriber); testSubscriber.assertValues ​​("Hei", "Verden", "Jeg elsker", "RxJava"); }

3.2. MergeDelayError

De mergeDelayError metoden er den samme som slå sammen ved at den kombinerer flere Observerbare inn i ett, men hvis det oppstår feil under sammenslåingen, lar det feilfrie varer fortsette før feilene overføres:

@Test offentlig ugyldighet gittMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError () {TestSubscriber testSubscriber = new TestSubscriber (); Observable.mergeDelayError (Observable.from (new String [] {"hallo", "world"}), Observable.error (new RuntimeException ("Some exception")), Observable.from (new String [] {"rxjava"} )) .abscribe (testSubscriber); testSubscriber.assertValues ​​("hei", "verden", "rxjava"); testSubscriber.assertError (RuntimeException.class); }

Ovennevnte eksempel avgir alle feilfrie verdier:

hei verden rxjava

Merk at hvis vi bruker slå sammen i stedet for mergeDelayError, den Stringrxjava ” vil ikke bli sendt ut fordi slå sammen stopper umiddelbart dataflyten fra Observerbare når det oppstår en feil.

3.3. Glidelås

De glidelås utvidelsesmetode samler to verdisekvenser som par:

@Test offentlig ugyldig givenTwoObservables_whenZipped_thenReturnCombinedResults () {List zippedStrings = ny ArrayList (); Observable.zip (Observable.from (new String [] {"Simple", "Moderate", "Complex"}), Observable.from (new String [] {"Solutions", "Success", "Hierarchy"}), (str1, str2) -> str1 + "" + str2). abonner (zippedStrings :: add); assertThat (zippedStrings) .isNotEmpty (); assertThat (zippedStrings.size ()). erEqualTo (3); assertThat (zippedStrings) .contains ("Simple Solutions", "Moderate Success", "Complex Hierarchy"); }

3.4. Glidelås med intervall

I dette eksemplet vil vi zip en stream med intervall som faktisk vil forsinke utslipp av elementer i den første strømmen:

@Test offentlig ugyldighet gittAStream_whenZippedWithInterval_shouldDelayStreamEmmission () {TestSubscriber testSubscriber = new TestSubscriber (); Observable data = Observable.just ("one", "two", "three", "four", "five"); Observable interval = Observable.interval (1L, TimeUnit.SECONDS); Observerbar .zip (data, intervall, (strData, tick) -> String.format ("[% d] =% s", tick, strData)) .toBlocking (). Subscribe (testSubscriber); testSubscriber.assertCompleted (); testSubscriber.assertValueCount (5); testSubscriber.assertValues ​​("[0] = en", "[1] = to", "[2] = tre", "[3] = fire", "[4] = fem"); }

4. Oppsummering

I denne artikkelen har vi sett noen av metodene for å kombinere Observerbare med RxJava. Du kan lære om andre metoder som combineLatest, bli med, gruppe Bli med, switchOnNext, i den offisielle RxJava-dokumentasjonen.

Som alltid er kildekoden for denne artikkelen tilgjengelig i GitHub-repoen.


$config[zx-auto] not found$config[zx-overlay] not found