Introduksjon til Spring Cloud Stream

1. Oversikt

Spring Cloud Stream er et rammeverk bygget på toppen av Spring Boot og Spring Integration som hjelper til med å lage hendelsesdrevne eller meldingsdrevne mikrotjenester.

I denne artikkelen introduserer vi konsepter og konstruksjoner av Spring Cloud Stream med noen enkle eksempler.

2. Maven-avhengigheter

For å komme i gang, må vi legge til Spring Cloud Starter Stream med megleren RabbitMQ Maven avhengighet som meldings-mellomvare til vår pom.xml:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 1.3.0.RELEASE 

Og vi vil legge til modulavhengigheten fra Maven Central for å aktivere JUnit-støtte også:

 org.springframework.cloud spring-cloud-stream-test-support 1.3.0.RELEASE test 

3. Hovedkonsepter

Mikrotjenestearkitektur følger prinsippet om “smarte endepunkter og dumme rør”. Kommunikasjon mellom sluttpunkter drives av meldings-mellomvarepartier som RabbitMQ eller Apache Kafka. Tjenester kommuniserer ved å publisere domenehendelser via disse endepunktene eller kanalene.

La oss gå gjennom konseptene som utgjør Spring Cloud Stream-rammeverket, sammen med de essensielle paradigmene som vi må være klar over for å bygge meldingsdrevne tjenester.

3.1. Konstruerer

La oss se på en enkel tjeneste i Spring Cloud Stream som lytter til inngang bindende og sender et svar til produksjon bindende:

@SpringBootApplication @EnableBinding (Processor.class) offentlig klasse MyLoggerServiceApplication {public static void main (String [] args) {SpringApplication.run (MyLoggerServiceApplication.class, args); } @StreamListener (Processor.INPUT) @SendTo (Processor.OUTPUT) public LogMessage enrichLogMessage (LogMessage log) {return new LogMessage (String.format ("[1]:% s", log.getMessage ())); }}

Kommentaren @EnableBinding konfigurerer applikasjonen for å binde kanalene INNGANG og PRODUKSJON definert i grensesnittet Prosessor. Begge kanalene er bindinger som kan konfigureres til å bruke en konkret messaging-mellomvare eller bindemiddel.

La oss ta en titt på definisjonen av alle disse begrepene:

  • Bindinger - en samling grensesnitt som identifiserer inngangs- og utgangskanalene erklærende
  • Binder - messaging-middleware implementering som Kafka eller RabbitMQ
  • Kanal - representerer kommunikasjonsrøret mellom messaging-middleware og applikasjonen
  • StreamListeners - meldingshåndteringsmetoder i bønner som automatisk blir påkalt på en melding fra kanalen etter MessageConverter gjør serialisering / deserialisering mellom mellomvarespesifikke hendelser og domeneobjekttyper / POJOer
  • Messalvie Skjemaer - brukes til serialisering og deserialisering av meldinger, disse skjemaene kan leses statisk fra et sted eller lastes dynamisk, og støtter utviklingen av domenetypetyper

3.2. Kommunikasjonsmønstre

Meldinger angitt til destinasjoner leveres av Publiser-abonner meldingsmønster. Forlag kategoriserer meldinger i emner, hver identifisert med et navn. Abonnenter uttrykker interesse for ett eller flere emner. Midtvaren filtrerer meldingene og leverer de av de interessante emnene til abonnentene.

Nå kan abonnentene grupperes. EN forbrukergruppe er et sett med abonnenter eller forbrukere, identifisert av a gruppe-ID, innenfor hvilke meldinger fra et emne eller emnets partisjon blir levert på en lastbalansert måte.

4. Programmeringsmodell

Denne delen beskriver det grunnleggende om å bygge Spring Cloud Stream-applikasjoner.

4.1. Funksjonell testing

Teststøtten er en bindemiddelimplementering som gjør det mulig å kommunisere med kanalene og inspisere meldinger.

La oss sende en melding til ovenstående richLogMessage service og sjekk om svaret inneholder teksten “[1]: “ i begynnelsen av meldingen:

@RunWith (SpringJUnit4ClassRunner.class) @ContextConfiguration (classes = MyLoggerServiceApplication.class) @DirtiesContext public class MyLoggerApplicationTests {@Autowired private Processor pipe; @Autowired privat MessageCollector messageCollector; @Test offentlig ugyldig nårSendMessage_thenResponseShouldUpdateText () {pipe.input () .send (MessageBuilder.withPayload (new LogMessage ("This is my message")) .build ()); Objekt nyttelast = meldingCollector.forChannel (pipe.output ()) .poll () .getPayload (); assertEquals ("[1]: Dette er meldingen min", payload.toString ()); }}

