Introduksjon til Apache Flink med Java

1. Oversikt

Apache Flink er et Big Data-behandlingsrammeverk som lar programmerere behandle den store mengden data på en veldig effektiv og skalerbar måte.

I denne artikkelen vil vi introdusere noen av kjerne API-konsepter og standard datatransformasjoner tilgjengelig i Apache Flink Java API. Den flytende stilen til denne API-en gjør det enkelt å jobbe med Flinks sentrale konstruksjon - den distribuerte samlingen.

Først vil vi ta en titt på Flinks Datasett API-transformasjoner og bruk dem til å implementere et ordtellingsprogram. Så vil vi ta en kort titt på Flinks Data strøm API, som lar deg behandle strømmer av hendelser i sanntid.

2. Maven avhengighet

For å komme i gang må vi legge til Maven-avhengigheter til flink-java og flink-test-verktøy biblioteker:

 org.apache.flink flink-java 1.2.0 org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. Core API-konsepter

Når vi jobber med Flink, må vi vite noen ting relatert til API-en:

  • Hvert Flink-program utfører transformasjoner på distribuerte datasamlinger. Det tilbys en rekke funksjoner for transformering av data, inkludert filtrering, kartlegging, sammenføyning, gruppering og aggregering
  • EN synke operasjon i Flink utløser kjøringen av en strøm for å gi ønsket resultat av programmet, for eksempel å lagre resultatet i filsystemet eller skrive det ut til standard utdata
  • Flinktransformasjoner er lat, noe som betyr at de ikke blir utført før en synke operasjonen påberopes
  • Apache Flink API støtter to operasjonsmåter - batch og sanntid. Hvis du har å gjøre med en begrenset datakilde som kan behandles i batch-modus, vil du bruke Datasett API. Hvis du vil behandle ubegrensede datastrømmer i sanntid, må du bruke Data strøm API

4. DataSet API-transformasjoner

Inngangspunktet til Flink-programmet er en forekomst av Utførelse Miljø klasse - dette definerer konteksten der et program kjøres.

La oss lage en Utførelse Miljø for å starte behandlingen:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment ();

Merk at når du starter applikasjonen på den lokale maskinen, vil den utføre behandling på den lokale JVM. Hvis du vil begynne å behandle på en klynge av maskiner, må du installere Apache Flink på disse maskinene og konfigurere Utførelse Miljø tilsvarende.

4.1. Opprette et datasett

For å begynne å utføre datatransformasjoner, må vi forsyne programmet vårt med dataene.

La oss lage en forekomst av Datasett klasse ved hjelp av vår Utførelsesmiljø:

DataSet beløp = env.fromElements (1, 29, 40, 50);

Du kan opprette en Datasett fra flere kilder, for eksempel Apache Kafka, en CSV, fil eller praktisk talt hvilken som helst annen datakilde.

4.2. Filtrer og reduser

Når du oppretter en forekomst av Datasett klasse, kan du bruke transformasjoner på den.

La oss si at du vil filtrere tall som er over en viss terskel og deretter summere dem alle. Du kan bruke filter() og redusere() transformasjoner for å oppnå dette:

int terskel = 30; Liste samle = beløp .filter (a -> a> terskel). Redusere ((heltall, t1) -> heltall + t1). Samle (); assertThat (collect.get (0)). isEqualTo (90); 

Merk at samle inn() metoden er en synke operasjon som utløser de faktiske datatransformasjonene.

4.3. Kart

La oss si at du har en Datasett av Person gjenstander:

privat statisk klasse Person {privat int alder; privat strengnavn; // standard konstruktører / getters / setters}

La oss deretter lage en Datasett av disse gjenstandene:

DataSet personDataSource = env.fromCollection (Arrays.asList (ny person (23, "Tom"), ny person (75, "Michael")));

Anta at du bare vil trekke ut alder felt fra hvert objekt i samlingen. Du kan bruke kart() transformasjon for å få bare et bestemt felt av Person klasse:

Liste aldre = personDataSource .map (p -> p.age) .collect (); assertThat (aldre) .hasSize (2); hevder at (aldre). inneholder (23, 75);

4.4. Bli med

Når du har to datasett, kan det være lurt å bli med dem på noen id felt. For dette kan du bruke bli med() transformasjon.

La oss lage samlinger av transaksjoner og adresser til en bruker:

Tuple3-adresse = ny Tuple3 (1, "5th Avenue", "London"); Datasett adresser = env.fromElements (adresse); Tuple2 firstTransaction = ny Tuple2 (1, "Transaction_1"); Datasett transaksjoner = env.fromElements (firstTransaction, new Tuple2 (12, "Transaction_2")); 

Det første feltet i begge tuplene er av en Heltall type, og dette er en id felt som vi ønsker å bli med på begge datasettene på.

For å utføre den faktiske tilknytningslogikken, må vi implementere en KeySelector grensesnitt for adresse og transaksjon:

privat statisk klasse IdKeySelectorTransaction implementerer KeySelector {@Override public Integer getKey (Tuple2 value) {returverdi.f0; }} privat statisk klasse IdKeySelectorAddress implementerer KeySelector {@Override public Integer getKey (Tuple3 value) {returverdi.f0; }}

Hver velger returnerer bare feltet som sammenføyningen skal utføres på.

Dessverre er det ikke mulig å bruke lambdauttrykk her fordi Flink trenger generisk typeinformasjon.

Deretter la oss implementere sammenslåingslogikk ved hjelp av disse velgerne:

Liste<>> sammenføyde = transaksjoner.forenes (adresser). hvor (ny IdKeySelectorTransaction ()) .equalTo (ny IdKeySelectorAddress ()) .collect (); assertThat (joined) .hasSize (1); assertThat (joined) .contains (new Tuple2 (firstTransaction, address)); 

4.5. Sortere

La oss si at du har følgende samling av Tuple2:

Tuple2 secondPerson = ny Tuple2 (4, "Tom"); Tuple2 thirdPerson = ny Tuple2 (5, "Scott"); Tuple2 quarterPerson = ny Tuple2 (200, "Michael"); Tuple2 firstPerson = ny Tuple2 (1, "Jack"); Datasett transaksjoner = env.fromElements (quarterPerson, secondPerson, thirdPerson, firstPerson); 

Hvis du vil sortere denne samlingen etter det første feltet i tupelen, kan du bruke sortPartitions () transformasjon:

Liste sortert = transaksjoner .sortPartition (ny IdKeySelectorTransaction (), Order.ASCENDING) .collect (); assertThat (sortert) .containsExactly (firstPerson, secondPerson, thirdPerson, 4thPerson);

5. Antall ord

Ordtellingsproblemet er et som ofte brukes til å vise frem funksjonene til Big Data-behandlingsrammer. Den grunnleggende løsningen innebærer å telle ordforekomster i en tekstinntasting. La oss bruke Flink til å implementere en løsning på dette problemet.

Som det første trinnet i løsningen, lager vi en LineSplitter klasse som deler innspillene våre i tokens (ord), som samler for hvert token a Tuple2 av nøkkelverdipar. I hver av disse tuplene er nøkkelen et ord som finnes i teksten, og verdien er heltallet (1).

Denne klassen implementerer FlatMapFunction grensesnitt som tar String som innspill og produserer en Tuple2:

offentlig klasse LineSplitter implementerer FlatMapFunction {@Override public void flatMap (Strengverdi, samler ut) {Stream.of (value.toLowerCase (). split ("\ W +")). filter (t -> t.length ()> 0) .forEach (token -> out.collect (new Tuple2 (token) , 1))); }}

Vi kaller samle inn() metoden på Samler klasse for å skyve data fremover i prosessrørledningen.

Vårt neste og siste trinn er å gruppere tuplene etter deres første elementer (ord) og deretter utføre en sum samlet på de andre elementene for å produsere en telling av ordforekomster:

offentlig statisk datasett startWordCount (ExecutionEnvironment env, List lines) kaster Unntak {DataSet text = env.fromCollection (linjer); return text.flatMap (new LineSplitter ()) .groupBy (0) .aggregate (Aggregations.SUM, 1); }

Vi bruker tre typer Flink-transformasjoner: flatMap (), gruppe av(), og aggregat ().

La oss skrive en test for å hevde at implementeringen av ordtelling fungerer som forventet:

Liste linjer = Arrays.asList ("Dette er en første setning", "Dette er en annen setning med ett ord"); Datasett resultat = WordCount.startWordCount (env, linjer); Liste samle = resultat. samle (); assertThat (collect) .containsExactlyInAnyOrder (ny Tuple2 ("a", 3), ny Tuple2 ("setning", 2), ny Tuple2 ("ord", 1), ny Tuple2 ("er", 2), ny Tuple2 ( "denne", 2), nye Tuple2 ("andre", 1), nye Tuple2 ("første", 1), nye Tuple2 ("med", 1), nye Tuple2 ("en", 1));

6. DataStream API

6.1. Opprette en DataStream

Apache Flink støtter også behandling av strømmer av hendelser gjennom sin DataStream API. Hvis vi vil begynne å konsumere hendelser, må vi først bruke StreamExecutionEnvironment klasse:

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment ();

Deretter kan vi lage en strøm av hendelser ved hjelp av utførelse Miljø fra en rekke kilder. Det kan være en beskjedbuss som Apache Kafka, men i dette eksemplet vil vi ganske enkelt opprette en kilde fra et par strengelementer:

DataStream dataStream = executionEnvironment.fromElements ("Dette er en første setning", "Dette er en annen setning med ett ord");

Vi kan bruke transformasjoner på hvert element i Data strøm som i det normale Datasett klasse:

SingleOutputStreamOperator upperCase = text.map (String :: toUpperCase);

For å utløse kjøringen, må vi påberope en vaskeoperasjon som skrive ut() som bare vil skrive ut resultatet av transformasjoner til standardutgangen, etter med henrette() metoden på StreamExecutionEnvironment klasse:

upperCase.print (); env.execute ();

Det vil gi følgende utdata:

1> DETTE ER EN FØRSTE SINN 2> DETTE ER EN ANDEN SENNING MED ET ET ORD

6.2. Windowing av hendelser

Når du behandler en strøm av hendelser i sanntid, kan det hende at du trenger å gruppere hendelser sammen og bruke litt beregning på et vindu av disse hendelsene.

Anta at vi har en strøm av hendelser, der hver hendelse er et par som består av hendelsesnummeret og tidsstempelet da hendelsen ble sendt til systemet vårt, og at vi tåler hendelser som ikke er i ordre, men bare hvis de ikke er mer enn tjue sekunder for sent.

For dette eksemplet, la oss først lage en strøm som simulerer to hendelser som er flere minutter fra hverandre, og definere en tidsstempelutpakker som spesifiserer forsinkelsesterskelen:

SingleOutputStreamOperator windowed = env.fromElements (new Tuple2 (16, ZonedDateTime.now (). plusMinutes (25) .toInstant (). getEpochSecond ()), new Tuple2 (15, ZonedDateTime.now (). plusMinutes (2) .toInstant () .getEpochSecond ())) .assignTimestampsAndWatermarks (new BoundedOutOfOrdernessTimestampExtractor (Time.seconds (20)) {@Override public long extractTimestamp (Tuple2 element) {return element.f1 * 1000; }});

Deretter la oss definere en vinduoperasjon for å gruppere hendelsene våre i fem sekunders vinduer og bruke en transformasjon på disse hendelsene:

SingleOutputStreamOperator redusert = vindusvindu. vinduAll (TumblingEventTimeWindows.of (Time.seconds (5))) .maxBy (0, true); redusert.avtrykk ();

Det får det siste elementet i hvert fem sekunders vindu, så det skrives ut:

1> (15,1491221519)

Merk at vi ikke ser den andre hendelsen fordi den ankom senere enn den angitte forsinkelsesterskelen.

7. Konklusjon

I denne artikkelen introduserte vi Apache Flink-rammeverket og så på noen av transformasjonene som ble levert med API-en.

Vi implementerte et ordtellingsprogram ved hjelp av Flinks flytende og funksjonelle DataSet API. Så så vi på DataStream API og implementerte en enkel sanntidstransformasjon på en strøm av hendelser.

Implementeringen av alle disse eksemplene og kodebitene finner du på GitHub - dette er et Maven-prosjekt, så det skal være enkelt å importere og kjøre som det er.


$config[zx-auto] not found$config[zx-overlay] not found