Bygg en datarørledning med Kafka, Spark Streaming og Cassandra

1. Oversikt

Apache Kafka er en skalerbar plattform med høy ytelse og lav latens tillater lesing og skriving av datastrømmer som et meldingssystem. Vi kan starte med Kafka i Java ganske enkelt.

Spark Streaming er en del av Apache Spark-plattformen som muliggjør skalerbar, høy gjennomstrømning, feiltolerant behandling av datastrømmer. Selv om det er skrevet i Scala, tilbyr Spark Java APIer å jobbe med.

Apache Cassandra er en distribuert og bred kolonn NoSQL datalager. Mer informasjon om Cassandra er tilgjengelig i vår forrige artikkel.

I denne opplæringen kombinerer vi disse for å lage en svært skalerbar og feiltolerant datarørledning for en sanntids datastrøm.

2. Installasjoner

For å starte, trenger vi Kafka, Spark og Cassandra installert lokalt på maskinen vår for å kjøre applikasjonen. Vi får se hvordan vi utvikler en datarørledning ved hjelp av disse plattformene mens vi går videre.

Imidlertid forlater vi alle standardkonfigurasjoner inkludert porter for alle installasjoner som vil hjelpe deg med å få opplæringen til å kjøre problemfritt.

2.1. Kafka

Installering av Kafka på vår lokale maskin er ganske grei og kan bli funnet som en del av den offisielle dokumentasjonen. Vi bruker 2.1.0-utgivelsen av Kafka.

I tillegg, Kafka krever at Apache Zookeeper kjører men for formålet med denne opplæringen, vil vi utnytte den eneste noden Zookeeper-forekomst pakket med Kafka.

Når vi har klart å starte Zookeeper og Kafka lokalt etter den offisielle guiden, kan vi fortsette å lage emnet vårt, kalt "meldinger":

 $ KAFKA_HOME $ \ bin \ windows \ kafka-topics.bat --create \ --zookeeper localhost: 2181 \ --replikasjonsfaktor 1 - partisjoner 1 \ - emnebeskjeder

Merk at skriptet ovenfor er for Windows-plattformen, men det er lignende skript tilgjengelig for Unix-lignende plattformer også.

2.2. Gnist

Spark bruker Hadoops klientbiblioteker for HDFS og Garn. Følgelig det kan være veldig vanskelig å montere de kompatible versjonene av alle disse. Imidlertid kommer den offisielle nedlastingen av Spark ferdigpakket med populære versjoner av Hadoop. For denne opplæringen bruker vi versjon 2.3.0-pakken “forhåndsbygd for Apache Hadoop 2.7 og nyere”.

Når den riktige pakken med Spark er pakket ut, kan de tilgjengelige skriptene brukes til å sende inn søknader. Vi får se dette senere når vi utvikler applikasjonen vår i Spring Boot.

2.3. Cassandra

DataStax gjør tilgjengelig en community-utgave av Cassandra for forskjellige plattformer, inkludert Windows. Vi kan laste ned og installere dette på vår lokale maskin veldig enkelt etter den offisielle dokumentasjonen. Vi bruker versjon 3.9.0.

Når vi har klart å installere og starte Cassandra på vår lokale maskin, kan vi fortsette å lage nøkkelområdet og bordet vårt. Dette kan gjøres ved hjelp av CQL Shell som følger med vår installasjon:

CREATE KEYSPACE vocabulary WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}; BRUK ordforråd; OPPRETT TABELLord (ordtekst PRIMÆR NØKKEL, telle int);

Merk at vi har opprettet et navneområde som heter ordforråd og et bord der heter ord med to kolonner, ord, og telle.

3. Avhengigheter

Vi kan integrere Kafka- og Spark-avhengigheter i applikasjonen vår gjennom Maven. Vi trekker disse avhengighetene fra Maven Central:

  • Core Spark
  • SQL Spark
  • Streaming Spark
  • Streaming Kafka Spark
  • Cassandra Spark
  • Cassandra Java Spark

Og vi kan legge dem til vår pom tilsvarende:

 org.apache.spark spark-core_2.11 2.3.0 gitt org.apache.spark spark-sql_2.11 2.3.0 gitt org.apache.spark gnist-streaming_2.11 2.3.0 gitt org.apache.spark gnist-streaming -kafka-0-10_2.11 2.3.0 com.datastax.spark gnist-cassandra-connector_2.11 2.3.0 com.datastax.spark gnist-cassandra-connector-java_2.11 1.5.2 

Merk at noen avhengighetene er merket som sørget for i sikte. Dette er fordi disse vil bli gjort tilgjengelig av Spark-installasjonen der vi sender inn søknaden om utførelse ved hjelp av gnist-send.

