Håndterer mottrykk med RxJava

1. Oversikt

I denne artikkelen vil vi se på hvordan RxJava-biblioteket hjelper oss med å håndtere mottrykk.

Enkelt sagt - RxJava bruker et konsept med reaktive strømmer ved å introdusere Observerbare, som en eller flere Observatører kan abonnere på. Å takle mulig uendelige strømmer er veldig utfordrende, ettersom vi må møte et mottrykkproblem.

Det er ikke vanskelig å komme i en situasjon der en Observerbar sender ut varer raskere enn en abonnent kan konsumere dem. Vi vil se på de forskjellige løsningene på problemet med økende buffer for uforbrukte varer.

2. Varmt Observerbare Versus Cold Observerbare

La oss først lage en enkel forbrukerfunksjon som skal brukes som forbruker av elementer fra Observerbare som vi vil definere senere:

offentlig klasse ComputeFunction {public static void compute (Integer v) {try {System.out.println ("compute integer v:" + v); Tråd. Søvn (1000); } fange (InterruptedException e) {e.printStackTrace (); }}}

Våre beregne () funksjonen er ganske enkelt å skrive ut argumentet. Det viktige å legge merke til her er en påkallelse av en Thread.sleep (1000) metode - vi gjør det for å etterligne en langvarig oppgave som vil føre til Observerbar å fylle opp med gjenstander raskere det Observatør kan konsumere dem.

Vi har to typer Observables - Hot og Kald - som er helt forskjellige når det gjelder mottrykkhåndtering.

2.1. Kald Observerbare

En forkjølelse Observerbar avgir en bestemt sekvens med elementer, men kan begynne å sende ut denne sekvensen når den er Observatør synes det er praktisk, og uansett pris Observatør ønsker, uten å forstyrre sekvensens integritet. Kald Observerbar gir ting på en lat måte.

De Observatør tar bare elementer når den er klar til å behandle det elementet, og elementene ikke trenger å bli bufret i et Observerbar fordi de blir bedt om på en pull-måte.

For eksempel hvis du oppretter en Observerbar basert på et statisk utvalg av elementer fra en til en million, det Observerbar vil avgi samme sekvens av ting uansett hvor ofte disse elementene blir observert:

Observable.range (1, 1_000_000) .observeOn (Schedulers.computation ()). Abonner (ComputeFunction :: compute);

Når vi starter programmet vårt, blir elementene beregnet av Observatør lat og vil bli bedt om på en pull mote. De Schedulers.computation () metode betyr at vi ønsker å kjøre vår Observatør innenfor en beregningstråd pool i RxJava.

Resultatet av et program vil bestå av et resultat av en beregne () metode påkalt for ett og ett element fra en Observerbar:

beregne heltall v: 1 beregne heltall v: 2 beregne heltall v: 3 beregne heltall v: 4 ...

Kald Observerbare trenger ikke å ha noen form for et mottrykk fordi de fungerer på en trekk måte. Eksempler på gjenstander som er forkjølt Observerbar kan inkludere resultatene av et databasespørsmål, filhenting eller nettforespørsel.

2.2. Varmt Observerbare

En varm Observerbar begynner å generere gjenstander og sender dem ut umiddelbart når de er opprettet. Det er i strid med forkjølelse Observerbare trekkmodell for behandling. Varmt Observerbar sender ut gjenstander i sitt eget tempo, og det er opp til observatørene å følge med.

Når Observatør er ikke i stand til å konsumere varer så raskt som de produseres av en Observerbar de må bufres eller håndteres på en annen måte, da de vil fylle opp minnet og til slutt forårsake OutOfMemoryException.

La oss vurdere et eksempel på varmt Observerbar, som produserer 1 million varer til en sluttforbruker som behandler disse artiklene. Når en beregne () metoden i Observatør tar litt tid å behandle hvert element, Observerbar begynner å fylle opp et minne med elementer, noe som får et program til å mislykkes:

PublishSubject source = PublishSubject.create (); source.observeOn (Schedulers.computation ()). abonner (ComputeFunction :: compute, Throwable :: printStackTrace); IntStream.range (1, 1_000_000) .forEach (kilde :: onNext); 

Å kjøre programmet mislykkes med en Mangler BackpressureException fordi vi ikke definerte en måte å håndtere overproduksjon på Observerbar.

Eksempler på gjenstander som sendes ut av en hot Observerbar kan omfatte mus- og tastaturhendelser, systemhendelser eller aksjekurser.

3. Buffering av overproduksjon Observerbar

Den første måten å håndtere overproduksjon på Observerbar er å definere en slags buffer for elementer som ikke kan behandles av en Observatør.

Vi kan gjøre det ved å ringe en buffer() metode:

PublishSubject source = PublishSubject.create (); source.buffer (1024) .observeOn (Schedulers.computation ()). abonner (ComputeFunction :: compute, Throwable :: printStackTrace); 

Å definere en buffer med størrelsen 1024 vil gi en Observatør litt tid til å ta igjen en overproduserende kilde. Bufferen lagrer gjenstander som ennå ikke ble behandlet.

Vi kan øke bufferstørrelsen for å ha nok plass til produserte verdier.

Merk imidlertid at generelt, dette kan bare være en midlertidig løsning ettersom overløp fortsatt kan skje hvis kilden overproducerer den forventede bufferstørrelsen.

4. Batching av sendte gjenstander

