Planleggere i RxJava

1. Oversikt

I denne artikkelen skal vi fokusere på forskjellige typer Planleggere som vi skal bruke til å skrive flertrådeprogrammer basert på RxJava Observable abonnerer På og observer PÅ metoder.

Planleggere gi muligheten til å spesifisere hvor og sannsynlig når du skal utføre oppgaver knyttet til driften av et Observerbar kjede.

Vi kan få en Planlegger fra fabrikkmetodene beskrevet i klassen Planleggere.

2. Standard gjengeatferd

Som standard,Rx har en tråd som innebærer at en Observerbar og kjeden av operatører som vi kan bruke på den, vil varsle observatørene på samme tråd som den abonnere() metoden kalles.

De observer PÅ og abonner På metoder tar som argument a Planlegger, som, som navnet antyder, er et verktøy vi kan bruke til å planlegge individuelle handlinger.

Vi lager implementeringen av en Planlegger ved å bruke skapeArbeider metode, som returnerer a Planlegger. Arbeider. EN arbeider aksepterer handlinger og utfører dem sekvensielt på en enkelt tråd.

På en måte, a arbeider er en Scheduler selv, men vi vil ikke referere til det som en Planlegger for å unngå forvirring.

2.1. Planlegge en handling

Vi kan planlegge en jobb på hvilken som helst Planlegger ved å lage en ny arbeider og planlegge noen handlinger:

Scheduler scheduler = Schedulers.immediate (); Scheduler.Worker worker = scheduler.createWorker (); worker.schedule (() -> result + = "action"); Assert.assertTrue (result.equals ("action"));

Handlingen blir deretter satt i kø på tråden som arbeideren er tildelt.

2.2. Avbryte en handling

Planlegger. Arbeider strekker Abonnement. Ringer til Avslutte abonnementet metode på en arbeider vil føre til at køen tømmes og alle ventende oppgaver blir kansellert. Vi kan se det ved eksempel:

Scheduler scheduler = Schedulers.newThread (); Scheduler.Worker worker = scheduler.createWorker (); worker.schedule (() -> {result + = "First_Action"; worker.unsubscribe ();}); worker.schedule (() -> result + = "Second_Action"); Assert.assertTrue (result.equals ("First_Action"));

Den andre oppgaven blir aldri utført fordi den før den avbrøt hele operasjonen. Handlinger som var i ferd med å bli utført vil bli avbrutt.

3. Planleggere. Ny tråd

Denne planleggeren starter ganske enkelt en ny tråd hver gang den blir bedt om via subscribeOn () eller observer på ().

Det er nesten aldri et godt valg, ikke bare på grunn av forsinkelsen når du starter en tråd, men også fordi denne tråden ikke blir gjenbrukt:

Observable.just ("Hello") .observeOn (Schedulers.newThread ()) .doOnNext (s -> result2 + = Thread.currentThread (). GetName ()) .observeOn (Schedulers.newThread ()). Abonnement (s - > result1 + = Thread.currentThread (). getName ()); Tråd. Søvn (500); Assert.assertTrue (result1.equals ("RxNewThreadScheduler-1")); Assert.assertTrue (result2.equals ("RxNewThreadScheduler-2"));

Når Arbeider er ferdig, avsluttes tråden ganske enkelt. Dette Planlegger kan bare brukes når oppgavene er grovkornede: det tar mye tid å fullføre, men det er veldig få av dem, slik at tråder neppe vil bli gjenbrukt i det hele tatt.

Scheduler scheduler = Schedulers.newThread (); Scheduler.Worker worker = scheduler.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> result + = "_worker_"); result + = "_End";} ); Tråd. Søvn (3000); Assert.assertTrue (result.equals ("RxNewThreadScheduler-1_Start_End_worker_"));

Da vi planla arbeider på en Ny trådTidsplanlegger, vi så at arbeideren var bundet til en bestemt tråd.

4. Planleggere. Umiddelbar

Planleggere. Umiddelbar er en spesiell planlegger som påkaller en oppgave i klienttråden på en blokkerende måte, i stedet for asynkront, og returnerer når handlingen er fullført:

Scheduler scheduler = Schedulers.immediate (); Scheduler.Worker worker = scheduler.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> result + = "_worker_"); result + = "_End";} ); Tråd. Søvn (500); Assert.assertTrue (result.equals ("main_Start_worker__End"));

