Testing av Kafka og Spring Boot

1. Oversikt

Apache Kafka er et kraftig, distribuert, feiltolerant strømbehandlingssystem. I en tidligere veiledning lærte vi hvordan vi kan jobbe med Spring og Kafka.

I denne veiledningen, Vi bygger videre på den forrige og lærer hvordan vi kan skrive pålitelige, selvstendige integrasjonstester som ikke er avhengige av at en ekstern Kafka-server kjører.

Først begynner vi, men ser på hvordan du bruker og konfigurerer en innebygd forekomst av Kafka. Så får vi se hvordan vi kan bruke det populære rammeverket Testcontainers fra testene våre.

2. Avhengigheter

Selvfølgelig må vi legge til standarden vår-kafka avhengighet til vår pom.xml:

 org.springframework.kafka spring-kafka 2.6.3.RELEASE 

Da trenger vi to andre avhengigheter spesielt for testene våre. Først legger vi til vår-kafka-test gjenstand:

 org.springframework.kafka spring-kafka-test 2.6.3.RELEASE test 

Og til slutt vil vi legge til Testcontainers Kafka-avhengighet, som også er tilgjengelig på Maven Central:

 org.testcontainers kafka 1.15.0 test 

Nå som vi har konfigurert alle nødvendige avhengigheter, kan vi skrive en enkel Spring Boot-applikasjon ved hjelp av Kafka.

3. En enkel Kafka-produsent-forbrukerapplikasjon

Gjennom denne opplæringen vil fokuset på testene våre være en enkel produsent-forbruker Spring Boot Kafka-applikasjon.

La oss starte med å definere inngangspunktet for applikasjonen:

@SpringBootApplication @EnableAutoConfiguration public class KafkaProducerConsumerApplication {public static void main (String [] args) {SpringApplication.run (KafkaProducerConsumerApplication.class, args); }}

Som vi kan se, er dette en standard Spring Boot-applikasjon. Der det er mulig, vil vi benytte standardkonfigurasjonsverdier. Med dette i bakhodet bruker vi @EnableAutoConfiguration kommentar for å automatisk konfigurere applikasjonen vår.

3.1. Produsentoppsett

La oss deretter vurdere en produsentbønne som vi vil bruke til å sende meldinger til et gitt Kafka-emne:

@Komponent offentlig klasse KafkaProducer {privat statisk slutt Logger LOGGER = LoggerFactory.getLogger (KafkaProducer.class); @Autowired private KafkaTemplate kafkaTemplate; offentlig ugyldig sending (strengemne, strengnyttelast) {LOGGER.info ("sending nyttelast =" {} "til topic =" {} "", nyttelast, emne); kafkaTemplate.send (emne, nyttelast); }}

Våre KafkaProdusent bønne definert ovenfor er bare et omslag rundt KafkaTemplate klasse. Denne klassen gir trådsikre operasjoner på høyt nivå, for eksempel å sende data til det oppgitte emnet, som er nøyaktig hva vi gjør i vårt sende metode.

3.2. Forbrukeroppsett

På samme måte definerer vi nå en enkel forbrukerbønne som vil lytte til et Kafka-emne og motta meldinger:

@Komponent offentlig klasse KafkaConsumer {privat statisk slutt Logger LOGGER = LoggerFactory.getLogger (KafkaConsumer.class); privat CountDownLatch-lås = ny CountDownLatch (1); privat streng nyttelast = null; @KafkaListener (topics = "$ {test.topic}") offentlig ugyldig mottakelse (ConsumerRecord consumerRecord) {LOGGER.info ("mottatt nyttelast =" {} "", consumerRecord.toString ()); setPayload (consumerRecord.toString ()); latch.countDown (); } offentlig CountDownLatch getLatch () {returlås; } offentlig streng getPayload () {retur nyttelast; }}

Vår enkle forbruker bruker @KafkaListener kommentar på motta metode for å lytte til meldinger om et gitt emne. Vi får se senere hvordan vi konfigurerer test.topic fra testene våre.

Videre lagrer mottaksmetoden meldingsinnholdet i bønnen vår og reduserer antall klinke variabel. Denne variabelen er et enkelt trådsikkert tellefelt som vi vil bruke senere fra testene våre for å sikre at vi har mottatt en melding.

Nå som vi har implementert vår enkle Kafka-applikasjon ved hjelp av Spring Boot, la oss se hvordan vi kan skrive integrasjonstester.

4. Et ord om testing