Vi kan batchprodusere varer i vinduer med N-elementer.

Når Observerbar produserer elementer raskere enn Observatør kan behandle dem, kan vi lindre dette ved å gruppere produserte elementer sammen og sende en gruppe elementer til Observatør som er i stand til å behandle en samling av elementer i stedet for element en etter en:

PublishSubject source = PublishSubject.create (); source.window (500) .observeOn (Schedulers.computation ()). abonner (ComputeFunction :: compute, Throwable :: printStackTrace); 

Ved hjelp av vindu() metode med argument 500, Vil fortelle Observerbar for å gruppere elementer i 500-store batcher. Denne teknikken kan redusere problemet med overproduksjon Observerbar når Observatør er i stand til å behandle en gruppe elementer raskere sammenlignet med behandling av elementer en etter en.

5. Hopp over elementer

Hvis noen av verdiene produsert av Observerbar kan ignoreres trygt, kan vi bruke prøvetakingen innen en bestemt tids- og strupingoperatører.

Metodene prøve() og throttleFirst () tar varighet som parameter:

  • Srikelig () metoden ser jevnlig på rekkefølgen av elementer og sender ut det siste elementet som ble produsert innen varigheten som er angitt som parameter
  • De throttleFirst () metoden sender ut det første elementet som ble produsert etter varigheten som er angitt som parameter

Varigheten er en tid etter hvilken ett bestemt element er plukket fra sekvensen av produserte elementer. Vi kan spesifisere en strategi for håndtering av mottrykk ved å hoppe over elementer:

PublishSubject source = PublishSubject.create (); source.sample (100, TimeUnit.MILLISECONDS) .observeOn (Schedulers.computation ()). abonner (ComputeFunction :: compute, Throwable :: printStackTrace);

Vi spesifiserte at strategien for å hoppe over elementer vil være en prøve() metode. Vi ønsker et utvalg av en sekvens fra 100 millisekunder. Dette elementet vil bli sendt ut til Observatør.

Husk imidlertid at disse operatørene bare reduserer frekvensen for mottak av verdier i nedstrøms Observatør og dermed kan de fremdeles føre til Mangler BackpressureException.

6. Håndtering av fylling Observerbar Buffer

I tilfelle at våre strategier for prøvetaking eller batching av elementer ikke hjelper med å fylle opp en buffer, vi trenger å implementere en strategi for håndtering av saker når en buffer fylles opp.

Vi må bruke en onBackpressureBuffer () metode for å forhindre BufferOverflowException.

De onBackpressureBuffer () metoden tar tre argumenter: kapasiteten til en Observerbar buffer, en metode som påkalles når en buffer fylles opp, og en strategi for å håndtere elementer som må kastes fra en buffer. Strategier for overløp er i a Mottrykk Overløp klasse.

Det er fire typer handlinger som kan utføres når bufferen fylles opp:

  • ON_OVERFLOW_ERROR - dette er standard atferd som signaliserer a BufferOverflowException når bufferen er full
  • ON_OVERFLOW_DEFAULT - for øyeblikket er det det samme som ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST - hvis et overløp ville skje, vil den nåværende verdien bare bli ignorert, og bare de gamle verdiene vil bli levert en gang nedstrøms Observatør forespørsler
  • ON_OVERFLOW_DROP_OLDEST - slipper det eldste elementet i bufferen og legger til den nåværende verdien

La oss se hvordan vi spesifiserer strategien:

Observable.range (1, 1_000_000) .onBackpressureBuffer (16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) .observeOn (Schedulers.computation ()). Abonner (e -> {}, Kastbar :: printStackTrace) 

Her er vår strategi for å håndtere den overfylte bufferen å slippe det eldste elementet i en buffer og legge til det nyeste elementet produsert av en Observerbar.

Merk at de to siste strategiene forårsaker diskontinuitet i strømmen når de faller ut av elementer. I tillegg signaliserer de ikke BufferOverflowException.

7. Slippe alle overproduserte elementer

Hver gang nedstrøms Observatør ikke er klar til å motta et element, kan vi bruke et onBackpressureDrop () metode for å slippe elementet fra sekvensen.

Vi kan tenke på den metoden som en onBackpressureBuffer () metode med en bufferkapasitet satt til null med en strategi ON_OVERFLOW_DROP_LATEST.

Denne operatøren er nyttig når vi trygt kan ignorere verdier fra en kilde Observerbar (for eksempel musebevegelser eller nåværende GPS-posisjonssignaler) ettersom det vil være mer oppdaterte verdier senere:

Observable.range (1, 1_000_000) .onBackpressureDrop () .observeOn (Schedulers.computation ()) .doOnNext (ComputeFunction :: compute). Abonner (v -> {}, Throwable :: printStackTrace);

Metoden onBackpressureDrop () eliminerer et problem med overproduksjon Observerbar men må brukes med forsiktighet.

8. Konklusjon

I denne artikkelen så vi på et problem med overproduksjon Observerbar og måter å håndtere et mottrykk på. Vi så på strategier for buffering, batching og hopping av elementer når Observatør er ikke i stand til å konsumere elementer så raskt som de produseres av en Observerbar.

Implementeringen av alle disse eksemplene og kodebitene finnes i GitHub-prosjektet - dette er et Maven-prosjekt, så det skal være enkelt å importere og kjøre som det er.


$config[zx-auto] not found$config[zx-overlay] not found