Faktisk abonnerer du på en Observerbar via umiddelbar planlegger har vanligvis samme effekt som å ikke abonnere på noe spesielt Scheduler i det hele tatt:

Observable.just ("Hello") .subscribeOn (Schedulers.immediate ()) .subscribe (s -> result + = Thread.currentThread (). GetName ()); Tråd. Søvn (500); Assert.assertTrue (result.equals ("main"));

5. Planleggere. Trampoline

De trampolinePlanlegger er veldig lik umiddelbar fordi det også planlegger oppgaver i samme tråd, effektivt blokkerer.

Den kommende oppgaven utføres imidlertid når alle tidligere planlagte oppgaver er fullført:

Observable.just (2, 4, 6, 8) .subscribeOn (Schedulers.trampoline ()) .subscribe (i -> result + = "" + i); Observable.just (1, 3, 5, 7, 9) .subscribeOn (Schedulers.trampoline ()) .subscribe (i -> result + = "" + i); Tråd. Søvn (500); Assert.assertTrue (result.equals ("246813579"));

Umiddelbar påkaller en gitt oppgave med en gang, mens trampoline venter på at den nåværende oppgaven skal være ferdig.

De trampoline‘S arbeider utfører hver oppgave på tråden som planla den første oppgaven. Den første samtalen til rute blokkerer til køen tømmes:

