Introduksjon til Apache Kafka med Spring

Utholdenhetstopp

Jeg kunngjorde nettopp det nye Lær våren kurs, med fokus på det grunnleggende i vår 5 og vårstøvel 2:

>> KONTROLLER KURSET

1. Oversikt

Apache Kafka er et distribuert og feiltolerant strømbehandlingssystem.

I denne artikkelen vil vi dekke vårstøtte for Kafka og nivået på abstraksjoner det gir over native Kafka Java-klient-APIer.

Spring Kafka gir den enkle og typiske vårmalprogrammeringsmodellen en KafkaTemplate og meldingsdrevne POJOer via @KafkaListener kommentar.

2. Installasjon og oppsett

For å laste ned og installere Kafka, se den offisielle guiden her.

Vi må også legge til vår-kafka avhengighet til vår pom.xml:

 org.springframework.kafka spring-kafka 2.3.7.RELEASE 

Den siste versjonen av denne gjenstanden finner du her.

Eksempel på applikasjon vil være en Spring Boot-applikasjon.

Denne artikkelen forutsetter at serveren startes med standardkonfigurasjonen, og at ingen serverporter endres.

3. Konfigurere emner

Tidligere pleide vi å kjøre kommandolinjeverktøy for å lage emner i Kafka som:

$ bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replikasjonsfaktor 1 - partisjoner 1 \ --topic mytopic

Men med innføringen av AdminClient i Kafka kan vi nå lage emner programmatisk.

Vi må legge til KafkaAdmin Spring Bean, som automatisk vil legge til emner for alle bønner av typen Nytt emne:

@Configuration public class KafkaTopicConfig {@Value (value = "$ {kafka.bootstrapAddress}") private String bootstrapAddress; @Bean offentlig KafkaAdmin kafkaAdmin () {Map configs = new HashMap (); configs.put (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); returner nye KafkaAdmin (configs); } @Bean public NewTopic topic1 () {return new NewTopic ("baeldung", 1, (short) 1); }}

4. Produsere meldinger

For å opprette meldinger må vi først konfigurere en ProdusentFabrikk som setter strategien for å skape Kafka Produsent tilfeller.

Da trenger vi en KafkaTemplate som pakker inn en Produsent forekomst og gir praktiske metoder for sending av meldinger til Kafka-emner.

Produsent forekomster er trådsikre, og bruk av en enkelt forekomst gjennom en applikasjonskontekst vil gi høyere ytelse. Følgelig KakfaTemplate forekomster er også trådsikre, og bruk av en forekomst anbefales.

4.1. Produsentkonfigurasjon

@Configuration public class KafkaProducerConfig {@Bean public ProducerFactory producerFactory () {Map configProps = new HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); returner nye DefaultKafkaProducerFactory (configProps); } @Bean public KafkaTemplate kafkaTemplate () {return new KafkaTemplate (producerFactory ()); }}

4.2. Publiseringsmeldinger

Vi kan sende meldinger ved hjelp av KafkaTemplate klasse:

@Autowired private KafkaTemplate kafkaTemplate; public void sendMessage (String msg) {kafkaTemplate.send (topicName, msg); }

De sende API returnerer en ListenableFuture gjenstand. Hvis vi vil blokkere sendetråden og få resultatet om den sendte meldingen, kan vi ringe API for ListenableFuture gjenstand. Tråden vil vente på resultatet, men det vil redusere produsenten.

Kafka er en rask strømbehandlingsplattform. Så det er en bedre idé å håndtere resultatene asynkront, slik at de påfølgende meldingene ikke venter på resultatet av forrige melding. Vi kan gjøre dette gjennom en tilbakeringing:

public void sendMessage (Strengmelding) {ListenableFuture fremtid = kafkaTemplate.send (topicName, melding); future.addCallback (ny ListenableFutureCallback() {@Override public void onSuccess (SendResult result) {System.out.println ("Sent message = [" + message + "] with offset = [" + result.getRecordMetadata (). Offset () + "]") ; } @ Override public void onFailure (Throwable ex) {System.out.println ("Kan ikke sende melding = [" + melding + "] på grunn av:" + ex.getMessage ()); }}); }

5. Forbruker meldinger

5.1. Forbrukerkonfigurasjon

For å konsumere meldinger må vi konfigurere en Forbrukerfabrikk og en KafkaListenerContainerFactory. Når disse bønnene er tilgjengelige i vårbønnefabrikken, kan POJO-baserte forbrukere konfigureres ved hjelp av @KafkaListener kommentar.

@EnableKafka merknad kreves i konfigurasjonsklassen for å muliggjøre gjenkjenning av @KafkaListener kommentar på vårstyrte bønner:

@EnableKafka @Configuration offentlig klasse KafkaConsumerConfig {@Bean public ConsumerFactory consumerFactory () {Map rekvisitter = ny HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); returner nye DefaultKafkaConsumerFactory (rekvisitter); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory fabrikk = ny ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory ()); tilbake fabrikken; }}

5.2. Forbruker meldinger

@KafkaListener (topics = "topicName", groupId = "foo") offentlig ugyldig listenGroupFoo (strengmelding) {System.out.println ("Mottatt melding i gruppefo:" + melding); }

