Pålitelig melding med JGroups

1. Oversikt

JGroups er et Java API for pålitelig utveksling av meldinger. Den har et enkelt grensesnitt som gir:

  • en fleksibel protokollstabel, inkludert TCP og UDP
  • fragmentering og montering av store meldinger
  • pålitelig unicast og multicast
  • feil deteksjon
  • flytkontroll

I tillegg til mange andre funksjoner.

I denne opplæringen oppretter vi en enkel applikasjon for utveksling String meldinger mellom applikasjoner og levere delt tilstand til nye applikasjoner når de blir med i nettverket.

2. Oppsett

2.1. Maven avhengighet

Vi må legge til en enkelt avhengighet til vår pom.xml:

 org.jgroups jgroups 4.0.10.Final 

Den siste versjonen av biblioteket kan sjekkes på Maven Central.

2.2. Nettverk

JGroups vil prøve å bruke IPV6 som standard. Avhengig av systemkonfigurasjonen kan dette føre til at applikasjoner ikke kan kommunisere.

For å unngå dette, stiller vi inn java.net.preferIPv4Stack til ekte egenskap når du kjører applikasjonene våre her:

java -Djava.net.preferIPv4Stack = sann com.baeldung.jgroups.JGroupsMessenger 

3. JKanaler

Vår forbindelse til et JGroups-nettverk er en JChannel. Kanalen blir med i en klynge og sender og mottar meldinger, samt informasjon om nettverkets tilstand.

3.1. Opprette en kanal

Vi lager en JChannel med en bane til en konfigurasjonsfil. Hvis vi utelater filnavnet, vil det se etter udp.xml i gjeldende arbeidskatalog.

Vi oppretter en kanal med en eksplisitt navngitt konfigurasjonsfil:

JChannel kanal = ny JChannel ("src / main / resources / udp.xml"); 

JGroups-konfigurasjon kan være veldig komplisert, men standard UDP- og TCP-konfigurasjoner er tilstrekkelig for de fleste applikasjoner. Vi har tatt med filen for UDP i koden vår og vil bruke den til denne opplæringen.

For mer informasjon om konfigurering av transport se JGroups manualen her.

3.2. Koble til en kanal

Etter at vi har opprettet kanalen vår, må vi bli med i en klynge. En klynge er en gruppe noder som utveksler meldinger.

Å bli med i en klynge krever et klyngenavn:

channel.connect ("Baeldung"); 

Den første noden som prøver å bli med i en klynge, oppretter den hvis den ikke eksisterer. Vi ser denne prosessen i aksjon nedenfor.

3.3. Navngi en kanal

Noder identifiseres med et navn slik at jevnaldrende kan sende dirigerte meldinger og motta varsler om hvem som går inn i og forlater klyngen. JGroups tildeler et navn automatisk, eller vi kan sette vårt eget:

channel.name ("bruker1");

Vi bruker disse navnene nedenfor for å spore når noder kommer inn og forlater klyngen.

3.4. Lukke en kanal

Kanalopprydding er viktig hvis vi ønsker at jevnaldrende mottar varsler om at vi har avsluttet i tide.

Vi lukker a JChannel med sin tette metode:

channel.close ()

4. Endringer i klyngevisning

Med en opprettet JChannel er vi nå klare til å se tilstanden til jevnaldrende i klyngen og utveksle meldinger med dem.

JGroups opprettholder klyngetilstand inne i Utsikt klasse. Hver kanal har en enkelt Utsikt av nettverket. Når visningen endres, leveres den via viewAccepted () Ring tilbake.

For denne opplæringen utvider vi ReceiverAdaptor API-klasse som implementerer alle grensesnittmetodene som kreves for et program.

Det er den anbefalte måten å implementere tilbakeringinger på.

La oss legge til viewAccepted til søknaden vår:

public void viewAccepted (View newView) {private View lastView; hvis (lastView == null) {System.out.println ("Mottatt første visning:"); newView.forEach (System.out :: println); } annet {System.out.println ("Mottatt ny visning."); List newMembers = View.newMembers (lastView, newView); System.out.println ("Nye medlemmer:"); newMembers.forEach (System.out :: println); Liste exMembers = View.leftMembers (lastView, newView); System.out.println ("Avsluttede medlemmer:"); exMembers.forEach (System.out :: println); } lastView = newView; } 

Hver Utsikt inneholder en Liste av Adresse objekter, som representerer hvert medlem av klyngen. JGroups tilbyr brukervennlige metoder for å sammenligne en visning til en annen, som vi bruker til å oppdage nye eller forlatte medlemmer av klyngen.

5. Sende meldinger

Meldingshåndtering i JGroups er grei. EN Beskjed inneholder en byte array og Adresse objekter som tilsvarer avsenderen og mottakeren.

For denne opplæringen bruker vi Strenger lese fra kommandolinjen, men det er lett å se hvordan et program kan utveksle andre datatyper.

5.1. Kringkaste beskjeder

EN Beskjed er opprettet med en destinasjon og et byte-utvalg; JChannel setter avsenderen for oss. Hvis målet er null, hele klyngen vil motta meldingen.

