Bygg en datarørledning med Flink og Kafka

1. Oversikt

Apache Flink er et rammeverk for behandling av strøm som enkelt kan brukes med Java. Apache Kafka er et distribuert strømbehandlingssystem som støtter høy feiltoleranse.

I denne opplæringen skal vi se på hvordan du bygger en datarørledning ved hjelp av de to teknologiene.

2. Installasjon

For å installere og konfigurere Apache Kafka, se den offisielle guiden. Etter installasjon kan vi bruke følgende kommandoer til å lage de nye emnene som kalles flink_inngang og flink_output:

 bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replication-factor 1 - partitions 1 \ --topic flink_output bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replikasjonsfaktor 1 - partisjoner 1 \ --tema flink_inngang

Av hensyn til denne opplæringen bruker vi standardkonfigurasjon og standardporter for Apache Kafka.

3. Bruk av flink

Apache Flink tillater sanntids strømbehandlingsteknologi. Rammeverket tillater bruk av flere tredjepartssystemer som strømkilder eller vasker.

I Flink - det er forskjellige kontakter tilgjengelig:

  • Apache Kafka (kilde / vask)
  • Apache Cassandra (vask)
  • Amazon Kinesis Streams (kilde / vask)
  • Elasticsearch (vask)
  • Hadoop FileSystem (vask)
  • RabbitMQ (kilde / vask)
  • Apache NiFi (kilde / vask)
  • API for Twitter-streaming (kilde)

For å legge Flink til i prosjektet vårt, må vi inkludere følgende Maven-avhengigheter:

 org.apache.flink flink-core 1.5.0 org.apache.flink flink-connector-kafka-0.11_2.11 1.5.0 

Hvis vi legger til disse avhengighetene, kan vi konsumere og produsere til og fra Kafka-emner. Du finner den nåværende versjonen av Flink på Maven Central.

4. Kafka strengforbruker

For å konsumere data fra Kafka med Flink må vi oppgi et emne og en Kafka-adresse. Vi bør også oppgi en gruppe-ID som skal brukes til å holde forskyvninger, slik at vi ikke alltid vil lese hele dataene fra begynnelsen.

La oss lage en statisk metode som vil gjøre opprettelsen av FlinkKafkaConsumer lettere:

offentlig statisk FlinkKafkaConsumer011 createStringConsumerForTopic (Strengemne, String kafkaAddress, String kafkaGroup) {Properties rekvisitter = nye egenskaper (); props.setProperty ("bootstrap.servers", kafkaAddress); props.setProperty ("group.id", kafkaGroup); FlinkKafkaConsumer011 forbruker = ny FlinkKafkaConsumer011 (emne, ny SimpleStringSchema (), rekvisitter); retur forbruker; }

Denne metoden tar en emne, kafkaAdresse, og kafkaGruppe og skaper FlinkKafkaConsumer som vil konsumere data fra gitt emne som en String siden vi har brukt SimpleStringSchema for å dekode data.

Antallet 011 i navnet på klassen refererer til Kafka-versjonen.

5. Kafka String Producer

For å produsere data til Kafka, må vi oppgi Kafka-adresse og emne som vi vil bruke. Igjen kan vi lage en statisk metode som vil hjelpe oss å lage produsenter for forskjellige emner:

offentlig statisk FlinkKafkaProducer011 createStringProducer (strengemne, streng kafkaAddress) {returner ny FlinkKafkaProducer011 (kafkaAddress, emne, ny SimpleStringSchema ()); }

Denne metoden tar bare emne og kafkaAdresse som argumenter siden det ikke er behov for å oppgi gruppe-ID når vi produserer til Kafka-emnet.

6. String Stream Processing

Når vi har en fullt fungerende forbruker og produsent, kan vi prøve å behandle data fra Kafka og deretter lagre resultatene våre tilbake til Kafka. Den komplette listen over funksjoner som kan brukes til strømbehandling finner du her.

I dette eksemplet skal vi kapitalisere ord i hver Kafka-oppføring og deretter skrive den tilbake til Kafka.

For dette formålet må vi lage en tilpasset MapFunction:

offentlig klasse WordsCapitalizer implementerer MapFunction {@Override public String map (String s) {return s.toUpperCase (); }}

Etter å ha opprettet funksjonen, kan vi bruke den i strømbehandling:

offentlig statisk tomrom kapitalisere () {String inputTopic = "flink_input"; Streng outputTopic = "flink_output"; String consumerGroup = "baeldung"; Strengadresse = "localhost: 9092"; StreamExecutionEnvironment-miljø = StreamExecutionEnvironment .getExecutionEnvironment (); FlinkKafkaConsumer011 flinkKafkaConsumer = createStringConsumerForTopic (inputTopic, address, consumerGroup); DataStream stringInputStream = miljø .addSource (flinkKafkaConsumer); FlinkKafkaProducer011 flinkKafkaProducer = createStringProducer (outputTopic, adresse); stringInputStream .map (new WordsCapitalizer ()) .addSink (flinkKafkaProducer); }

Søknaden vil lese data fra flink_inngang emne, utfør operasjoner på strømmen og lagre deretter resultatene i flink_output tema i Kafka.

Vi har sett hvordan vi skal håndtere strenger ved hjelp av Flink og Kafka. Men ofte er det nødvendig å utføre operasjoner på tilpassede objekter. Vi får se hvordan du gjør dette i de neste kapitlene.

7. Deserialisering av egendefinert objekt

Følgende klasse representerer en enkel melding med informasjon om avsender og mottaker:

@JsonSerialize offentlig klasse InputMessage {String avsender; Stringmottaker; LocalDateTime sentAt; Strengmelding; }

Tidligere brukte vi SimpleStringSchema å deserialisere meldinger fra Kafka, men nå vi ønsker å deserialisere data direkte til egendefinerte objekter.

For å gjøre dette trenger vi en tilpasset Deserialisering Skjema:

offentlig klasse InputMessageDeserializationSchema implementerer DeserializationSchema {statisk ObjectMapper objectMapper = ny ObjectMapper () .registerModule (ny JavaTimeModule ()); @ Override public InputMessage deserialize (byte [] bytes) kaster IOException {return objectMapper.readValue (bytes, InputMessage.class); } @Override public boolean isEndOfStream (InputMessage inputMessage) {return false; } @Override public TypeInformation getProducedType () {return TypeInformation.of (InputMessage.class); }}

Vi antar her at meldingene holdes som JSON i Kafka.

Siden vi har et felt av typen LocalDateTime, må vi spesifisere JavaTimeModule, som tar seg av kartleggingen LocalDateTime motsetter seg JSON.

Flink-skjemaer kan ikke ha felt som ikke kan serienummeres fordi alle operatører (som skjemaer eller funksjoner) serialiseres i begynnelsen av jobben.

Det er lignende problemer i Apache Spark. En av de kjente løsningene for dette problemet er å initialisere felt som statisk, som vi gjorde med ObjectMapper ovenfor. Det er ikke den peneste løsningen, men det er relativt enkelt og gjør jobben.

Metoden isEndOfStream kan brukes til det spesielle tilfellet når stream bare skal behandles til noen spesifikke data er mottatt. Men det er ikke nødvendig i vårt tilfelle.

8. Tilpasset objektserialisering

La oss anta at vi vil at systemet vårt skal ha en mulighet til å lage en sikkerhetskopi av meldinger. Vi ønsker at prosessen skal være automatisk, og hver sikkerhetskopi skal være sammensatt av meldinger sendt i løpet av en hel dag.

En backup-melding skal også ha en unik ID.

For dette formålet kan vi opprette følgende klasse:

offentlig klasse Backup {@JsonProperty ("inputMessages") Liste inputMessages; @JsonProperty ("backupTimestamp") LocalDateTime backupTimestamp; @JsonProperty ("uuid") UUID uuid; public Backup (List inputMessages, LocalDateTime backupTimestamp) {this.inputMessages = inputMessages; this.backupTimestamp = backupTimestamp; this.uuid = UUID.randomUUID (); }}

Vær oppmerksom på at UUID-genereringsmekanismen ikke er perfekt, da den tillater duplikater. Dette er imidlertid nok for omfanget av dette eksemplet.

Vi vil redde våre Sikkerhetskopiering objekt som JSON mot Kafka, så vi trenger å lage vårt Serialisering Schema:

offentlig klasse BackupSerializationSchema implementerer SerializationSchema {ObjectMapper objectMapper; Logger logger = LoggerFactory.getLogger (BackupSerializationSchema.class); @ Overstyr offentlig byte [] serialiser (Backup backupMessage) {if (objectMapper == null) {objectMapper = new ObjectMapper () .registerModule (new JavaTimeModule ()); } prøv {return objectMapper.writeValueAsString (backupMessage) .getBytes (); } fangst (com.fasterxml.jackson.core.JsonProcessingException e) {logger.error ("Kunne ikke parsere JSON", e); } returner ny byte [0]; }}

