Introduksjon til KafkaStreams i Java

1. Oversikt

I denne artikkelen ser vi på KafkaStreams bibliotek.

KafkaStreams er konstruert av skaperne av Apache Kafka. Det primære målet med denne programvaren er å la programmerere lage effektive streaming-applikasjoner i sanntid som kan fungere som Microservices.

KafkaStreams gjør oss i stand til å konsumere fra Kafka-emner, analysere eller transformere data, og potensielt sende dem til et annet Kafka-emne.

Å demonstrere KafkaStreams, vi lager en enkel applikasjon som leser setninger fra et emne, teller forekomster av ord og skriver ut antall per ord.

Viktig å merke seg er at KafkaStreams biblioteket er ikke reaktivt og har ingen støtte for asynkronisering og mottrykkhåndtering.

2. Maven avhengighet

For å begynne å skrive strømbehandlingslogikk med KafkaStreams, vi må legge til en avhengighet av kafka-bekker og kafka-klienter:

 org.apache.kafka kafka-streams 1.0.0 org.apache.kafka kafka-clients 1.0.0 

Vi må også ha Apache Kafka installert og startet fordi vi bruker et Kafka-emne. Dette emnet vil være datakilden for streamingjobben vår.

Vi kan laste ned Kafka og andre nødvendige avhengigheter fra det offisielle nettstedet.

3. Konfigurere KafkaStreams-inngang

Det første vi skal gjøre er definisjonen av Kafka-emnet.

Vi kan bruke Sammenløpende verktøyet vi lastet ned - det inneholder en Kafka Server. Den inneholder også kafka-konsoll-produsent som vi kan bruke til å publisere meldinger til Kafka.

For å komme i gang, la oss kjøre Kafka-klyngen:

./konfluent start

Når Kafka starter, kan vi definere datakilden og navnet på applikasjonen vår ved hjelp av APPLICATION_ID_CONFIG:

String inputTopic = "inputTopic";
Egenskaper streamsConfiguration = nye egenskaper (); streamsConfiguration.put (StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");

En avgjørende konfigurasjonsparameter er BOOTSTRAP_SERVER_CONFIG. Dette er URL-en til vår lokale Kafka-forekomst som vi nettopp startet:

private String bootstrapServers = "localhost: 9092"; streamsConfiguration.put (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

Deretter må vi passere nøkkeltypen og verdien av meldinger som blir konsumert fra inputTopic:

streamsConfiguration.put (StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ()); streamsConfiguration.put (StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ());

Strømbehandling er ofte statelig. Når vi vil lagre mellomresultater, må vi spesifisere STATE_DIR_CONFIG parameter.

I testen bruker vi et lokalt filsystem:

streamsConfiguration.put (StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory (). getAbsolutePath ()); 

4. Å bygge en strømmetopologi

Når vi har definert inngangstemaet vårt, kan vi lage en streamingtopologi - det er en definisjon av hvordan hendelser skal håndteres og transformeres.

I vårt eksempel vil vi implementere en ordteller. For hver setning sendt til inputTopic, vi ønsker å dele det opp i ord og beregne forekomsten av hvert ord.

Vi kan bruke en forekomst av KStreamsBuilder klasse for å begynne å konstruere vår topologi:

KStreamBuilder builder = ny KStreamBuilder (); KStream textLines = builder.stream (inputTopic); Mønster mønster = Mønster.kompil ("\ W +", Mønster.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues ​​(verdi -> Arrays.asList (mønster.split (value.toLowerCase ()))) .groupBy ((nøkkel, ord) -> word) .count ();

For å implementere ordtelling må vi for det første dele verdiene ved å bruke regulært uttrykk.

Delingsmetoden returnerer en matrise. Vi bruker flatMapValues ​​() å flate den. Ellers vil vi ende opp med en liste over matriser, og det ville være upraktisk å skrive kode ved hjelp av en slik struktur.

Til slutt samler vi verdiene for hvert ord og kaller telle() som vil beregne forekomster av et bestemt ord.

5. Håndteringsresultater

Vi har allerede beregnet ordtellingen på inngangsmeldingene våre. La oss nå skrive ut resultatene på standardutgangen ved hjelp av for hver() metode:

wordCounts .foreach ((w, c) -> System.out.println ("word:" + w + "->" + c));

Ved produksjon kan en slik streamingjobb ofte publisere utdataene til et annet Kafka-emne.

Vi kunne gjøre dette ved hjelp av til () metode:

String outputTopic = "outputTopic"; Serde stringSerde = Serdes.String (); Serde longSerde = Serdes.Long (); wordCounts.to (stringSerde, longSerde, outputTopic);

De Serde klasse gir oss forhåndskonfigurerte serialiseringer for Java-typer som skal brukes til å serieisere objekter til en rekke byte. Arten med byte vil deretter bli sendt til Kafka-emnet.

Vi bruker String som en nøkkel til temaet vårt og Lang som en verdi for den faktiske tellingen. De til() metoden vil lagre de resulterende dataene til outputTopic.

6. Starte KafkaStream Job

Fram til dette bygget vi en topologi som kan utføres. Jobben har imidlertid ikke startet ennå.

Vi må starte jobben eksplisitt ved å ringe start() metoden på KafkaStreams forekomst:

KafkaStreams streams = nye KafkaStreams (builder, streamsConfiguration); streams.start (); Tråd. Søvn (30000); streams.close ();

Merk at vi venter 30 sekunder på at jobben skal være ferdig. I et virkelig scenario vil den jobben løpe hele tiden og behandle hendelser fra Kafka når de ankommer.

Vi kan teste jobben vår ved å publisere noen arrangementer til Kafka-emnet.

La oss starte en kafka-konsoll-produsent og manuelt sende noen hendelser til vårt inputTopic:

./kafka-console-producer --topic inputTopic - meglerliste localhost: 9092> "dette er en ponni"> "dette er en hest og ponni" 

På denne måten publiserte vi to hendelser til Kafka. Søknaden vår vil konsumere disse hendelsene og vil skrive ut følgende utdata:

ord: -> 1 ord: dette -> 1 ord: er -> 1 ord: a -> 1 ord: ponni -> 1 ord: -> 2 ord: dette -> 2 ord: er -> 2 ord: a - > 2 ord: hest -> 1 ord: og -> 1 ord: ponni -> 2

Vi kan se at da den første meldingen kom, ble ordet ponni skjedde bare en gang. Men da vi sendte den andre meldingen, ordet ponni skjedde for andre gang utskrift: “ord: ponni -> 2 ″.

6. Konklusjon

Denne artikkelen diskuterer hvordan du oppretter en primær strømbehandlingsapplikasjon ved hjelp av Apache Kafka som datakilde og KafkaStreams bibliotek som strømbehandlingsbibliotek.

Alle disse eksemplene og kodebitene finner du i GitHub-prosjektet - dette er et Maven-prosjekt, så det skal være enkelt å importere og kjøre som det er.