Scheduler scheduler = Schedulers.trampoline (); Scheduler.Worker worker = scheduler.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "Start"; worker.schedule (() -> {result + = "_middleStart"; worker.schedule (() -> resultat + = "_arbeider_"); resultat + = "_middleEnd";}); resultat + = "_mainEnd";}); Tråd. Søvn (500); Assert.assertTrue (result .equals ("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Planleggere. Fra

Planleggere er internt mer komplekse enn Utførere fra java.util.concurrent - så en egen abstraksjon var nødvendig.

Men fordi de konseptuelt er ganske like, er det ikke overraskende at det er en innpakning som kan snu Leder inn i Planlegger bruker fra fabrikk metode:

private ThreadFactory threadFactory (strengmønster) {returner ny ThreadFactoryBuilder () .setNameFormat (mønster) .build (); } @Test offentlig ugyldig gittExecutors_whenSchedulerFrom_thenReturnElements () kaster InterruptedException {ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched-A-% d")); Scheduler schedulerA = Schedulers.from (poolA); ExecutorService poolB = newFixedThreadPool (10, threadFactory ("Sched-B-% d")); Scheduler schedulerB = Schedulers.from (poolB); Observable observerable = Observable.create (subscriber -> {subscriber.onNext ("Alfa"); subscriber.onNext ("Beta"); subscriber.onCompleted ();}) ;; observerbar .subscribeOn (schedulerA) .subscribeOn (schedulerB) .subscribe (x -> result + = Thread.currentThread (). getName () + x + "_", Throwable :: printStackTrace, () -> result + = "_Completed "); Thread.sleep (2000); Assert.assertTrue (result.equals ("Sched-A-0Alfa_Sched-A-0Beta__Completed")); }

Planlegger B. brukes i en kort periode, men den planlegger knapt en ny handling på planlegger A., som gjør alt arbeidet. Dermed flere subscribeOn-metoder blir ikke bare ignorert, men introduserer også en liten overhead.

7. Schedulers.io

Dette Planlegger ligner på ny tråd bortsett fra det faktum at allerede startede tråder resirkuleres og muligens kan håndtere fremtidige forespørsler.

Denne implementeringen fungerer på samme måte som ThreadPoolExecutor fra java.util.concurrent med et ubegrenset basseng av tråder. Hver gang en ny arbeider blir bedt om, enten startes en ny tråd (og senere holdes inaktiv i noen tid) eller den inaktive brukes på nytt:

Observable.just ("io") .subscribeOn (Schedulers.io ()) .subscribe (i -> result + = Thread.currentThread (). GetName ()); Assert.assertTrue (result.equals ("RxIoScheduler-2"));

Vi må være forsiktige med ubegrensede ressurser av noe slag - i tilfelle langsomme eller ikke-responderende eksterne avhengigheter som nettjenester, ioplanlegger kan starte et enormt antall tråder, noe som fører til at vår egen applikasjon ikke svarer.

I praksis følgende Schedulers.io er nesten alltid et bedre valg.

8. Planleggere. Beregning

Beregning Scheduler begrenser som standard antall tråder som kjører parallelt med verdien av tilgjengeligProsessorer (), som funnet i Runtime.getRuntime () bruksklasse.

Så vi bør bruke en beregningsplanlegger når oppgaver er helt CPU-bundet; det vil si at de krever beregningskraft og ikke har noen blokkeringskode.

Den bruker en ubegrenset kø foran hver tråd, så hvis oppgaven er planlagt, men alle kjerner er opptatt, vil den stå i kø. Køen like før hver tråd vil imidlertid fortsette å vokse:

Observable.just ("computation") .subscribeOn (Schedulers.computation ()) .subscribe (i -> result + = Thread.currentThread (). GetName ()); Assert.assertTrue (result.equals ("RxComputationScheduler-1"));

Hvis vi av en eller annen grunn trenger et annet antall tråder enn standard, kan vi alltid bruke rx.scheduler.max-beregningstråder systemegenskap.

Ved å ta færre tråder kan vi sikre at det alltid er en eller flere CPU-kjerner inaktiv, og til og med under tung belastning, beregning thread pool metter ikke serveren. Det er rett og slett ikke mulig å ha flere beregningstråder enn kjerner.

9. Schedulers.test

Dette Planlegger brukes bare til testformål, og vi ser det aldri i produksjonskoden. Hovedfordelen er muligheten til å fremme klokken, og simulere tiden som vilkårlig passerer:

Listebokstaver = Arrays.asList ("A", "B", "C"); TestScheduler scheduler = Schedulers.test (); TestSubscriber-abonnent = ny TestSubscriber (); Observable tick = Observable .interval (1, TimeUnit.SECONDS, scheduler); Observable.from (letters) .zipWith (tick, (string, index) -> index + "-" + string) .subscribeOn (scheduler) .subscribe (subscriber); subscriber.assertNoValues ​​(); subscriber.assertNotCompleted (); scheduler.advanceTimeBy (1, TimeUnit.SECONDS); subscriber.assertNoErrors (); subscriber.assertValueCount (1); subscriber.assertValues ​​("0-A"); scheduler.advanceTimeTo (3, TimeUnit.SECONDS); subscriber.assertCompleted (); subscriber.assertNoErrors (); subscriber.assertValueCount (3); assertThat (subscriber.getOnNextEvents (), hasItems ("0-A", "1-B", "2-C"));

10. Standardplanleggere

Noen Observerbar operatører i RxJava har alternative skjemaer som lar oss angi hvilke Planlegger operatøren vil bruke til sin drift. Andre opererer ikke på noe spesielt Planlegger eller operere på en bestemt standard Planlegger.

For eksempel forsinkelse operatøren tar oppstrøms hendelser og skyver dem nedstrøms etter en gitt tid. Åpenbart kan den ikke holde den opprinnelige tråden i den perioden, så den må bruke en annen Planlegger:

ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched1-")); Scheduler schedulerA = Schedulers.from (poolA); Observable.just ('A', 'B'). Delay (1, TimeUnit.SECONDS, schedulerA). Abonner (i -> resultat + = Thread.currentThread (). GetName () + i + ""); Thread.sleep (2000); Assert.assertTrue (result.equals ("Sched1-A Sched1-B"));

Uten å levere en skikk planlegger A., alle operatører nedenfor forsinkelse vil bruke beregningsplanlegger.

Andre viktige operatører som støtter tilpasset Planleggere er buffer, intervall, område, timer, hopp over, ta, pause, og flere andre. Hvis vi ikke gir en Planlegger til slike operatører, beregning planlegger brukes, noe som i de fleste tilfeller er en trygg standard.

11. Konklusjon

I virkelig reaktive applikasjoner, der alle langvarige operasjoner er asynkrone, veldig få tråder og dermed Planleggere trengs.

Mestringsplanleggere er avgjørende for å skrive skalerbar og sikker kode ved bruk av RxJava. Forskjellen mellom abonner På og observer PÅ er spesielt viktig under høy belastning der hver oppgave må utføres nøyaktig når vi forventer.

Sist men ikke minst, det må vi være sikre på Planleggere brukt nedstrøms kan holde tritt med lo-annonsen generert av Planleggere oppstrøms m. For mer informasjon er det denne artikkelen om mottrykk.

Implementeringen av alle disse eksemplene og kodebitene finnes i GitHub-prosjektet - dette er et Maven-prosjekt, så det skal være enkelt å importere og kjøre som det er.


$config[zx-auto] not found$config[zx-overlay] not found