9. Tidsstempelmeldinger

Siden vi ønsker å lage en sikkerhetskopi for alle meldinger hver dag, trenger meldinger en tidsstempel.

Flink gir de tre forskjellige tidskarakteristikkene EventTime, ProcessingTime, og Svelgingstid.

I vårt tilfelle må vi bruke tidspunktet da meldingen ble sendt, så vi bruker det EventTime.

Å bruke EventTimevi trenger en TidsstempelTildeler som trekker ut tidsstempler fra inngangsdataene våre:

offentlig klasse InputMessageTimestampAssigner implementerer AssignerWithPunctuatedWatermarks {@Override public long extractTimestamp (InputMessage element, long previousElementTimestamp) {ZoneId zoneId = ZoneId.systemDefault (); return element.getSentAt (). atZone (zoneId) .toEpochSecond () * 1000; } @Nullable @Override offentlig vannmerke checkAndGetNextWatermark (InputMessage lastElement, long extractedTimestamp) {returner nytt vannmerke (extractedTimestamp - 1500); }}

Vi må forandre vår LocalDateTime til EpochSecond da dette er formatet som Flink forventer. Etter å ha tildelt tidsstempler, vil alle tidsbaserte operasjoner bruke tid fra sentAt felt å operere.

Siden Flink forventer at tidsstemplene skal være i millisekunder og toEpochSecond () returnerer tiden i sekunder vi trengte for å multiplisere den med 1000, så Flink vil opprette vinduer riktig.

Flink definerer begrepet a Vannmerke. Vannmerker er nyttige i tilfelle data som ikke kommer i den rekkefølgen de ble sendt. Et vannmerke definerer den maksimale forsinkelsen som er tillatt for elementene som skal behandles.

Elementer som har tidsstempler lavere enn vannmerket, blir ikke behandlet i det hele tatt.

10. Opprette Time Windows

For å sikre at sikkerhetskopien vår bare samler meldinger sendt i løpet av en dag, kan vi bruke timeWindowAll metode på strømmen, som vil dele meldinger i windows.

Vi må imidlertid fortsatt samle meldinger fra hvert vindu og returnere dem som Sikkerhetskopiering.

For å gjøre dette trenger vi en tilpasset AggregateFunction:

offentlig klasse BackupAggregator implementerer AggregateFunction {@Override public List createAccumulator () {return new ArrayList (); } @ Override public List add (InputMessage inputMessage, List inputMessages) {inputMessages.add (inputMessage); return inputMessages; } @ Override public Backup getResult (List inputMessages) {return new Backup (inputMessages, LocalDateTime.now ()); } @Override public List merge (List inputMessages, List acc1) {inputMessages.addAll (acc1); return inputMessages; }}

11. Aggregerende sikkerhetskopier

Etter å ha tilordnet riktige tidsstempler og implementert våre AggregateFunction, kan vi endelig ta Kafka-innspillene våre og behandle det:

offentlig statisk tomrom createBackup () kaster Unntak {String inputTopic = "flink_input"; Streng outputTopic = "flink_output"; String consumerGroup = "baeldung"; String kafkaAddress = "192.168.99.100:9092"; StreamExecutionEnvironment-miljø = StreamExecutionEnvironment.getExecutionEnvironment (); miljø.setStreamTimeCharacteristic (TimeCharacteristic.EventTime); FlinkKafkaConsumer011 flinkKafkaConsumer = createInputMessageConsumer (inputTopic, kafkaAddress, consumerGroup); flinkKafkaConsumer.setStartFromEarliest (); flinkKafkaConsumer.assignTimestampsAndWatermarks (new InputMessageTimestampAssigner ()); FlinkKafkaProducer011 flinkKafkaProducer = createBackupProducer (outputTopic, kafkaAddress); DataStream inputMessagesStream = environment.addSource (flinkKafkaConsumer); inputMessagesStream .timeWindowAll (Time.hours (24)) .aggregate (new BackupAggregator ()) .addSink (flinkKafkaProducer); miljø. utfør (); }

12. Konklusjon

I denne artikkelen har vi presentert hvordan du lager en enkel datarørledning med Apache Flink og Apache Kafka.

Som alltid kan koden finnes på Github.


$config[zx-auto] not found$config[zx-overlay] not found