4. Spark Streaming - Kafka Integration Strategies

På dette tidspunktet er det verdt å snakke kort om integrasjonsstrategiene for Spark og Kafka.

Kafka introduserte nytt forbruker-API mellom versjoner 0,8 og 0,10. Derfor er de tilsvarende Spark Streaming-pakkene tilgjengelige for begge meglerversjonene. Det er viktig å velge riktig pakke, avhengig av megler tilgjengelig og funksjoner som ønskes.

4.1. Spark Streaming Kafka 0.8

0.8-versjonen er stabil integrasjons-API med muligheter for å bruke mottakerbasert eller direkte tilnærming. Vi går ikke inn på detaljene i disse tilnærmingene som vi finner i den offisielle dokumentasjonen. Et viktig poeng å merke seg her er at denne pakken er kompatibel med Kafka Broker-versjoner 0.8.2.1 eller nyere.

4.2. Spark Streaming Kafka 0.10

Dette er for øyeblikket i en eksperimentell tilstand og er bare kompatibel med Kafka Broker-versjoner 0.10.0 eller høyere. Denne pakken tilbyr bare Direct Approach, og bruker nå den nye Kafka forbruker-APIen. Vi finner mer detaljer om dette i den offisielle dokumentasjonen. Viktigere er det ikke bakoverkompatibel med eldre Kafka Broker-versjoner.

Vær oppmerksom på at for denne opplæringen, bruker vi 0.10-pakken. Avhengigheten nevnt i forrige avsnitt refererer bare til dette.

5. Utvikling av en datarørledning

Vi lager en enkel applikasjon i Java ved hjelp av Spark som vil integreres med Kafka-emnet vi opprettet tidligere. Søknaden vil lese meldingene som lagt ut og telle ordfrekvensen i hver melding. Dette vil deretter bli oppdatert i Cassandra-tabellen vi opprettet tidligere.

La oss raskt visualisere hvordan dataene vil flyte:

5.1. Får JavaStreamingContext

For det første begynner vi med å initialisere JavaStreamingContext som er inngangspunktet for alle Spark Streaming-applikasjoner:

SparkConf sparkConf = ny SparkConf (); sparkConf.setAppName ("WordCountingApp"); sparkConf.set ("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = ny JavaStreamingContext (sparkConf, Durations.seconds (1));

5.2. Får DStream fra Kafka

Nå kan vi koble til Kafka-emnet fra JavaStreamingContext:

Kart kafkaParams = nytt HashMap (); kafkaParams.put ("bootstrap.servers", "localhost: 9092"); kafkaParams.put ("key.deserializer", StringDeserializer.class); kafkaParams.put ("value.deserializer", StringDeserializer.class); kafkaParams.put ("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put ("auto.offset.reset", "siste"); kafkaParams.put ("enable.auto.commit", false); Samlingstemaer = Arrays.asList ("meldinger"); JavaInputDStream meldinger = KafkaUtils.createDirectStream (streamingContext, LocationStrategies.PreferConsistent (), ConsumerStrategies. Abonner (emner, kafkaParams));

Vær oppmerksom på at vi må tilby deserializers for nøkkel og verdi her. For vanlige datatyper som String, deserializer er tilgjengelig som standard. Imidlertid, hvis vi ønsker å hente tilpassede datatyper, må vi tilby tilpassede deserializers.

Her har vi fått JavaInputDStream som er en implementering av Discretized Streams eller DStreams, den grunnleggende abstraksjonen fra Spark Streaming. Internt er DStreams ingenting annet enn en kontinuerlig serie med RDD.

5.3. Behandling oppnådd DStream

Vi skal nå utføre en rekke operasjoner på JavaInputDStream for å få ordfrekvenser i meldingene:

JavaPairDStream-resultater = meldinger .mapToPair (post -> ny Tuple2 (record.key (), record.value ())); JavaDStream-linjer = resultater. Kart (tuple2 -> tuple2._2 ()); JavaDStream-ord = linjer .flatMap (x -> Arrays.asList (x.split ("\ s +")). Iterator ()); JavaPairDStream wordCounts = ord .mapToPair (s -> ny Tuple2 (s, 1)) .reduceByKey ((i1, i2) -> i1 + i2);

5.4. Vedvarende behandlet DStream inn i Cassandra

Endelig kan vi gjenta det behandlede JavaPairDStream å sette dem inn i Cassandra-bordet vårt:

wordCounts.foreachRDD (javaRdd -> {Map wordCountMap = javaRdd.collectAsMap (); for (String key: wordCountMap.keySet ()) {List wordList = Arrays.asList (new Word (key, wordCountMap.get (key))); JavaRDD rdd = streamingContext.sparkContext (). Parallellisere (wordList); javaFunctions (rdd) .writerBuilder ("vokabular", "ord", mapToRow (Word.class)). SaveToCassandra ();}});

5.5. Kjører applikasjonen

Siden dette er en strømbehandlingsapplikasjon, vil vi holde dette i gang:

streamingContext.start (); streamingContext.awaitTermination ();

6. Utnytte kontrollpunkter

I en strømbehandlingsapplikasjon, Det er ofte nyttig å beholde tilstanden mellom databaser som behandles.

For eksempel, i vårt forrige forsøk, er vi bare i stand til å lagre gjeldende frekvens av ordene. Hva om vi vil lagre den kumulative frekvensen i stedet? Spark Streaming gjør det mulig gjennom et konsept som kalles sjekkpunkter.

Vi endrer nå rørledningen vi opprettet tidligere for å utnytte sjekkpunkter:

Vær oppmerksom på at vi bare bruker sjekkpunkter for økten med databehandling. Dette gir ikke feiltoleranse. Imidlertid kan kontrollpunkting også brukes for feiltoleranse.

Det er noen endringer vi må gjøre i søknaden vår for å utnytte sjekkpunkter. Dette inkluderer å tilby JavaStreamingContext med en sjekkpunktplassering:

streamingContext.checkpoint ("./. sjekkpunkt");

Her bruker vi det lokale filsystemet til å lagre sjekkpunkter. For robusthet bør dette imidlertid lagres på et sted som HDFS, S3 eller Kafka. Mer om dette er tilgjengelig i den offisielle dokumentasjonen.

Deretter må vi hente sjekkpunktet og opprette et kumulativt antall ord mens vi behandler hver partisjon ved hjelp av en kartleggingsfunksjon:

JavaMapWithStateDStream cumulativeWordCounts = wordCounts .mapWithState (StateSpec.function ((word, one, state) -> {int sum = one.orElse (0) + (state.exists ()? state.get (): 0); Tuple2 output = new Tuple2 (ord, sum); state.update (sum); returutgang;}));

Når vi får de kumulative ordtellingene, kan vi fortsette med å gjenta og lagre dem i Cassandra som før.

Vær oppmerksom på at mens datakontroll er nyttig for stateful behandling, det kommer med en latens kostnad. Derfor er det nødvendig å bruke dette med omhu sammen med et optimalt kontrollpunktintervall.

7. Forstå forskyvninger

Hvis vi husker noen av Kafka-parametrene vi satte tidligere:

kafkaParams.put ("auto.offset.reset", "siste"); kafkaParams.put ("enable.auto.commit", false);

Disse betyr i utgangspunktet det Vi ønsker ikke å automatisk forplikte for forskyvningen og ønsker å velge den siste forskyvningen hver gang en forbrukergruppe initialiseres. Derfor vil søknaden vår bare kunne konsumere meldinger som er lagt ut i løpet av den perioden den kjører.

Hvis vi ønsker å konsumere alle meldinger som er lagt ut, uavhengig av om applikasjonen kjører eller ikke, og også vil holde rede på meldingene som allerede er lagt ut, vi må konfigurere forskyvningen riktig sammen med å lagre forskyvningstilstanden, selv om dette er litt utenfor omfanget for denne opplæringen.

Dette er også en måte som Spark Streaming tilbyr et bestemt nivå av garanti som "nøyaktig en gang". Dette betyr i utgangspunktet at hver melding som legges ut på Kafka-emnet, bare vil bli behandlet nøyaktig en gang av Spark Streaming.

8. Implementering av applikasjon

Vi kan distribuere applikasjonen vår ved hjelp av Spark-send-skriptet som leveres ferdigpakket med Spark-installasjonen:

$ SPARK_HOME $ \ bin \ spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local [2] \ target \ spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies .krukke

Vær oppmerksom på at krukken vi lager med Maven skal inneholde avhengighetene som ikke er merket som sørget for i sikte.

Når vi har sendt inn denne søknaden og lagt ut noen meldinger i Kafka-emnet vi opprettet tidligere, bør vi se de kumulative ordtellingene blir lagt ut i Cassandra-tabellen vi opprettet tidligere.

9. Konklusjon

For å oppsummere lærte vi i denne opplæringen hvordan du lager en enkel datarørledning ved hjelp av Kafka, Spark Streaming og Cassandra. Vi lærte også å utnytte sjekkpunkter i Spark Streaming for å opprettholde tilstanden mellom batchene.

Som alltid er koden for eksemplene tilgjengelig på GitHub.