Java 9 Reactive Streams

1. Oversikt

I denne artikkelen ser vi på Java 9 Reactive Streams. Enkelt sagt, vi kan bruke Strømme klasse, som lukker de primære byggesteinene for bygging av reaktiv strømbehandlingslogikk.

Reaktive strømmer er en standard for asynkron strømbehandling med ikke-blokkerende mottrykk. Denne spesifikasjonen er definert i Reaktivt manifest, og det er forskjellige implementeringer av det, for eksempel, RxJava eller Akka-Streams.

2. Oversikt over reaktivt API

Å bygge en Strømme, kan vi bruke tre hovedabstraksjoner og komponere dem i asynkron prosesslogikk.

Hver Strømme må behandle hendelser som er publisert til den av en Publisher-forekomst; de Forlegger har en metode - abonnere().

Hvis noen av abonnentene ønsker å motta arrangementer publisert av den, må de abonnere på det gitte Forlegger.

Mottakeren av meldinger må implementere Abonnent grensesnitt. Vanligvis er dette slutten for alle Strømme behandling fordi forekomsten av den ikke sender meldinger videre.

Vi kan tenke oss om Abonnent som en Synke. Dette har fire metoder som må overstyres - onSubscribe (), onNext (), onError (), og onComplete (). Vi ser på dem i neste avsnitt.

Hvis vi vil transformere innkommende meldinger og videreføre den til den neste Abonnent, vi trenger å implementere Prosessor grensesnitt. Dette fungerer både som en Abonnent fordi den mottar meldinger, og som Forlegger fordi den behandler disse meldingene og sender dem for videre behandling.

3. Publisering og forbruk av meldinger

La oss si at vi ønsker å lage en enkel Strømme, der vi har en Forlegger publisering av meldinger, og en enkel Abonnent forbruker meldinger når de kommer - en om gangen.

La oss lage en EndSubscriber klasse. Vi må implementere Abonnent grensesnitt. Deretter overstyrer vi de nødvendige metodene.

De onSubscribe () metoden kalles før behandlingen starter. Forekomsten av Abonnement blir bestått som argumentet. Det er en klasse som brukes til å kontrollere flyten av meldinger mellom Abonnent og Forlegger:

offentlig klasse EndSubscriber implementerer abonnent {private Abonnement; public List consumedElements = new LinkedList (); @ Overstyr offentlig ugyldighet onSubscribe (Abonnementsabonnement) {this.subscription = abonnement; abonnement. forespørsel (1); }}

Vi initialiserte også en tom Liste av forbrukte elementer som vil bli brukt i testene.

Nå må vi implementere de gjenværende metodene fra Abonnent grensesnitt. Hovedmetoden her er onNext () - dette kalles når Forlegger publiserer en ny melding:

@Override public void onNext (T item) {System.out.println ("Got:" + item); abonnement. forespørsel (1); }

Vær oppmerksom på at da vi startet abonnementet i onSubscribe () metode og når vi behandlet en melding, må vi ringe be om() metoden på Abonnement for å signalisere at strømmen Abonnent er klar til å konsumere flere meldinger.

Til slutt må vi implementere onError () - som kalles når et unntak blir kastet i behandlingen, så vel som onComplete () - ringte når Forlegger er stengt:

@Override public void onError (Throwable t) {t.printStackTrace (); } @ Override public void onComplete () {System.out.println ("Done"); }

La oss skrive en test for behandlingen Strømme. Vi bruker Innlevering Forlag klasse - en konstruksjon fra java.util.concurrent - som implementerer Forlegger grensesnitt.

Vi kommer til å sende inn N elementer til Forlegger - som vår EndSubscriber vil motta:

@Test offentlig ugyldig nårSubscribeToIt_thenShouldConsumeAll () kaster InterruptedException {// gitt SubmissionPublisher publisher = new SubmissionPublisher (); EndSubscriber-abonnent = ny EndSubscriber (); publisher.subscribe (abonnent); Listeelementer = List.of ("1", "x", "2", "x", "3", "x"); // når assertThat (publisher.getNumberOfSubscribers ()). erEqualTo (1); items.forEach (forlegger :: sende); publisher.close (); // deretter venter (). atMost (1000, TimeUnit.MILLISECONDS) .tiltil (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (items)); }

Merk at vi kaller Lukk() metoden på forekomsten av EndSubscriber. Det vil påberope seg onComplete () tilbakeringing under på hver Abonnent av det gitte Forlegger.

Å kjøre programmet vil gi følgende utdata:

Fikk: 1 Fikk: x Fikk: 2 Fikk: x Fikk: 3 Fikk: x Ferdig

4. Transformasjon av meldinger

La oss si at vi vil bygge lignende logikk mellom a Forlegger og en Abonnent, men også bruke litt transformasjon.

Vi lager TransformProcessor klasse som implementerer Prosessor og strekker seg Innlevering Forlegger - da dette vil være begge deler Publisher og Sabonnent.

Vi vil passere i en Funksjon som vil forvandle innganger til utganger:

offentlig klasse TransformProcessor utvider SubmissionPublisher implementerer Flow.Processor {private Funksjon funksjon; privat Flow.Abonnement abonnement; offentlig TransformProcessor (funksjonsfunksjon) {super (); this.function = funksjon; } @ Override public void onSubscribe (Flow.Subscription abonnement) {this.subscription = abonnement; abonnement. forespørsel (1); } @Override public void onNext (T item) {send (function.apply (item)); abonnement. forespørsel (1); } @Override public void onError (Throwable t) {t.printStackTrace (); } @ Override public void onComplete () {close (); }}

La oss nå skriv en rask test med en prosesseringsstrøm der Forlegger publiserer String elementer.

Våre TransformProcessor vil analysere String som Heltall - som betyr at en konvertering må skje her:

@Test offentlig ugyldig nårSubscribeAndTransformElements_thenShouldConsumeAll () kaster InterruptedException {// gitt SubmissionPublisher utgiver = ny SubmissionPublisher (); TransformProcessor transformProcessor = ny TransformProcessor (Heltall :: parseInt); EndSubscriber-abonnent = ny EndSubscriber (); Listeelementer = List.of ("1", "2", "3"); List expectResult = List.of (1, 2, 3); // når publisher.subscribe (transformProcessor); transformProcessor.subscribe (abonnent); items.forEach (forlegger :: sende); publisher.close (); // deretter venter (). atMost (1000, TimeUnit.MILLISECONDS) .tiltil (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (expectedResult)); }

Merk at det å ringe Lukk() metode på basen Forlegger vil føre til onComplete () metoden på TransformProcessor å bli påberopt.

Husk at alle utgivere i behandlingskjeden må lukkes på denne måten.

5. Kontrollere etterspørsel etter meldinger ved hjelp av Abonnement

La oss si at vi bare vil konsumere det første elementet fra abonnementet, bruke litt logikk og fullføre behandlingen. Vi kan bruke be om() metode for å oppnå dette.

La oss endre vår EndSubscriber å konsumere bare N antall meldinger. Vi sender nummeret som nummeret howMuchMessagesConsume konstruktør argument:

offentlig klasse EndSubscriber implementerer abonnent {private AtomicInteger howMuchMessagesConsume; privat abonnement; public List consumedElements = new LinkedList (); public EndSubscriber (Integer howMuchMessagesConsume) {this.howMuchMessagesConsume = new AtomicInteger (howMuchMessagesConsume); } @ Override public void onSubscribe (Abonnementsabonnement) {this.subscription = abonnement; abonnement. forespørsel (1); } @ Override public void onNext (T item) {howMuchMessagesConsume.decrementAndGet (); System.out.println ("Got:" + element); consumedElements.add (vare); hvis (howMuchMessagesConsume.get ()> 0) {abonnement.forespørsel (1); }} // ...}

Vi kan be om elementer så lenge vi vil.

La oss skrive en test der vi bare vil konsumere ett element fra det gitte Abonnement:

@Test offentlig ugyldig nårRequestForOnlyOneElement_thenShouldConsumeOne () kaster InterruptedException {// gitt SubmissionPublisher utgiver = ny SubmissionPublisher (); EndSubscriber-abonnent = ny EndSubscriber (1); publisher.subscribe (abonnent); Listeelementer = List.of ("1", "x", "2", "x", "3", "x"); Liste forventet = List.of ("1"); // når assertThat (publisher.getNumberOfSubscribers ()). erEqualTo (1); items.forEach (forlegger :: sende); publisher.close (); // deretter venter (). atMost (1000, TimeUnit.MILLISECONDS) .tiltil (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (forventet)); }

Selv om forlegger publiserer seks elementer, vår EndSubscriber vil forbruke bare ett element fordi det signaliserer behovet for å behandle bare det ene elementet.

Ved å bruke be om() metoden på Abonnement, vi kan implementere en mer sofistikert mottrykkmekanisme for å kontrollere hastigheten på meldingsforbruket.

6. Konklusjon

I denne artikkelen så vi på Java 9 Reactive Streams.

Vi så hvordan vi skulle lage en behandling Strømme bestående av en Forlegger og en Abonnent. Vi opprettet en mer kompleks prosesseringsflyt med transformasjon av elementer ved hjelp av Prosessorer.

Til slutt brukte vi Abonnement for å kontrollere etterspørselen etter elementer av Abonnent.

Implementeringen av alle disse eksemplene og kodebitene finnes i GitHub-prosjektet - dette er et Maven-prosjekt, så det skal være enkelt å importere og kjøre som det er.


$config[zx-auto] not found$config[zx-overlay] not found