Vi godtar tekst fra kommandolinjen og sender den til klyngen:

System.out.print ("Skriv inn en melding:"); Strenglinje = in.readLine (). ToLowerCase (); Meldingsmelding = ny melding (null, line.getBytes ()); channel.send (melding); 

Hvis vi kjører flere forekomster av programmet vårt og sender denne meldingen (etter at vi har implementert motta() metoden nedenfor), vil alle motta den, inkludert avsenderen.

5.2. Blokkerer våre meldinger

Hvis vi ikke vil se meldingene våre, kan vi angi en eiendom for det:

channel.setDiscardOwnMessages (true); 

Når vi kjører den forrige testen, mottar ikke meldingssenderen sin kringkastingsmelding.

5.3. Direktemeldinger

Å sende en direkte melding krever en gyldig Adresse. Hvis vi refererer til noder etter navn, trenger vi en måte å slå opp en Adresse. Heldigvis har vi Utsikt for det.

Den nåværende Utsikt er alltid tilgjengelig fra JChannel:

privat Valgfritt getAddress (strengnavn) {View view = channel.view (); return view.getMembers (). stream () .filter (adresse -> name.equals (address.toString ())) .findAny (); } 

Adresse navn er tilgjengelig via klassen toString () metode, så vi bare søker i Liste av klyngemedlemmer for navnet vi ønsker.

Så vi kan godta et navn fra konsollen, finne den tilknyttede destinasjonen og sende en direkte melding:

Adressedestinasjon = null; System.out.print ("Angi en destinasjon:"); Streng destinasjonsnavn = in.readLine (). ToLowerCase (); destinasjon = getAddress (destinasjonsnavn) .orElseThrow (() -> nytt unntak ("Destinasjon ikke funnet"); Meldingsmelding = ny Melding (destinasjon, "Hei!"); channel.send (melding); 

6. Motta meldinger

Vi kan sende meldinger, nå la oss legge til, prøv å motta dem nå.

La oss overstyre ReceiverAdaptor's tom mottaksmetode:

offentlig ugyldig mottakelse (meldingsmelding) {String line = Melding mottatt fra: "+ message.getSrc () +" til: "+ message.getDest () +" -> "+ message.getObject (); System.out.println (linje);} 

Siden vi vet at meldingen inneholder en String, vi kan trygt passere getObject () til System.out.

7. Statlig børs

Når en node kommer inn i nettverket, kan det hende at den må hente tilstandsinformasjon om klyngen. JGroups gir en statsoverføringsmekanisme for dette.

Når en node blir med i klyngen, ringer den ganske enkelt getState (). Klyngen henter vanligvis staten fra det eldste medlemmet i gruppen - koordinatoren.

La oss legge til en kringkastingsmelding i vårt program. Vi legger til en ny medlemsvariabel og øker den inne motta():

private Integer messageCount = 0; public void receive (Message message) {String line = "Melding mottatt fra:" + message.getSrc () + "til:" + message.getDest () + "->" + message.getObject (); System.out.println (linje); hvis (message.getDest () == null) {messageCount ++; System.out.println ("Antall meldinger:" + messageCount); }} 

Vi ser etter en null destinasjon fordi hvis vi teller direktemeldinger, vil hver node ha et annet nummer.

Deretter overstyrer vi to flere metoder i ReceiverAdaptor:

public void setState (InputStream input) {try {messageCount = Util.objectFromStream (new DataInputStream (input)); } fange (Unntak e) {System.out.println ("Feil deserialing tilstand!"); } System.out.println (messageCount + "er den nåværende meldingen."); } public void getState (OutputStream output) kaster Unntak {Util.objectToStream (messageCount, new DataOutputStream (output)); } 

I likhet med meldinger overfører JGroups tilstand som en matrise av byte.

JGroups leverer en InputStream til koordinatoren å skrive staten til, og en OutputStream for den nye noden å lese. API-en gir bekvemmelighetsklasser for serialisering og deserialisering av dataene.

Merk at tilgang til tilstandsinformasjon i produksjonskode må være trådsikker.

Til slutt legger vi til samtalen til getState () til oppstarten vår, etter at vi kobler oss til klyngen:

channel.connect (klyngenavn); channel.getState (null, 0); 

getState () aksepterer et mål å be om staten fra og en tidsavbrudd i millisekunder. EN null destinasjon indikerer koordinatoren og 0 betyr ikke tidsavbrudd.

Når vi kjører denne appen med et par noder og utveksler kringkastingsmeldinger, ser vi antall meldinger.

Så hvis vi legger til en tredje klient eller stopper og starter en av dem, ser vi den nylig tilkoblede noden skrive ut riktig meldingstall.

8. Konklusjon

I denne opplæringen brukte vi JGroups til å lage et program for utveksling av meldinger. Vi brukte API for å overvåke hvilke noder som var koblet til og forlot klyngen, og også for å overføre klyngetilstand til en ny node når den ble med.

Kodeeksempler, som alltid, kan du finne på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found