Introduksjon til Kafka-kontakter

1. Oversikt

Apache Kafka® er en distribuert plattform for streaming. I en tidligere veiledning diskuterte vi hvordan du implementerer Kafka-forbrukere og produsenter ved hjelp av Spring.

I denne opplæringen lærer vi hvordan du bruker Kafka-kontakter.

Vi tar en titt på:

  • Ulike typer Kafka-kontakter
  • Funksjoner og moduser for Kafka Connect
  • Koblingskonfigurasjon ved hjelp av eiendomsfiler samt REST API

2. Grunnleggende om Kafka Connect og Kafka Connectors

Kafka Connect er et rammeverk for å koble Kafka med eksterne systemer som databaser, nøkkelverdilagre, søkeindekser og filsystemer, ved bruk av såkalte Kontakter.

Kafka-kontakter er ferdige komponenter som kan hjelpe oss å importere data fra eksterne systemer til Kafka-emner og eksportere data fra Kafka-emner til eksterne systemer. Vi kan bruke eksisterende kontaktimplementeringer for vanlige datakilder og vasker eller implementere våre egne kontakter.

EN kildekontakt samler inn data fra et system. Kildesystemer kan være hele databaser, strømtabeller eller meldingsmeglere. En kildekontakt kan også samle beregninger fra applikasjonsservere til Kafka-emner, noe som gjør dataene tilgjengelige for strømbehandling med lav ventetid.

EN vaskekontakt leverer data fra Kafka-emner til andre systemer, som kan være indekser som Elasticsearch, batch-systemer som Hadoop eller hvilken som helst type database.

Noen kontakter vedlikeholdes av samfunnet, mens andre støttes av Confluent eller dets partnere. Virkelig kan vi finne kontakter for de mest populære systemene, som S3, JDBC og Cassandra, for bare å nevne noen.

3. Funksjoner

Kafka Connect-funksjonene inkluderer:

  • Et rammeverk for å koble eksterne systemer til Kafka - det forenkler utvikling, distribusjon og administrasjon av kontakter
  • Distribuert og frittstående modus - det hjelper oss å distribuere store klynger ved å utnytte den distribuerte naturen til Kafka, samt oppsett for utvikling, testing og små produksjonsdistribusjoner
  • REST-grensesnitt - vi kan administrere kontakter ved hjelp av et REST API
  • Automatisk forskyvning - Kafka Connect hjelper oss med å håndtere kompensasjonsforpliktelsesprosess, som sparer oss bryet med å implementere denne feilutsatte delen av kontaktutvikling manuelt
  • Distribuert og skalerbar som standard - Kafka Connect bruker den eksisterende gruppeadministrasjonsprotokollen; vi kan legge til flere arbeidere for å skalere opp en Kafka Connect-klynge
  • Streaming og batch-integrasjon - Kafka Connect er en ideell løsning for å bygge bro over streaming- og batch-datasystemer i forbindelse med Kafkas eksisterende evner
  • Transformasjoner - disse gjør det mulig for oss å gjøre enkle og lette modifikasjoner av individuelle meldinger

4. Oppsett

I stedet for å bruke den vanlige Kafka-distribusjonen, laster vi ned Confluent Platform, en Kafka-distribusjon levert av Confluent, Inc., selskapet bak Kafka. Confluent Platform leveres med noen ekstra verktøy og klienter, sammenlignet med vanlig Kafka, samt noen ekstra forhåndsbygde kontakter.

For vårt tilfelle er Open Source-utgaven tilstrekkelig, som du finner på Confluents nettsted.

5. Hurtigstart Kafka Connect

For det første vil vi diskutere prinsippet til Kafka Connect, bruker de mest grunnleggende kontaktene, som er filen kilde kontakten og filen synke kontakt.

Praktisk, Confluent Platform leveres med begge disse kontaktene, samt referansekonfigurasjoner.

5.1. Source Connector Configuration

For kildekontakten er referansekonfigurasjonen tilgjengelig på $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties:

