Introduksjon til RSocket

1. Introduksjon

I denne opplæringen tar vi en første titt på RSocket og hvordan det muliggjør klient-server-kommunikasjon.

2. Hva er? RSocket?

RSocket er en binær, punkt-til-punkt kommunikasjonsprotokoll beregnet for bruk i distribuerte applikasjoner. Sånn sett gir det et alternativ til andre protokoller som HTTP.

En full sammenligning mellom RSocket og andre protokoller er utenfor omfanget av denne artikkelen. I stedet vil vi fokusere på en nøkkelfunksjon i RSocket: dens interaksjonsmodeller.

RSocket tilbyr fire interaksjonsmodeller. Med det i tankene vil vi utforske hver enkelt med et eksempel.

3. Maven-avhengigheter

RSocket trenger bare to direkte avhengigheter for eksemplene våre:

 io.rsocket rsocket-core 0.11.13 io.rsocket rsocket-transport-netty 0.11.13 

Avhengighetene rsocket-core og rsocket-transport-netty er tilgjengelige på Maven Central.

Et viktig notat er at RSocket-biblioteket ofte bruker reaktive strømmer. De Flux og Mono klasser brukes gjennom hele denne artikkelen, så en grunnleggende forståelse av dem vil være nyttig.

4. Serveroppsett

La oss først lage Server klasse:

offentlig klasse Server {privat slutt Engangsserver; public Server () {this.server = RSocketFactory.receive () .acceptor ((setupPayload, reactiveSocket) -> Mono.just (new RSocketImpl ())) .transport (TcpServerTransport.create ("localhost", TCP_PORT)) .start () .abonnere(); } offentlig tomrom disponere () {this.server.dispose (); } privat klasse RSocketImpl utvider AbstractRSocket {}}

Her bruker vi RSocketFactory for å sette opp og lytte til en TCP-kontakt. Vi passerer i vår skikk RSocketImpl å håndtere forespørsler fra kunder. Vi legger til metoder i RSocketImpl mens vi går.

Deretter, for å starte serveren, trenger vi bare å instantiere den:

Server server = ny server ();

En enkelt serverforekomst kan håndtere flere tilkoblinger. Som et resultat vil bare en serverforekomst støtte alle eksemplene våre.

Når vi er ferdige, vil kast metoden stopper serveren og frigjør TCP-porten.

4. Interaksjonsmodeller

4.1. Forespørsel / svar

RSocket gir en forespørsels- / svarmodell - hver forespørsel mottar et enkelt svar.

For denne modellen lager vi en enkel tjeneste som returnerer en melding tilbake til klienten.

La oss starte med å legge til en metode i utvidelsen vår av Sammendrag RSocketImpl:

@ Override offentlig Mono requestResponse (nyttelast) {prøv {return Mono.just (nyttelast); // reflekter nyttelasten tilbake til avsenderen} fangst (Unntak x) {retur Mono.error (x); }}

De requestResponse metoden returnerer et enkelt resultat for hver forespørsel, som vi kan se av Mono svarstype.

Nyttelast er klassen som inneholder meldingsinnhold og metadata. Den brukes av alle interaksjonsmodellene. Innholdet i nyttelasten er binært, men det er praktiske metoder som støtter String-basert innhold.

Deretter kan vi opprette klientklassen vår:

offentlig klasse ReqResClient {privat endelig RSocket-kontakt; public ReqResClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } public String callBlocking (String string) {return socket .requestResponse (DefaultPayload.create (string)) .map (Payload :: getDataUtf8) .block (); } offentlig tomrom disponere () {this.socket.dispose (); }}

Klienten bruker RSocketFactory.connect () metode for å starte en stikkontaktforbindelse med serveren. Vi bruker requestResponse metode på kontakten for å sende en nyttelast til serveren.

Nyttelasten vår inneholder String overført til klienten. Når Mono svaret kommer vi kan bruke getDataUtf8 () metoden for å få tilgang til String innholdet i svaret.

Til slutt kan vi kjøre integrasjonstesten for å se forespørsel / svar i aksjon. Vi sender en String til serveren og verifiser at det samme String returneres:

@Test offentlig ugyldig nårSendingAString_thenRevceiveTheSameString () {ReqResClient client = new ReqResClient (); Strengstreng = "Hello RSocket"; assertEquals (string, client.callBlocking (string)); client.dispose (); }

4.2. Brann-og-glem

Med brann-og-glem-modellen, klienten vil ikke motta noe svar fra serveren.

I dette eksemplet vil klienten sende simulerte målinger til serveren i 50ms intervaller. Serveren vil publisere målingene.

La oss legge til en fyr-og-glem-behandler til serveren vår i RSocketImpl klasse:

@ Override offentlig Mono fireAndForget (nyttelast) {prøv {dataPublisher.publish (nyttelast); // videresende nyttelastretur Mono.empty (); } fange (Unntak x) {retur Mono.error (x); }}

Denne behandleren ser veldig ut som forespørselen / svarbehandleren. Derimot, fireAndForget returnerer Mono i stedet for Mono.

De dataPublisher er en forekomst av org.reactivestreams.Publisher. Dermed gjør den nyttelasten tilgjengelig for abonnenter. Vi bruker det i forespørselen / strømeksemplet.

Deretter oppretter vi brann-og-glem-klienten:

offentlig klasse FireNForgetClient {privat endelig RSocket-kontakt; privat sluttdata; offentlig FireNForgetClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } / ** Send binærhastighet (flyt) hver 50ms * / offentlig ugyldig sendData () {data = Collections.unmodifiableList (genererData ()); Flux.interval (Duration.ofMillis (50)) .take (data.size ()) .map (this :: createFloatPayload) .flatMap (socket :: fireAndForget) .blockLast (); } // ...}

Stikkontakten er nøyaktig den samme som før.

De sendData () metoden bruker en Flux stream for å sende flere meldinger. For hver melding påkaller vi oss stikkontakt :: fireAndForget.

Vi må abonnere på Mono svar for hver melding. Hvis vi glemmer å abonnere da stikkontakt :: fireAndForget vil ikke utføre.

De flatMap operatøren sørger for at Tomrom svar sendes til abonnenten, mens blockLast operatør fungerer som abonnent.

Vi kommer til å vente til neste avsnitt for å kjøre brann-og-glem-testen. På det tidspunktet oppretter vi en forespørsel / stream-klient for å motta dataene som ble presset av brann-og-glem-klienten.

4.3. Forespørsel / Stream

I forespørselen / stream-modellen, en enkelt forespørsel kan motta flere svar. For å se dette i aksjon kan vi bygge videre på eksemplet med ild og glem. For å gjøre det, la oss be om en strøm for å hente målingene vi sendte i forrige avsnitt.

Som før, la oss begynne med å legge til en ny lytter til RSocketImpl på serveren:

@Override public Flux requestStream (nyttelast) {return Flux.from (dataPublisher); }

De requestStream handler returnerer a Flux strøm. Som vi husker fra forrige avsnitt, er fireAndForget behandler publiserte innkommende data til dataPublisher. Nå lager vi en Flux stream ved hjelp av det samme dataPublisher som hendelseskilde. Ved å gjøre dette vil måledataene strømme asynkront fra vår brann-og-glem-klient til vår forespørsel / stream-klient.

La oss opprette forespørselen / strøm klienten neste:

offentlig klasse ReqStreamClient {privat endelig RSocket-kontakt; offentlig ReqStreamClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } public Flux getDataStream () {return socket .requestStream (DefaultPayload.create (DATA_STREAM_NAME)) .map (Payload :: getData) .map (buf -> buf.getFloat ()) .onErrorReturn (null); } offentlig tomrom disponere () {this.socket.dispose (); }}

Vi kobler til serveren på samme måte som våre tidligere klienter.

I getDataStream ()vi bruker socket.requestStream () for å motta en Flux-strøm fra serveren. Fra den strømmen trekker vi ut Flyte verdier fra binære data. Til slutt returneres strømmen til den som ringer, slik at den som ringer kan abonnere på den og behandle resultatene.

La oss nå teste. Vi bekrefter tur-retur fra fire-and-glem å be / streame.

Vi kan hevde at hver verdi mottas i samme rekkefølge som den ble sendt. Deretter kan vi hevde at vi mottar samme antall verdier som ble sendt:

@Test offentlig ugyldig nårSendingStream_thenReceiveTheSameStream () {FireNForgetClient fnfClient = new FireNForgetClient (); ReqStreamClient streamClient = ny ReqStreamClient (); Listedata = fnfClient.getData (); List dataReceived = new ArrayList (); Engangsabonnement = streamClient.getDataStream () .index (). Abonnere (tuple -> {assertEquals ("Feil verdi", data.get (tuple.getT1 (). IntValue ()), tuple.getT2 ()); data mottatt. legg til (tuple.getT2 ());}, feil -> LOG.error (err.getMessage ())); fnfClient.sendData (); // ... disponere klient- og abonnementsassertEquals ("Feil datatall mottatt", data.size (), dataReceived.size ()); }

4.4. Kanal

Kanalmodellen gir toveiskommunikasjon. I denne modellen flyter meldingsstrømmer asynkront i begge retninger.

La oss lage en enkel spillsimulering for å teste dette. I dette spillet blir hver side av kanalen en spiller. Når spillet går, vil disse spillerne sende meldinger til den andre siden med tilfeldige tidsintervaller. Den motsatte siden vil reagere på meldingene.

For det første oppretter vi behandleren på serveren. Som før, legger vi til RSocketImpl:

@ Override public Flux requestChannel (Publisher payloads) {Flux.from (payloads) .subscribe (gameController :: processPayload); returner Flux.from (gameController); }

De requestChannel handler har Nyttelast strømmer for både inngang og utgang. De Forlegger inngangsparameter er en strøm av nyttelaster mottatt fra klienten. Når de ankommer, overføres disse nyttelastene til gameController :: processPayload funksjon.

Som svar returnerer vi en annen Flux strøm tilbake til klienten. Denne strømmen er opprettet fra vår gameController, som også er en Forlegger.

Her er et sammendrag av GameController klasse:

offentlig klasse GameController implementerer Publisher {@Override public void subscribe (Abonnentabonnent) {// sender nyttelastmeldinger til abonnenten med tilfeldige intervaller} offentlig ugyldig prosessPayload (nyttelast) {// reagerer på meldinger fra den andre spilleren}}

Når GameController mottar en abonnent, begynner den å sende meldinger til den abonnenten.

La oss deretter opprette klienten:

offentlig klasse ChannelClient {privat endelig RSocket-kontakt; privat finale GameController gameController; offentlig ChannelClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); this.gameController = ny GameController ("Client Player"); } offentlig ugyldig playGame () {socket.requestChannel (Flux.from (gameController)) .doOnNext (gameController :: processPayload) .blockLast (); } offentlig tomrom disponere () {this.socket.dispose (); }}

Som vi har sett i våre tidligere eksempler, kobles klienten til serveren på samme måte som de andre klientene.

Klienten oppretter sin egen forekomst av GameController.

Vi bruker socket.requestChannel () å sende vår Nyttelast strøm til serveren. Serveren svarer med en egen Payload-strøm.

Som nyttelast mottatt fra serveren, sender vi dem til vårt gameController :: processPayload behandler.

I vår spillsimulering er klienten og serveren speilbilder av hverandre. Det er, hver side sender en strøm av Nyttelast og motta en strøm av Nyttelast fra den andre enden.

Strømmene kjører uavhengig, uten synkronisering.

Til slutt, la oss kjøre simuleringen i en test:

@Test offentlig ugyldig når RunChannelGame_thenLogTheResults () {ChannelClient client = new ChannelClient (); client.playGame (); client.dispose (); }

5. Konklusjon

I denne innledende artikkelen har vi utforsket samhandlingsmodellene som tilbys av RSocket. Den fulle kildekoden til eksemplene finner du i Github-depotet vårt.

Husk å sjekke ut RSocket-nettstedet for en dypere diskusjon. Spesielt gir FAQ og motivasjonsdokumenter en god bakgrunn.


$config[zx-auto] not found$config[zx-overlay] not found