Apache RocketMQ med Spring Boot

1. Introduksjon

I denne veiledningen oppretter vi en meldingsprodusent og forbruker ved hjelp av Spring Boot og Apache RocketMQ, en åpen kildekode distribuert meldings- og streaming-dataplattform.

2. Avhengigheter

For Maven-prosjekter må vi legge til avhengighet av RocketMQ Spring Boot Starter:

 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 

3. Å produsere meldinger

For eksempel vil vi lage en grunnleggende meldingsprodusent som sender hendelser når brukeren legger til eller fjerner en vare fra handlekurven.

La oss først sette opp serverplasseringen og gruppenavnet i vårt application.properties:

rocketmq.name-server = 127.0.0.1: 9876 rocketmq.producer.group = cart-producer-group

Merk at hvis vi hadde mer enn én navneserver, kunne vi liste dem slik vert: port; vert: port.

Nå, for å gjøre det enkelt, oppretter vi en CommandLineRunner applikasjon og generere noen hendelser under oppstart av applikasjonen:

@SpringBootApplication offentlig klasse CartEventProducer implementerer CommandLineRunner {@Autowired private RocketMQTemplate raketMQTemplate; public static void main (String [] args) {SpringApplication.run (CartEventProducer.class, args); } offentlig tomkjøring (String ... args) kaster Unntak {rocketMQTemplate.convertAndSend ("cart-item-add-topic", ny CartItemEvent ("bike", 1)); rocketMQTemplate.convertAndSend ("cart-item-add-topic", ny CartItemEvent ("datamaskin", 2)); rocketMQTemplate.convertAndSend ("cart-item-fjernet-emne", nytt CartItemEvent ("bike", 1)); }}

De CartItemEvent består av bare to egenskaper - varens id og en mengde:

class CartItemEvent {private String itemId; privat int mengde; // constructor, getters and setters}

I eksemplet ovenfor bruker vi convertAndSend () metode, en generisk metode definert av AbstraktMeldingSendingTemplate abstrakt klasse, for å sende vogneventene våre. Det tar to parametere: En destinasjon, som i vårt tilfelle er et emnenavn, og en nyttelast for meldinger.

4. Melding Forbruker

Å forbruke RocketMQ-meldinger er så enkelt som å lage en vårkomponent som er merket med @RocketMQMessageListener og implementere RocketMQListener grensesnitt:

@SpringBootApplication offentlig klasse CartEventConsumer {public static void main (String [] args) {SpringApplication.run (CartEventConsumer.class, args); } @Service @RocketMQMessageListener (topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic") offentlig klasse CardItemAddConsumer implementerer RocketMQListener {public void onMessage (CartItemEvent addItemEvent) {log.info ( "Legger til element: {}", addItemEvent); // tilleggslogikk}} @Service @RocketMQMessageListener (topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic") offentlig klasse CardItemRemoveConsumer implementerer RocketMQListener {public void onMessage (CartItemEvent removeItemEvent) {log.info ("Fjerner element: {}", removeItemEvent); // tilleggslogikk}}}

Vi må lage en egen komponent for alle meldingsemner vi lytter etter. I hver av disse lytterne definerer vi navnet på emnet og forbrukergruppenavnet gjennom @RocketMQMessageListener kommentar.

5. Synkron og asynkron overføring

I de foregående eksemplene brukte vi convertAndSend metode for å sende våre meldinger. Vi har imidlertid noen andre alternativer.

Vi kan for eksempel ringe syncSend som er forskjellig fra convertAndSend fordi den kommer tilbake SendResult gjenstand.

Den kan for eksempel brukes til å verifisere om meldingen vår ble sendt, eller få ID-en:

offentlig tomkjøring (String ... args) kaster unntak {SendResult addBikeResult = rocketMQTemplate.syncSend ("cart-item-add-topic", ny CartItemEvent ("bike", 1)); SendResult addComputerResult = rocketMQTemplate.syncSend ("cart-item-add-topic", ny CartItemEvent ("datamaskin", 2)); SendResult removeBikeResult = rocketMQTemplate.syncSend ("cart-item-fjernet-emne", ny CartItemEvent ("bike", 1)); }

Som convertAndSend, denne metoden returneres bare når sendingprosedyren er fullført.

Vi bør bruke synkron overføring i tilfeller som krever høy pålitelighet, for eksempel viktige varslingsmeldinger eller SMS-varsler.

På den annen side kan det være lurt å sende meldingen asynkront og bli varslet når sendingen er fullført.

Vi kan gjøre dette med asyncSend, som tar en Send tilbakeringing som parameter og returnerer umiddelbart:

rocketMQTemplate.asyncSend ("cart-item-add-topic", ny CartItemEvent ("bike", 1), ny SendCallback () {@Override public void onSuccess (SendResult sendResult) {log.error ("Vellykket sendt handlevogn") ;} @Override public void onException (Throwable throwable) {log.error ("Unntak under sending av handlevogn", throwable);}});

Vi bruker asynkron overføring i tilfeller som krever høy gjennomstrømning.

Til slutt, for scenarier der vi har svært høye gjennomstrømningskrav, kan vi bruke sendOneWay i stedet for asyncSend. sendOneWay er forskjellig fra asyncSend ved at det ikke garanterer at meldingen blir sendt.

Enveis overføring kan også brukes til vanlige pålitelighetssaker, som for eksempel innsamling av logger.

6. Sende meldinger i transaksjon

RocketMQ gir oss muligheten til å sende meldinger i en transaksjon. Vi kan gjøre det ved å bruke sendInTransaction () metode:

MessageBuilder.withPayload (new CartItemEvent ("bike", 1)). Build (); rocketMQTemplate.sendMessageInTransaction ("test-transaksjon", "emne-navn", msg, null);

Vi må også implementere en RocketMQLocalTransactionListener grensesnitt:

@RocketMQTransactionListener (txProducerGroup = "test-transaction") klasse TransactionListenerImpl implementerer RocketMQLocalTransactionListener {@Override public RocketMQLocalTransactionState executeLocalTransaction (Message msg, Object arg) {// ... lokal transaksjonsprosess, returner ROLLBACKNOWT. } @Override public RocketMQLocalTransactionState checkLocalTransaction (Message msg) {// ... sjekk transaksjonsstatus og returner ROLLBACK, COMMIT eller Ukjent retur RocketMQLocalTransactionState.COMMIT; }}

I sendMessageInTransaction (), er den første parameteren transaksjonsnavnet. Det må være det samme som @RocketMQTransactionListener’S medlemsfelt txProducerGroup.

7. Konfigurasjon av meldingsprodusent

Vi kan også konfigurere aspekter av selve meldingsprodusenten:

  • rocketmq.producer.send-message-timeout: Meldingen send timeout i millisekunder - standardverdien er 3000
  • rocketmq.producer.compress-message-body-threshold: Terskel over hvilken RocketMQ komprimerer meldinger - standardverdien er 1024.
  • rocketmq.producer.max-melding-størrelse: Maksimal meldingsstørrelse i byte - standardverdien er 4096.
  • rocketmq.producer.retry-times-when-send-async-failed: Maksimalt antall forsøk på å utføre internt i asynkron modus før feil sendes - standardverdien er 2.
  • rocketmq.producer.retry-next-server: Indikerer om du skal prøve en annen megler på nytt når feilen sendes internt - standardverdien er falsk.
  • rocketmq.producer.retry-times-when-send-failed: Maksimalt antall forsøk på å utføre internt i asynkron modus før feil sendes - standardverdien er 2.

8. Konklusjon

I denne artikkelen har vi lært hvordan du sender og bruker meldinger ved hjelp av Apache RocketMQ og Spring Boot. Som alltid er all kildekode tilgjengelig på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found