Generelt, når vi skriver rene integrasjonstester, bør vi ikke være avhengige av eksterne tjenester som vi kanskje ikke kan kontrollere eller kanskje plutselig slutter å virke. Dette kan ha negative effekter på testresultatene.

På samme måte, hvis vi er avhengige av en ekstern tjeneste, i dette tilfellet en løpende Kafka-megler, vil vi sannsynligvis ikke kunne sette den opp, kontrollere den og rive den ned slik vi ønsker fra testene våre.

4.1. Søknadsegenskaper

Vi kommer til å bruke et veldig lett sett med applikasjonskonfigurasjonsegenskaper fra testene våre. Vi definerer disse egenskapene i vår src / test / resources / application.yml fil:

vår: kafka: forbruker: auto-offset-reset: tidligste gruppe-id: baeldung test: topic: embedded-test-topic

Dette er det minste settet med egenskaper vi trenger når vi jobber med en innebygd forekomst av Kafka eller en lokal megler.

De fleste av disse er selvforklarende, men den vi bør fremheve av særlig betydning er forbrukereiendommen auto-offset-reset: tidligst. Denne egenskapen sørger for at forbrukergruppen vår får meldingene vi sender fordi containeren kan starte etter at sendingene er fullført.

I tillegg konfigurerer vi en emneegenskap med verdien innebygd-test-emne, som er emnet vi vil bruke fra testene våre.

5. Testing ved hjelp av innebygd Kafka

I denne delen tar vi en titt på hvordan du bruker en Kafka-forekomst i minnet for å kjøre testene våre mot. Dette er også kjent som Embedded Kafka.

Avhengigheten vår-kafka-test vi la til tidligere inneholder noen nyttige verktøy for å hjelpe deg med å teste applikasjonen vår. Spesielt inneholder den EmbeddedKafkaBroker klasse.

Med det i tankene, la oss gå videre og skrive vår første integrasjonstest:

@SpringBootTest @DirtiesContext @EmbeddedKafka (partisjoner = 1, brokerProperties = {"lyttere = PLAINTEXT: // localhost: 9092", "port = 9092"}) klasse EmbeddedKafkaIntegrationTest {@Autowired privat KafkaConsumer forbruker; @Autowired privat KafkaProducer produsent; @Value ("$ {test.topic}") privat strengemne; @Test offentlig ugyldighet gittEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived () kaster Unntak {producer.send (emne, "Sender med egen enkel KafkaProducer"); consumer.getLatch (). venter (10000, TimeUnit.MILLISECONDS); assertThat (consumer.getLatch (). getCount (), equalTo (0L)); assertThat (consumer.getPayload (), containString ("embedded-test-topic")); }}

La oss gå gjennom de viktigste delene av testen vår. Først begynner vi med å dekorere testklassen vår med to ganske standard vårkommentarer:

  • De @SpringBootTest merknader vil sikre at vår test bootstraps vår applikasjonssammenheng
  • Vi bruker også @DirtiesContext kommentar, som vil sørge for at denne konteksten blir renset og tilbakestilt mellom forskjellige tester

Her kommer den avgjørende delen, vi bruker @EmbeddedKafka kommentar for å injisere en forekomst av en EmbeddedKafkaBroker inn i testene våre. Videre er det flere egenskaper vi kan bruke til å konfigurere den innebygde Kafka-noden:

  • skillevegger - dette er antall partisjoner som brukes per emne. For å holde ting pent og enkelt, vil vi bare at en skal brukes fra testene våre
  • meglerEiendommer - tilleggseiendommer for Kafka-megleren. Igjen holder vi ting enkelt og spesifiserer en lytter i ren tekst og et portnummer

Deretter kobler vi automatisk forbruker og produsent klasser og konfigurere et emne for å bruke verdien fra vår application.properties.

For den siste delen av stikksag, vi bare sender en melding til testemnet vårt og verifiserer at meldingen er mottatt og inneholder navnet på testemnet vårt.

Når vi kjører testen vår, vil vi se blant den detaljerte våren:

... 12: 45: 35.099 [main] INFO cbkafka.embedded.KafkaProducer - sender nyttelast = "Sender med vår egen enkle KafkaProducer" til topic = "embedded-test-topic" ... 12: 45: 35.103 [org .springframework.kafka.KafkaListenerEndpointContainer # 0-0-C-1] INFO cbkafka.embedded.KafkaConsumer - received payload = 'ConsumerRecord (topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, serialisert nøkkelstørrelse = -1, serialisert verdi størrelse = 41, headers = RecordHeaders (headers = [], isReadOnly = false),  nøkkel = null, verdi = Sender med vår egen enkle KafkaProducer) ' 

Dette bekrefter at testen vår fungerer som den skal. Rått! Vi har nå en måte å skrive selvstendige, uavhengige integrasjonstester ved hjelp av en Kafka-megler i minnet.

6. Testing av Kafka med testcontainere

Noen ganger kan vi se små forskjeller mellom en ekte ekstern tjeneste og en innebygd forekomst i minnet av en tjeneste som er spesielt levert for testformål. Selv om det ikke er sannsynlig, kan det også være at porten som ble brukt fra testen vår, var okkupert og forårsaket en feil.

Med dette i bakhodet, i denne delen, vil vi se en variant av vår tidligere tilnærming til testing ved hjelp av Testcontainers-rammeverket. Vi får se hvordan vi kan starte og administrere en ekstern Apache Kafka-megler som er vert i en Docker-container fra integrasjonstesten vår.

La oss definere en annen integrasjonstest som vil være ganske lik den vi så i forrige avsnitt:

@RunWith (SpringRunner.class) @Import (com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest (klasser = KafkaProducerConsumerApplication.class) @DirtiesConst .parse ("confluentinc / cp-kafka: 5.4.3")); @Autowired privat KafkaConsumer forbruker; @Autowired privat KafkaProducer produsent; @Value ("$ {test.topic}") privat strengemne; @Test offentlig ugyldig gittKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived () kaster Unntak {producer.send (emne, "Sender med egen kontroller"); consumer.getLatch (). venter (10000, TimeUnit.MILLISECONDS); assertThat (consumer.getLatch (). getCount (), equalTo (0L)); assertThat (consumer.getPayload (), containString ("embedded-test-topic")); }}

La oss ta en titt på forskjellene denne gangen. Vi erklærer kafka felt, som er en standard JUnit @ClassRule. Dette feltet er en forekomst av KafkaContainer klasse som skal forberede og administrere livssyklusen til vår container som kjører Kafka.

For å unngå havnekollisjon tildeler Testcontainers et portnummer dynamisk når dockercontaineren vår starter. Av denne grunn tilbyr vi en tilpasset fabrikkonfigurasjon for forbruker og produsent ved bruk av klassen KafkaTestContainersConfiguration:

@Bean public Map consumerConfigs () {Map rekvisitter = ny HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers ()); props.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "tidligste"); props.put (ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // flere standardkonfigurasjonsrekvisita; } @Bean offentlig ProducerFactory producerFactory () {Map configProps = ny HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers ()); // mer standardkonfigurasjon returnerer nye DefaultKafkaProducerFactory (configProps); }

Vi refererer deretter til denne konfigurasjonen via @Import kommentar i begynnelsen av testen.

Årsaken til dette er at vi trenger en måte å injisere serveradressen i applikasjonen vår, som som tidligere nevnt, genereres dynamisk. Vi oppnår dette ved å kalle getBootstrapServers () metode, som vil returnere bootstrap-serverplasseringen:

bootstrap.servers = [PLAINTEXT: // localhost: 32789]

Nå når vi kjører testen vår, bør vi se at Testcontainers gjør flere ting:

  • Sjekker vårt lokale Docker-oppsett.
  • Trekker confluentinc / cp-kafka: 5.4.3 dockerbilde om nødvendig
  • Starter en ny container og venter på at den skal være klar
  • Slutt til slutt og sletter beholderen etter at testen er ferdig

Igjen bekreftes dette ved å inspisere testutgangen:

13: 33: 10.396 [main] INFO 🐳 [confluentinc / cp-kafka: 5.4.3] - Opprette container for bilde: confluentinc / cp-kafka: 5.4.3 13: 33: 10.454 [main] INFO 🐳 [confluentinc / cp -kafka: 5.4.3] - Start beholder med ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3 13: 33: 10,785 [hoved] INFO 🐳 [confluentinc / cp-Kafka: 5.4.3] - container confluentinc / cp-Kafka: 5.4.3 starter: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Presto! En fungerende integrasjonstest ved bruk av en Kafka docker container.

7. Konklusjon

I denne artikkelen har vi lært om et par tilnærminger for å teste Kafka-applikasjoner med Spring Boot. I den første tilnærmingen så vi hvordan du konfigurerer og bruker en lokal Kafka-megler i minnet.

Så så vi hvordan vi bruker Testcontainers til å sette opp en ekstern Kafka-megler som kjører inne i en dockercontainer fra testene våre.

Som alltid er hele kildekoden til artikkelen tilgjengelig på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found