Flere lyttere kan implementeres for et emne, hver med en annen gruppe-ID. Videre kan en forbruker lytte etter meldinger fra forskjellige emner:

@KafkaListener (topics = "topic1, topic2", groupId = "foo")

Spring støtter også henting av en eller flere meldingshoder ved hjelp av @Overskrift kommentar i lytteren:

@KafkaListener (topics = "topicName") offentlig ugyldig listenWithHeaders (@Payload strengmelding, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int-partisjon) {System.out.println ("Mottatt melding:" + melding "+" fra partisjon: "+ partisjon);}

5.3. Forbruker meldinger fra en bestemt partisjon

Som du kanskje har lagt merke til, hadde vi laget emnet baeldung med bare en partisjon. Imidlertid, for et emne med flere partisjoner, a @KafkaListener kan eksplisitt abonnere på en bestemt partisjon av et emne med en innledende forskyvning:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitionOffsets = {@PartitionOffset (partition = "0", initialOffset = "0"), @PartitionOffset (partition = "3", initialOffset = "0")}) , containerFactory = "partitionsKafkaListenerContainerFactory") offentlig ugyldig listenToPartition (@Payload strengmelding, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int-partisjon) {System.out.println ("Mottatt melding:" + melding "+" fra partisjon: "+ partisjon) ;}

Siden initialOffset har blitt sendt til 0 i denne lytteren, blir alle tidligere forbrukte meldinger fra partisjoner 0 og tre forbrukes hver gang denne lytteren initialiseres. Hvis det ikke er nødvendig å stille inn forskyvningen, kan vi bruke skillevegger tilhører @TopicPartition kommentar for å angi bare partisjonene uten forskyvning:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partisjoner = {"0", "1"}))

5.4. Legger til meldingsfilter for lyttere

Lyttere kan konfigureres til å konsumere bestemte typer meldinger ved å legge til et tilpasset filter. Dette kan gjøres ved å stille inn a RecordFilterStrategy til KafkaListenerContainerFactory:

@Bean offentlig ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory fabrikk = ny ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory ()); factory.setRecordFilterStrategy (post -> record.value (). inneholder ("World")); tilbake fabrikken; }

En lytter kan deretter konfigureres til å bruke denne containerfabrikken:

@KafkaListener (topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory") offentlig ugyldig listenWithFilter (strengmelding) {System.out.println ("Mottatt melding i filtrert lytter:" + melding); }

I denne lytteren, alle meldinger som samsvarer med filteret blir kastet.

6. Tilpassede meldingsomformere

Så langt har vi bare dekket sending og mottak av strenger som meldinger. Vi kan imidlertid også sende og motta tilpassede Java-objekter. Dette krever at du konfigurerer riktig serializer i ProdusentFabrikk og deserializer i Forbrukerfabrikk.

La oss se på en enkel bønneklasse, som vi vil sende som meldinger:

offentlig klasse Hilsen {private String msg; privat strengnavn; // standard getters, setters og constructor}

6.1. Produserer egendefinerte meldinger

I dette eksemplet vil vi bruke JsonSerializer. La oss se på koden for ProdusentFabrikk og KafkaTemplate:

@Bean offentlig ProducerFactory greetingProducerFactory () {// ... configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); returner nye DefaultKafkaProducerFactory (configProps); } @Bean offentlig KafkaTemplate hilsenKafkaTemplate () {returner ny KafkaTemplate (greetingProducerFactory ()); }

Denne nye KafkaTemplate kan brukes til å sende Hilsen beskjed:

kafkaTemplate.send (topicName, new Greeting ("Hello", "World"));

6.2. Forbruker egendefinerte meldinger

På samme måte, la oss endre Forbrukerfabrikk og KafkaListenerContainerFactory å deserialisere hilsningsmeldingen riktig:

@Bean public ConsumerFactory greetingConsumerFactory () {// ... return new DefaultKafkaConsumerFactory (rekvisitter, new StringDeserializer (), new JsonDeserializer (Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (hilsenConsumerFactory ()); tilbake fabrikken; }

Vår-kafka JSON serializer og deserializer bruker Jackson-biblioteket, som også er en valgfri mavenavhengighet for vår-kafka-prosjektet. Så la oss legge det til vårt pom.xml:

 com.fasterxml.jackson.core jackson-databind 2.9.7 

I stedet for å bruke den nyeste versjonen av Jackson, anbefales det å bruke versjonen som er lagt til pom.xml av vår-kafka.

Til slutt må vi skrive en lytter til å konsumere Hilsen meldinger:

@KafkaListener (topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") offentlig ugyldig greetingListener (hilsenhilsen) {// prosess hilsen}

7. Konklusjon

I denne artikkelen tok vi for oss det grunnleggende om vårstøtte for Apache Kafka. Vi tok en kort titt på klassene som brukes til å sende og motta meldinger.

Komplett kildekode for denne artikkelen finner du på GitHub. Før du utfører koden, må du sørge for at Kafka-serveren kjører og at emnene blir opprettet manuelt.

Persistensbunn

Jeg kunngjorde nettopp det nye Lær våren kurs, med fokus på det grunnleggende i vår 5 og vårstøvel 2:

>> KONTROLLER KURSET

$config[zx-auto] not found$config[zx-overlay] not found