Nøyaktig en gang behandler i Kafka med Java

1. Oversikt

I denne opplæringen vil vi se på hvordan Kafka sørger for nøyaktig én gang levering mellom produsent- og forbrukerapplikasjoner gjennom det nylig introduserte Transactional API.

I tillegg bruker vi denne API-en til å implementere transaksjonsprodusenter og forbrukere for å oppnå end-to-end-levering nøyaktig én gang i et WordCount-eksempel.

2. Melding levering i Kafka

På grunn av ulike feil kan meldingssystemer ikke garantere levering av meldinger mellom produsent- og forbrukerapplikasjoner. Avhengig av hvordan klientapplikasjonene samhandler med slike systemer, er følgende meldingssemantikk mulig:

  • Hvis et meldingssystem aldri vil duplisere en melding, men kanskje savner en og annen melding, kaller vi det høyst en gang
  • Eller hvis den aldri vil savne en melding, men kanskje dupliserer en og annen melding, kaller vi den i hvert fall en gang
  • Men hvis det alltid leverer alle meldinger uten duplisering, er det nøyaktig-en gang

Opprinnelig støttet Kafka bare levering av meldinger på høyst én gang og minst én gang.

Derimot, introduksjonen av transaksjoner mellom Kafka-meglere og klientapplikasjoner garanterer levering en gang i Kafka. For å forstå det bedre, la oss raskt gå gjennom transaksjonsklient-API.

3. Maven-avhengigheter

For å jobbe med transaksjons-API-et trenger vi Kafkas Java-klient i vår pom:

 org.apache.kafka kafka-klienter 2.0.0 

4. En transaksjon forbruke-transformere-produsere Løkke

For eksempel vil vi konsumere meldinger fra et inndatatema, setninger.

For hver setning teller vi hvert ord og sender de enkelte ordtellingene til et utgangstema, teller.

I eksemplet antar vi at det allerede er transaksjonsdata tilgjengelig i setninger emne.

4.1. En transaksjonsbevisst produsent

Så la oss først legge til en typisk Kafka-produsent.

Eiendommer producerProps = nye egenskaper (); producerProps.put ("bootstrap.servers", "localhost: 9092");

I tillegg må vi imidlertid spesifisere en transactional.id og aktivere egenmakt:

producerProps.put ("enable.idempotence", "true"); producerProps.put ("transactional.id", "prod-1"); KafkaProducer produsent = ny KafkaProducer (producerProps);

Fordi vi har aktivert idempotens, vil Kafka bruke denne transaksjons-IDen som en del av algoritmen til deduplisere enhver melding denne produsentensender, sikre idempotens.

Enkelt sagt, hvis produsenten ved et uhell sender den samme meldingen til Kafka mer enn en gang, gjør disse innstillingene det mulig å legge merke til det.

Alt vi trenger å gjøre er sørg for at transaksjons-ID er tydelig for hver produsent, men konsistent på tvers av omstart.

4.2. Aktivere produsenten for transaksjoner

Når vi er klare, må vi også ringe initTransaction å forberede produsenten til å bruke transaksjoner:

producer.initTransactions ();

Dette registrerer produsenten hos megleren som en som kan bruke transaksjoner, identifisere det ved sin transactional.id og et sekvensnummer, eller epoke. I sin tur vil megleren bruke disse til å skrive frem eventuelle handlinger i en transaksjonslogg.

Og konsekvent, megleren vil fjerne alle handlinger fra loggen som tilhører en produsent med samme transaksjons-ID og tidligereepoke, antar at de er fra nedlagte transaksjoner.

4.3. En transaksjonsbevisst forbruker

Når vi konsumerer, kan vi lese alle meldingene på en emnepartisjon i rekkefølge. Selv om, kan vi indikere med isolasjon. nivå at vi skal vente med å lese transaksjonsmeldinger til den tilknyttede transaksjonen er begått:

Egenskaper consumerProps = nye egenskaper (); consumerProps.put ("bootstrap.servers", "localhost: 9092"); consumerProps.put ("group.id", "my-group-id"); consumerProps.put ("enable.auto.commit", "false"); consumerProps.put ("isolation.level", "read_committed"); KafkaConsumer forbruker = ny KafkaConsumer (consumerProps); forbruker.abonnement (singleton (“setninger”));

Bruke en verdi på read_committed sørger for at vi ikke leser noen transaksjonsmeldinger før transaksjonen er fullført.

Standardverdien på isolasjon. nivå er leseforpliktet.

4.4. Forbruker og transformerer ved transaksjon

Nå som vi har produsenten og forbrukeren begge konfigurert til å skrive og lese transaksjonsmessig, kan vi konsumere poster fra vårt input-emne og telle hvert ord i hver post:

ConsumerRecords-poster = consumer.poll (ofSeconds (60)); Map wordCountMap = records.records (new TopicPartition ("input", 0)) .stream () .flatMap (record -> Stream.of (record.value (). Split (""))) .map (word -> Tuple.of (ord, 1)) .collect (Collectors.toMap (tuple -> tuple.getKey (), t1 -> t1.getValue (), (v1, v2) -> v1 + v2));

Merk at det ikke er noe transaksjonsmessig om ovennevnte kode. Men, siden vi brukte read_committed, det betyr at ingen meldinger som ble skrevet til inngangsemnet i den samme transaksjonen vil bli lest av denne forbrukeren før de alle er skrevet.

Nå kan vi sende det beregnede ordtallet til utdataemnet.

La oss se hvordan vi kan produsere resultatene våre, også transaksjonelt.

4.5. Send API

For å sende tellingen vår som nye meldinger, men i samme transaksjon, ringer vi begin Transaksjon:

producer.beginTransaction ();

Deretter kan vi skrive hver til vårt "teller" emne med nøkkelen som ordet og telleren er verdien:

wordCountMap.forEach ((nøkkel, verdi) -> produsent.send (ny ProducerRecord ("teller", nøkkel, verdi.tilString ())));

Merk at fordi produsenten kan dele dataene med nøkkelen, betyr dette at transaksjonsmeldinger kan strekke seg over flere partisjoner, som hver leses av separate forbrukere. Derfor vil Kafka-megler lagre en liste over alle oppdaterte partisjoner for en transaksjon.

Legg også merke til at innenfor en transaksjon kan en produsent bruke flere tråder for å sende poster parallelt.

4.6. Forskyvning

Og til slutt må vi begå våre forskyvninger som vi nettopp har brukt. Med transaksjoner forplikter vi motregningene tilbake til inndataemnet vi leser dem fra, som normalt. Også skjønt, vi sende dem til produsentens transaksjon.

Vi kan gjøre alt dette i en samtale, men vi må først beregne forskyvningene for hver emnepartisjon:

Map offsetsToCommit = ny HashMap (); for (TopicPartition partition: recordss.partitions ()) {List partitionedRecords = recordss.records (partisjon); long offset = partitionedRecords.get (partitionedRecords.size () - 1) .offset (); offsetsToCommit.put (partisjon, ny OffsetAndMetadata (offset + 1)); }

Vær oppmerksom på at det vi forplikter oss til transaksjonen er den kommende motregningen, noe som betyr at vi må legge til 1.

Deretter kan vi sende våre beregnede forskyvninger til transaksjonen:

producer.sendOffsetsToTransaction (offsetsToCommit, "min gruppe-id");

4.7. Gjør eller avbryter transaksjonen

Og til slutt kan vi begå transaksjonen, som atomisk vil skrive forskyvningene til forbrukeroffset emne så vel som selve transaksjonen:

producer.commitTransaction ();

Dette skyller alle bufrede meldinger til de respektive partisjonene. I tillegg gjør Kafka-megleren alle meldinger i den transaksjonen tilgjengelig for forbrukerne.

Selvfølgelig, hvis noe går galt mens vi behandler, for eksempel hvis vi får et unntak, kan vi ringe abortTransaksjon:

prøv {// ... les fra inndatatema // ... transformer // ... skriv til utdatatema producer.commitTransaction (); } fange (Unntak e) {producer.abortTransaction (); }

Og slipp eventuelle bufrede meldinger og fjern transaksjonen fra megleren.

Hvis vi verken begår eller avbryter før meglerkonfigurasjonen maks. transaksjon.timeout.ms, Kafka-megleren vil avbryte selve transaksjonen. Standardverdien for denne egenskapen er 900 000 millisekunder eller 15 minutter.

5. Annet forbruke-transformere-produsere Sløyfer

Det vi nettopp har sett er grunnleggende forbruke-transformere-produsere loop som leser og skriver til samme Kafka-klynge.

Omvendt, applikasjoner som må lese og skrive til forskjellige Kafka-klynger, må bruke de eldre commitSync og commitAsync API. Vanligvis lagrer applikasjoner forbrukerforskyvninger i deres eksterne tilstandslagring for å opprettholde transaksjonaliteten.

6. Konklusjon

For datakritiske applikasjoner er ending-til-end-prosessering en gangs behandling ofte viktig.

I denne veiledningen, vi så hvordan vi bruker Kafka til å gjøre akkurat dette ved hjelp av transaksjoner, og vi implementerte et transaksjonsbasert ordtellingeksempel for å illustrere prinsippet.

Sjekk gjerne ut alle kodeeksemplene på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found