navn = lokal filkildekontakt.class = FileStreamSource oppgaver.max = 1 emne = tilkoblingstestfil = test.txt

Denne konfigurasjonen har noen egenskaper som er vanlige for alle kildekontakter:

  • Navn er et brukerdefinert navn for koblingsforekomsten
  • kontakt. klasse spesifiserer implementeringsklassen, i utgangspunktet typen kontakt
  • oppgaver. maks angir hvor mange forekomster av kildekontakten som skal kjøres parallelt, og
  • emne definerer emnet som kontakten skal sende utgangen til

I dette tilfellet har vi også et koblingsspesifikt attributt:

  • fil definerer filen som kontakten skal lese inngangen fra

La oss lage en grunnleggende fil med noe innhold for at dette skal fungere:

ekko -e "foo \ nbar \ n"> $ CONFLUENT_HOME / test.txt

Merk at arbeidskatalogen er $ CONFLUENT_HOME.

5.2. Sink Connector Configuration

For vaskkontakten vår bruker vi referansekonfigurasjonen på $ CONFLUENT_HOME / etc / kafka / connect-file-sink.properties:

navn = lokal filsink-kontakt.class = FileStreamSink-oppgaver.max = 1 fil = test.sink.txt-emner = tilkoblingstest

Logisk sett inneholder den nøyaktig de samme parametrene, skjønt denne gangen kontakt. klasse spesifiserer implementeringen av vaskekontakten, og fil er stedet der kontakten skal skrive innholdet.

5.3. Arbeiderkonfig

Til slutt må vi konfigurere Connect-arbeideren, som vil integrere de to kontaktene våre og gjøre arbeidet med å lese fra kildekontakten og skrive til vaskekontakten.

For det kan vi bruke $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties:

bootstrap.servers = localhost: 9092 key.converter = org.apache.kafka.connect.json.JsonConverter value.converter = org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable = false value.converter. schemas.enable = false offset.storage.file.filename = / tmp / connect.offsets offset.flush.interval.ms = 10000 plugin.path = / share / java

Noter det plugin.path kan holde en liste over stier der koblingsimplementeringer er tilgjengelige

Når vi bruker kontakter som følger med Kafka, kan vi stille inn plugin.path til $ CONFLUENT_HOME / del / java. Arbeider du med Windows, kan det være nødvendig å gi en absolutt vei her.

For de andre parametrene kan vi legge igjen standardverdiene:

  • bootstrap.servers inneholder adressene til Kafka-meglerne
  • key.converter og value.converter definere omformerklasser, som serialiserer og deserialiserer dataene når de strømmer fra kilden til Kafka og deretter fra Kafka til vasken
  • key.converter.schemas.enable og value.converter.schemas.enable er omformerspesifikke innstillinger
  • offset.storage.file.filename er den viktigste innstillingen når du kjører Connect i frittstående modus: den definerer hvor Connect skal lagre offsetdataene
  • offset.flush.interval.ms definerer intervallet som arbeidstakeren prøver å utføre forskyvninger for oppgaver

Og listen over parametere er ganske moden, så sjekk den offisielle dokumentasjonen for en komplett liste.

5.4. Kafka Connect i frittstående modus

Og med det kan vi starte vårt første koblingsoppsett:

$ CONFLUENT_HOME / bin / connect-standalone \ $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-sink. eiendommer

For det første kan vi inspisere innholdet i emnet ved hjelp av kommandolinjen:

$ CONFLUENT_HOME / bin / kafka-console-consumer --bootstrap-server localhost: 9092 --topic connect-test - fra begynnelse

Som vi kan se, tok kildekoblingen dataene fra test.txt fil, forvandlet den til JSON og sendte den til Kafka:

{"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "optional": false}, "nyttelast": "bar"}

Og hvis vi tar en titt på mappen $ CONFLUENT_HOME, kan vi se at en fil test.sink.txt ble opprettet her:

cat $ CONFLUENT_HOME / test.sink.txt foo bar

Når vaskekoblingen trekker ut verdien fra nyttelast attributt og skriver det til destinasjonsfilen, dataene i test.sink.txt har innholdet i originalen test.txt fil.

La oss nå legge til flere linjer i test.txt.

Når vi gjør det, ser vi at kildekoblingen oppdager disse endringene automatisk.

Vi trenger bare å sørge for å sette inn en ny linje på slutten, ellers vil kildekontakten ikke vurdere den siste linjen.

På dette punktet, la oss stoppe Connect-prosessen, ettersom vi starter Connect i distribuert modus i noen få linjer.

6. Connect's REST API

Inntil nå gjorde vi alle konfigurasjoner ved å sende eiendomsfiler via kommandolinjen. Ettersom Connect er designet for å kjøre som en tjeneste, er det imidlertid også et REST API tilgjengelig.

Som standard er den tilgjengelig på // lokal vert: 8083. Noen endepunkter er:

  • FÅ / kontakter - returnerer en liste med alle kontaktene i bruk
  • GET / kontakter / {name} - returnerer detaljer om en bestemt kontakt
  • POST / kontakter - oppretter en ny kontakt; forespørselen skal være et JSON-objekt som inneholder et strengnavnfelt og et objektkonfigurasjonsfelt med koblingskonfigurasjonsparametere
  • GET / kontakter / {name} / status - returnerer den nåværende statusen for kontakten - inkludert hvis den kjører, mislyktes eller pauses - hvilken arbeider den er tildelt, feilinformasjon hvis den mislyktes, og tilstanden til alle oppgavene
  • SLETT / kontakter / {name} - sletter en kontakt, stopper elegant alle oppgaver og sletter konfigurasjonen
  • GET / connector-plugins - returnerer en liste over plugin-plugins som er installert i Kafka Connect-klyngen

Den offisielle dokumentasjonen gir en liste med alle sluttpunkter.

Vi bruker REST API for å opprette nye kontakter i følgende avsnitt.

7. Kafka Connect i distribuert modus

Den frittstående modusen fungerer perfekt for utvikling og testing, samt mindre oppsett. Imidlertid, hvis vi ønsker å gjøre full bruk av den distribuerte naturen til Kafka, må vi starte Connect i distribuert modus.

Ved å gjøre dette lagres koblingsinnstillinger og metadata i Kafka-emner i stedet for filsystemet. Som et resultat er arbeidernes noder virkelig statsløse.

7.1. Starter Connect

En referansekonfigurasjon for distribuert modus finner du på $ CONFLUENT_HOME/etc/kafka/connect-distributed.properties.

Parametrene er stort sett de samme som for frittstående modus. Det er bare noen få forskjeller:

  • group.id definerer navnet på klyngegruppen Connect. Verdien må være forskjellig fra enhver forbrukergruppe-ID
  • offset.storage.topic, config.storage.topic og status.storage.topic definere emner for disse innstillingene. For hvert emne kan vi også definere en replikasjonsfaktor

Igjen, den offisielle dokumentasjonen gir en liste med alle parametere.

Vi kan starte Connect i distribuert modus som følger:

$ CONFLUENT_HOME / bin / koble distribuert $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

7.2. Legge til kontakter ved hjelp av REST API

Nå, sammenlignet med den frittstående oppstartskommandoen, passerte vi ingen koblingskonfigurasjoner som argumenter. I stedet må vi opprette kontaktene ved hjelp av REST API.

For å sette opp eksemplet vårt fra før, må vi sende to POST-forespørsler til // localhost: 8083 / kontakter inneholder følgende JSON-strukturer.

Først må vi lage kroppen for kildekoblingen POST som en JSON-fil. Her vil vi kalle det connect-file-source.json:

{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "task.max": 1, "file": "test-distribution.txt", "topic ":" tilkoblingsdistribuert "}}

Legg merke til hvordan dette ser ganske ut som referansekonfigurasjonsfilen vi brukte første gang.