4.2. Egendefinerte kanaler

I eksemplet ovenfor brukte vi Prosessor grensesnitt levert av Spring Cloud, som bare har en inngang og en utgangskanal.

Hvis vi trenger noe annet, som en inngang og to utgangskanaler, kan vi lage en tilpasset prosessor:

offentlig grensesnitt MyProcessor {String INPUT = "myInput"; @Input SubscribableChannel myInput (); @Output ("myOutput") MessageChannel anOutput (); @Output MessageChannel anotherOutput (); }

Våren vil gi oss riktig implementering av dette grensesnittet. Kanalnavnene kan stilles inn ved hjelp av merknader som i @Output (“myOutput”).

Ellers vil Spring bruke metodenavnene som kanalnavn. Derfor har vi tre kanaler kalt myInput, minOutput, og en annen utgang.

La oss forestille oss at vi vil rute meldingene til en utgang hvis verdien er mindre enn 10 og til en annen utgang er verdien større enn eller lik 10:

@Autowired privat MyProcessor prosessor; @StreamListener (MyProcessor.INPUT) public void routeValues ​​(Integer val) {if (val <10) {processor.anOutput (). Send (message (val)); } annet {prosessor.anotherOutput (). send (melding (val)); }} privat statisk slutt Meldingsmelding (T val) {return MessageBuilder.withPayload (val) .build (); }

4.3. Betinget utsendelse

Bruker @StreamListener kommentar, det kan vi også filtrer meldingene vi forventer hos forbrukeren bruker en hvilken som helst tilstand som vi definerer med SpEL-uttrykk.

Som et eksempel kan vi bruke betinget utsendelse som en annen tilnærming for å dirigere meldinger til forskjellige utganger:

@Autowired privat MyProcessor prosessor; @StreamListener (target = MyProcessor.INPUT, condition = "nyttelast = 10") public void routeValuesToAnotherOutput (Integer val) {prosessor.anotherOutput (). Send (melding (val)); }

Den eneste begrensningen av denne tilnærmingen er at disse metodene ikke må gi en verdi.

5. Oppsett

La oss sette opp applikasjonen som skal behandle meldingen fra RabbitMQ-megleren.

5.1. Bindemiddelkonfigurasjon

Vi kan konfigurere applikasjonen vår til å bruke standard bindemiddelimplementering via META-INF / fjærbindere:

kanin: \ org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Eller vi kan legge til bindemiddelbiblioteket for RabbitMQ til klassestien ved å inkludere denne avhengigheten:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 1.3.0.RELEASE 

Hvis ingen bindemiddelimplementering er gitt, vil Spring bruke direkte meldingskommunikasjon mellom kanalene.

5.2. RabbitMQ-konfigurasjon

For å konfigurere eksemplet i avsnitt 3.1 for å bruke RabbitMQ-bindemidlet, må vi oppdatere application.yml ligger ved src / main / resources:

vår: sky: stream: bindinger: input: destinasjon: que.log.messages bindemiddel: local_rabbit utgang: destinasjon: queue.pretty.log.messages bindemiddel: local_rabbit bindemidler: local_rabbit: type: kaninmiljø: vår: rabbitmq: vert: port : 5672 brukernavn: passord: virtual-host: /

De inngang binding vil bruke sentralen som kalles queue.log.messages, og produksjon binding vil bruke sentralen queue.pretty.log.messages. Begge bindinger vil bruke binderen som kalles lokal_rabbit.

Merk at vi ikke trenger å opprette RabbitMQ-sentralene eller køene på forhånd. Når du kjører applikasjonen, begge sentrene opprettes automatisk.

For å teste applikasjonen kan vi bruke RabbitMQ-administrasjonssiden til å publisere en melding. I Publiser melding sentralen queue.log.messages, må vi legge inn forespørselen i JSON-format.

5.3. Tilpasse meldingskonvertering

Spring Cloud Stream lar oss bruke meldingskonvertering for bestemte innholdstyper. I eksemplet ovenfor ønsker vi å gi ren tekst i stedet for å bruke JSON-format.

For å gjøre dette, må vi bruke en tilpasset transformasjon til LogMessage bruker en MessageConverter:

@SpringBootApplication @EnableBinding (Processor.class) offentlig klasse MyLoggerServiceApplication {// ... @Bean public MessageConverter providesTextPlainMessageConverter () {returner ny TextPlainMessageConverter (); } // ...}
offentlig klasse TextPlainMessageConverter utvider AbstractMessageConverter {public TextPlainMessageConverter () {super (ny MimeType ("tekst", "vanlig")); } @ Override beskyttede boolske støtter (Class clazz) {return (LogMessage.class == clazz); } @ Override-beskyttet Object convertFromInternal (Meldingsmelding, Class targetClass, Object conversionHint) {Object payload = message.getPayload (); Strengtekst = nyttelastforekomst av streng? (Streng) nyttelast: ny streng ((byte []) nyttelast); returner ny LogMessage (tekst); }}

Etter å ha brukt disse endringene, går du tilbake til Publiser melding panelet, hvis vi setter overskriften “contentTypes" til "tekst / vanlig”Og nyttelasten til“Hei Verden“, Det skal fungere som før.

5.4. Forbrukergrupper

Når du kjører flere forekomster av applikasjonen vår, hver gang det er en ny melding i en inngangskanal, vil alle abonnentene bli varslet.

Mesteparten av tiden trenger vi meldingen kun behandles en gang. Spring Cloud Stream implementerer denne oppførselen via forbrukergrupper.

For å aktivere denne oppførselen kan hver forbrukerbinding bruke spring.cloud.stream.bindings..group egenskap for å spesifisere et gruppenavn:

vår: sky: stream: bindinger: input: destinasjon: kø.log.messages bindemiddel: local_rabbit gruppe: logMessageConsumers ...

6. Meldingsdrevne mikrotjenester

I denne delen introduserer vi alle nødvendige funksjoner for å kjøre vår Cloud Cloud Stream-applikasjoner i en mikroservicekontekst.

6.1. Skalerer opp

Når flere applikasjoner kjører, er det viktig å sikre at dataene blir delt riktig på tvers av forbrukerne. For å gjøre dette tilbyr Spring Cloud Stream to egenskaper:

  • spring.cloud.stream.instanceCount - antall applikasjoner som kjører
  • spring.cloud.stream.instanceIndex - indeks over gjeldende applikasjon

For eksempel hvis vi har distribuert to forekomster av ovennevnte MyLoggerServiceApplication søknad, eiendommen spring.cloud.stream.instanceCount skal være 2 for begge applikasjoner, og eiendommen spring.cloud.stream.instanceIndex skal være henholdsvis 0 og 1.

Disse egenskapene settes automatisk hvis vi distribuerer Spring Cloud Stream-applikasjoner ved hjelp av Spring Data Flow som beskrevet i denne artikkelen.

6.2. Oppdeling

Domenehendelsene kan være Partisjonert meldinger. Dette hjelper når vi er skalere opp lagring og forbedre applikasjonsytelsen.

Domenearrangementet har vanligvis en partisjonsnøkkel slik at den havner i samme partisjon med relaterte meldinger.

La oss si at vi vil at loggmeldingene skal partisjoneres med den første bokstaven i meldingen, som vil være partisjonsnøkkelen, og grupperes i to partisjoner.

Det ville være en partisjon for loggmeldingene som begynner med ER og en annen partisjon for N-Z. Dette kan konfigureres ved hjelp av to egenskaper:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression - uttrykket for å dele nyttelastene
  • spring.cloud.stream.bindings.output.producer.partitionCount - antall grupper

Noen ganger er uttrykket for partisjon for komplisert til å skrive det i bare en linje. I disse tilfellene kan vi skrive vår tilpassede partisjonsstrategi ved hjelp av eiendommen spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Helseindikator

I en mikroservicesammenheng trenger vi det også oppdage når en tjeneste er nede eller begynner å mislykkes. Spring Cloud Stream tilbyr eiendommen management.health.binders.enabled for å aktivere helseindikatorene for permer.

Når du kjører applikasjonen, kan vi stille helsestatus på //:/Helse.

7. Konklusjon

I denne opplæringen presenterte vi hovedkonseptene til Spring Cloud Stream og viste hvordan du bruker det gjennom noen enkle eksempler over RabbitMQ. Mer informasjon om Spring Cloud Stream finner du her.

Kildekoden for denne artikkelen finner du på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found