Forskjellen mellom RxJava API og Java 9 Flow API

1. Introduksjon

Java Flow API ble introdusert i Java 9 som en implementering av Reactive Stream Specification.

I denne opplæringen vil vi først undersøke reaktive strømmer. Deretter lærer vi om forholdet til RxJava og Flow API.

2. Hva er reaktive strømmer?

Reactive Manifesto introduserte Reactive Streams for å spesifisere en standard for asynkron strømbehandling med ikke-blokkerende mottrykk.

Omfanget av spesifikasjonen for reaktiv strøm er å definere et minimalt sett med grensesnitt for å oppnå disse målene:

  • org.reactivestreams.Publisher er en dataleverandør som publiserer data til abonnentene basert på deres etterspørsel

  • org.reactivestreams.Subscriber er forbruker av data - den kan motta data etter abonnement på en utgiver

  • org.reactivestreams. abonnement opprettes når en forlegger godtar en abonnent

  • org.reactivestreams.Prosessor er både abonnent og forlegger - abonnerer på forlegger, behandler dataene og overfører deretter behandlede data til abonnenten

Flow API kommer fra spesifikasjonen. RxJava går foran den, men siden 2.0 har RxJava også støttet spesifikasjonen.

Vi går dypt inn i begge deler, men først, la oss se en praktisk brukssak.

3. Bruk sak

For denne opplæringen bruker vi en live stream-videotjeneste som vårt brukstilfelle.

En live stream-video, i motsetning til on-demand videostreaming, er ikke avhengig av forbrukeren. Derfor publiserer serveren strømmen i sitt eget tempo, og det er forbrukerens ansvar å tilpasse seg.

I den mest enkle formen består modellen vår av en videostrømforlegger og en videospiller som abonnent.

La oss implementere VideoFrame som vårt dataelement:

offentlig klasse VideoFrame {privat langt nummer; // tilleggsdatafelt // constructor, getters, setters}

Så la oss gå gjennom Flow API og RxJava-implementeringene en etter en.

4. Implementering med Flow API

Flow API-ene i JDK 9 tilsvarer spesifikasjonen for reaktive strømmer. Hvis applikasjonen i utgangspunktet ber om N-elementer med Flow API, skyver utgiveren maksimalt N-elementer til abonnenten.

Flow API-grensesnittene er alle i java.util.concurrent.Flow grensesnitt. De tilsvarer semantisk deres respektive kolleger fra Reactive Streams.

La oss implementere VideoStreamServer som utgiver av VideoFrame.

offentlig klasse VideoStreamServer utvider SubmissionPublisher {public VideoStreamServer () {super (Executors.newSingleThreadExecutor (), 5); }}

Vi har utvidet vår VideoStreamServer fra Innlevering Forlag i stedet for å implementere direkte Flow :: Utgiver. Innlevering Forlag er JDK implementering av Flow :: Utgiver for asynkron kommunikasjon med abonnenter, så det lar vår VideoStreamServer å avgi i sitt eget tempo.

Det er også nyttig for mottrykk og bufferhåndtering, for når SubmissionPublisher :: abonner kalt, det skaper en forekomst av BufferedAbonnement, og legger deretter til det nye abonnementet i abonnementskjeden. BufferedAbonnement kan buffer utstedte varer opp til SubmissionPublisher # maxBufferCapacity.

La oss nå definere Videospiller, som forbruker en strøm av VideoFrame. Derfor må den implementeres Strøm :: Abonnent.

offentlig klasse VideoPlayer implementerer Flow.Subscriber {Flow.Subscription abonnement = null; @Override public void onSubscribe (Flow.Subscription abonnement) {this.subscription = abonnement; abonnement. forespørsel (1); } @Override public void onNext (VideoFrame item) {log.info ("play # {}", item.getNumber ()); abonnement. forespørsel (1); } @ Override public void onError (Throwable throwable) {log.error ("Det er en feil i videostreaming: {}", throwable.getMessage ()); } @ Override public void onComplete () {log.error ("Videoen er avsluttet"); }}

Videospiller abonnerer på VideoStreamServer, deretter etter et vellykket abonnement Videospiller::onAbonner metoden kalles, og den ber om en ramme. Videospiller:: onNext motta rammen og forespørsler om en ny. Antallet av de forespurte rammene avhenger av brukssaken og Abonnent implementeringer.

Til slutt, la oss sette ting sammen:

VideoStreamServer streamServer = ny VideoStreamServer (); streamServer.subscribe (ny VideoPlayer ()); // send inn videorammer ScheduledExecutorService executor = Executors.newScheduledThreadPool (1); AtomicLong frameNumber = ny AtomicLong (); executor.scheduleWithFixedDelay (() -> {streamServer.offer (new VideoFrame (frameNumber.getAndIncrement ()), (subscriber, videoFrame) -> {subscriber.onError (new RuntimeException ("Frame #" + videoFrame.getNumber () + " droppet på grunn av mottrykk ")); return true;});}, 0, 1, TimeUnit.MILLISECONDS); søvn (1000);

5. Implementering med RxJava

RxJava er en Java-implementering av ReactiveX. ReactiveX (eller Reactive Extensions) -prosjektet har som mål å gi et reaktivt programmeringskonsept. Det er en kombinasjon av Observer-mønsteret, Iterator-mønsteret og funksjonell programmering.

Den siste store versjonen for RxJava er 3.x. RxJava støtter Reactive Streams siden versjon 2.x med sin Flytbar baseklasse, men det er et mer betydelig sett enn Reactive Streams med flere baseklasser som Flytbar, Observerbar, Enkelt, Fullførbar.

Flytbar som reaktiv strømoverholdelseskomponent er en strøm på 0 til N elementer med mottrykkhåndtering. Flytbar strekker Forlegger fra Reactive Streams. Derfor godtar mange RxJava-operatører Forlegger direkte og tillate direkte interoperasjon med andre implementeringer av Reactive Streams.

La oss lage videostrømgeneratoren vår som er en uendelig lat strøm:

Stream videoStream = Stream.iterate (new VideoFrame (0), videoFrame -> {// sleep for 1ms; return new VideoFrame (videoFrame.getNumber () + 1);});

Deretter definerer vi en Flytbar forekomst for å generere rammer på en egen tråd:

Flowable .fromStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ()))

Det er viktig å merke seg at en uendelig strøm er nok for oss, men hvis vi trenger en mer fleksibel måte å generere strømmen vår på, Flytbar. Lag er et godt valg.

Flowable .create (new FlowableOnSubscribe () {AtomicLong frame = new AtomicLong (); @Override public void subscribe (@NonNull FlowableEmitter emitter) {while (true) {emitter.onNext (new VideoFrame (frame.incrementAndGet ())); / / sov i 1 ms for å simulere forsinkelse}}}, / * Sett mottrykkstrategi her * /)

Så, på neste trinn, abonnerer VideoPlayer på dette Flowable og observerer elementer på en egen tråd.

videoFlowable .observeOn (Schedulers.from (Executors.newSingleThreadExecutor ())). abonner (element -> {log.info ("play #" + item.getNumber ()); // sov i 30 ms for å simulere rammevisning}) ;

Og til slutt konfigurerer vi strategien for mottrykk. Hvis vi vil stoppe videoen i tilfelle tap av bilder, må vi bruke den MottrykkOverflowStrategy :: FEIL når bufferen er full.

Flowable .fromStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ())) .onBackpressureBuffer (5, null, BackpressureOverflowStrategy.ERROR) .observeOn (Schedulers.from (Executors.newS.) > {log.info ("play #" + item.getNumber ()); // sov i 30 ms for å simulere rammevisning});

6. Sammenligning av RxJava og Flow API

Selv i disse to enkle implementeringene kan vi se hvordan RxJavas API er rik, spesielt for bufferhåndtering, feilhåndtering og mottrykkstrategi. Det gir oss flere alternativer og færre kodelinjer med sitt flytende API. La oss nå vurdere mer kompliserte saker.

Anta at spilleren vår ikke kan vise videorammer uten kodek. Derfor må vi implementere en med Flow API Prosessor for å simulere kodeken og sitte mellom server og spiller. Med RxJava kan vi gjøre det med Flytbar :: flatMap eller Flytbar :: kart.

Eller la oss forestille oss at spilleren vår også skal sende live oversettelseslyd, så vi må kombinere strømmer av video og lyd fra separate utgivere. Med RxJava kan vi bruke Flytbar :: combineLatest, men med Flow API er det ikke en enkel oppgave.

Selv om det er mulig å skrive en tilpasset Prosessor som abonnerer på begge strømmer og sender de kombinerte dataene til våre Videospiller. Implementeringen er imidlertid hodepine.

7. Hvorfor Flow API?

På dette punktet kan vi ha et spørsmål, hva er filosofien bak Flow API?

Hvis vi søker etter bruk av Flow API i JDK, kan vi finne noe i java.net.http og jdk.internal.net.http.

Videre kan vi finne adaptere i reaktorprosjektet eller reaktiv strømpakke. For eksempel, org.reactivestreams.FlowAdapters har metoder for å konvertere Flow API-grensesnitt til Reactive Stream-ene og omvendt. Derfor hjelper det interoperabiliteten mellom Flow API og biblioteker med støtte for reaktiv strøm.

Alle disse fakta hjelper oss til å forstå formålet med Flow API: Det ble opprettet for å være en gruppe reaktive spesifikasjonsgrensesnitt i JDK uten relé til tredjeparter. Videre forventer Java at Flow API blir akseptert som standardgrensesnitt for reaktiv spesifikasjon og skal brukes i JDK eller andre Java-baserte biblioteker som implementerer den reaktive spesifikasjonen for mellomvare og verktøy.

8. Konklusjoner

I denne opplæringen har vi en introduksjon til Reactive Stream Specification, Flow API og RxJava.

Videre har vi sett et praktisk eksempel på Flow API og RxJava-implementeringer for en live videostrøm.

Men alle aspekter av Flow API og RxJava liker Flow :: Prosessor, Flytbar :: kart og Flytbar :: flatMap eller mottrykkstrategier er ikke dekket her.

Som alltid finner du opplæringen fullstendig kode på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found