Og så POSTER vi det:

curl -d @ "$ CONFLUENT_HOME / connect-file-source.json" \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Deretter gjør vi det samme for vaskkontakten, og kaller filen connect-file-sink.json:

{"name": "local-file-sink", "config": {"connector.class": "FileStreamSink", "tasks.max": 1, "file": "test-distribution.sink.txt", "topics": "connect-distribution"}}

Og utfør POST som før:

curl -d @ $ CONFLUENT_HOME / connect-file-sink.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Om nødvendig kan vi bekrefte at dette oppsettet fungerer som det skal:

$ CONFLUENT_HOME / bin / kafka-console-consumer --bootstrap-server localhost: 9092 --topic connect-distribution --from-beginning {"schema": {"type": "string", "optional": false}, "nyttelast": "foo"} {"skjema": {"type": "streng", "valgfritt": falsk}, "nyttelast": "bar"}

Og hvis vi tar en titt på mappen $ CONFLUENT_HOME, kan vi se at en fil testdistribuert.sink.txt ble opprettet her:

cat $ CONFLUENT_HOME / test-distribution.sink.txt foo bar

Etter at vi har testet distribuert oppsett, la oss rydde opp ved å fjerne de to kontaktene:

krøll -X SLETT // localhost: 8083 / kontakter / lokal fil-kilde krøll -X SLETT // localhost: 8083 / kontakter / lokal fil-vask

8. Transformere data

8.1. Støttede transformasjoner

Transformasjoner gjør at vi kan gjøre enkle og lette modifikasjoner av individuelle meldinger.

Kafka Connect støtter følgende innebygde transformasjoner:

  • Sett inn felt - Legg til et felt ved hjelp av enten statiske data eller postmetadata
  • ReplaceField - Filtrer eller gi nytt navn til felt
  • MaskField - Erstatt et felt med den gyldige nullverdien for typen (null eller en tom streng, for eksempel)
  • HoistField - Pakk hele hendelsen som et enkelt felt inne i en struktur eller et kart
  • ExtractField - Trekk ut et bestemt felt fra struktur og kart, og ta bare med dette feltet i resultatene
  • SetSchemaMetadata - Endre skjemanavnet eller versjonen
  • TidsstempelRouter - Endre emnet til en plate basert på originaltema og tidsstempel
  • RegexRouter - Endre emnet for en plate basert på originaltema, en erstatningsstreng og et vanlig uttrykk

En transformasjon konfigureres ved hjelp av følgende parametere:

  • forvandler - En kommaseparert liste over aliaser for transformasjonene
  • transformerer. $ alias.type - Klassenavn for transformasjonen
  • transformerer. $ alias. $ transformationSpecificConfig - Konfigurasjon for den respektive transformasjonen

8.2. Bruke en transformator

For å teste noen transformasjonsfunksjoner, la oss sette opp følgende to transformasjoner:

  • La oss først pakke hele meldingen som en JSON-struktur
  • Etter det, la oss legge til et felt i den strukturen

Før vi bruker transformasjonene våre, må vi konfigurere Connect for å bruke skjemafri JSON, ved å endre connect-distributed.properties:

key.converter.schemas.enable = falsk verdi.converter.schemas.enable = false

Etter det må vi starte Connect på nytt, igjen i distribuert modus:

$ CONFLUENT_HOME / bin / koble distribuert $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

Igjen må vi opprette kroppen for kildekoblingen POST som en JSON-fil. Her vil vi kalle det connect-file-source-transform.json.

I tillegg til de allerede kjente parametrene, legger vi til noen få linjer for de to nødvendige transformasjonene:

{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "task.max": 1, "file": "test-transformation.txt", "topic ":" connect-transformation "," transforms ":" MakeMap, InsertSource "," transforms.MakeMap.type ":" org.apache.kafka.connect.transforms.HoistField $ Value "," transforms.MakeMap.field ": "line", "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField $ Value", "transforms.InsertSource.static.field": "data_source", "transforms.InsertSource.static.value ":" test-file-source "}}

Etter det, la oss utføre POST:

curl -d @ $ CONFLUENT_HOME / connect-file-source-transform.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

La oss skrive noen linjer til vår test-transformation.txt:

Foo Bar

Hvis vi nå inspiserer koble-transformasjon emne, bør vi få følgende linjer:

{"line": "Foo", "data_source": "test-file-source"} {"line": "Bar", "data_source": "test-file-source"}

9. Bruke klare kontakter

Etter å ha brukt disse enkle kontaktene, la oss ta en titt på mer avanserte ferdige kontakter, og hvordan du installerer dem.

9.1. Hvor finner du kontakter

Forhåndsbygde kontakter er tilgjengelige fra forskjellige kilder:

  • Noen få kontakter er pakket med vanlig Apache Kafka (kilde og vask for filer og konsoll)
  • Noen flere kontakter følger med Confluent Platform (ElasticSearch, HDFS, JDBC og AWS S3)
  • Ta også en titt på Confluent Hub, som er en slags appbutikk for Kafka-kontakter. Antall tilbudte kontakter vokser kontinuerlig:
    • Confluent-kontakter (utviklet, testet, dokumentert og støttes fullt ut av Confluent)
    • Sertifiserte kontakter (implementert av en tredjepart og sertifisert av Confluent)
    • Samfunnsutviklede og -støttede kontakter
  • Utover det gir Confluent også en koblingsside med noen kontakter som også er tilgjengelige på Confluent Hub, men også med noen flere felleskontakter.
  • Og til slutt er det også leverandører som leverer kontakter som en del av produktet. For eksempel tilbyr Landoop et streaming-bibliotek kalt Lenses, som også inneholder et sett med ~ 25 open source-kontakter (mange av dem krysslistes også andre steder)

9.2. Installere kontakter fra Confluent Hub

Foretaksversjonen av Confluent gir et skript for installering av kontakter og andre komponenter fra Confluent Hub (skriptet er ikke inkludert i Open Source-versjonen). Hvis vi bruker bedriftsversjonen, kan vi installere en kontakt ved hjelp av følgende kommando:

$ CONFLUENT_HOME / bin / confluent-hub install confluentinc / kafka-connect-mqtt: 1.0.0-preview

9.3. Installere kontakter manuelt

Hvis vi trenger en kontakt, som ikke er tilgjengelig på Confluent Hub, eller hvis vi har Open Source-versjonen av Confluent, kan vi installere de nødvendige kontaktene manuelt. For det må vi laste ned og pakke ut kontakten, samt flytte de medfølgende libs til mappen spesifisert som plugin.path.

For hver kontakt skal arkivet inneholde to mapper som er interessante for oss:

  • De lib mappen inneholder koblingsburken, for eksempel kafka-connect-mqtt-1.0.0-preview.jar, samt noen flere krukker som kreves av kontakten
  • De etc mappen inneholder en eller flere referansekonfigurasjonsfiler

Vi må flytte lib mappen til $ CONFLUENT_HOME / del / java, eller hvilken vei vi spesifiserte som plugin.path i connect-standalone.properties og connect-distributed.properties. Ved å gjøre dette kan det også være fornuftig å gi nytt navn til mappen til noe meningsfylt.

Vi kan bruke konfigurasjonsfilene fra etc enten ved å referere til dem mens du starter i frittstående modus, eller så kan vi bare ta tak i egenskapene og lage en JSON-fil fra dem.

10. Konklusjon

I denne opplæringen så vi på hvordan du installerer og bruker Kafka Connect.

Vi så på typer kontakter, både kilde og vask. Vi så også på noen funksjoner og moduser som Connect kan kjøre i. Deretter gjennomgikk vi transformatorer. Og til slutt lærte vi hvor vi skulle få tak i og hvordan du installerer tilpassede kontakter.

Som alltid kan konfigurasjonsfilene finnes på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found