En guide til Apache Crunch

1. Introduksjon

I denne opplæringen vil vi demonstrere Apache Crunch med et eksempel på databehandlingsapplikasjon. Vi kjører dette programmet ved hjelp av MapReduce-rammeverket.

Vi begynner med å kort dekke noen Apache Crunch-konsepter. Så hopper vi inn i en prøve-app. I denne appen gjør vi tekstbehandling:

  • Først og fremst vil vi lese linjene fra en tekstfil
  • Senere deler vi dem i ord og fjerner noen vanlige ord
  • Deretter grupperer vi de resterende ordene for å få en liste over unike ord og deres antall
  • Til slutt vil vi skrive denne listen til en tekstfil

2. Hva er knase?

MapReduce er et distribuert, parallelt programmeringsrammeverk for behandling av store datamengder på en klynge med servere. Programvarerammer som Hadoop og Spark implementerer MapReduce.

Crunch gir et rammeverk for å skrive, teste og kjøre MapReduce-rørledninger i Java. Her skriver vi ikke MapReduce-jobbene direkte. Snarere definerer vi datarørledningen (dvs. operasjonene for å utføre trinn for inndata, prosessering og utgang) ved hjelp av Crunch API. Crunch Planner tilordner dem til MapReduce-jobbene og utfører dem når det er nødvendig.

Derfor koordineres hver Crunch-datarørledning av en forekomst av Rørledning grensesnitt. Dette grensesnittet definerer også metoder for å lese data inn i en rørledning via Kilde forekomster og skrive data ut fra en rørledning til Mål tilfeller.

Vi har 3 grensesnitt for å representere data:

  1. PC-samling - en uforanderlig, distribuert samling av elementer
  2. PTable<>, V> - et uforanderlig, distribuert, uordnet multikart over nøkler og verdier
  3. PGroupedTable<>, V> - et distribuert, sortert kart over nøkler av type K til en Iterabel V som kan gjentas over nøyaktig en gang

DoFn er basisklassen for alle databehandlingsfunksjoner. Det tilsvarer Kartlegger, Reduksjon og Combiner klasser i MapReduce. Vi bruker mesteparten av utviklingen på å skrive og teste logiske beregninger ved hjelp av den.

Nå som vi er mer kjent med Crunch, la oss bruke den til å bygge eksemplet på applikasjonen.

3. Sette opp et Crunch-prosjekt

Først av alt, la oss sette opp et Crunch Project med Maven. Vi kan gjøre det på to måter:

  1. Legg til de nødvendige avhengighetene i pom.xml filen til et eksisterende prosjekt
  2. Bruk en arketype for å generere et startprosjekt

La oss se raskt på begge tilnærminger.

3.1. Maven avhengigheter

For å legge Crunch til et eksisterende prosjekt, la oss legge til de nødvendige avhengighetene i pom.xml fil.

La oss først legge til knase-kjerne bibliotek:

 org.apache.crunch crunch-core 0.15.0 

La oss deretter legge til hadoop-klient bibliotek for å kommunisere med Hadoop. Vi bruker versjonen som samsvarer med Hadoop-installasjonen:

 org.apache.hadoop hadoop-client 2.2.0 levert 

Vi kan sjekke Maven Central for de nyeste versjonene av crunch-core og hadoop-client-biblioteker.

3.2. Maven Archetype

En annen tilnærming er å raskt generere et startprosjekt ved hjelp av Maven-arketypen levert av Crunch:

mvn arketype: generer -Dfilter = org.apache.crunch: crunch-arketype 

Når du blir bedt om det av kommandoen ovenfor, gir vi Crunch-versjonen og prosjektgjenstandsdetaljene.

4. Crunch Pipeline Setup

Etter å ha satt opp prosjektet, må vi lage et Rørledning gjenstand. Crunch har 3 Rørledning implementeringer:

  • MRPipeline - utfører innen Hadoop MapReduce
  • SparkPipeline - utføres som en serie Spark-rørledninger
  • MemPipeline - kjører i minnet på klienten og er nyttig for enhetstesting

Vanligvis utvikler og tester vi ved hjelp av en forekomst av MemPipeline. Senere bruker vi en forekomst av MRPipeline eller SparkPipeline for faktisk utførelse.

Hvis vi trengte en rørledning i minnet, kunne vi bruke den statiske metoden getInstance å få MemPipeline forekomst:

Pipeline pipeline = MemPipeline.getInstance ();

Men foreløpig, la oss lage en forekomst av MRPipeline for å utføre søknaden med Hadoop:

Rørledning = ny MRPipeline (WordCount.class, getConf ());

5. Les inndata

Etter å ha opprettet rørledningsobjektet, vil vi lese inndata. De Rørledning grensesnittet gir en praktisk metode for å lese inndata fra en tekstfil, readTextFile (pathName).

La oss kalle denne metoden for å lese tekstfilen:

PCollection linjer = pipeline.readTextFile (inputPath);

Ovennevnte kode leser tekstfilen som en samling av String.

Som neste trinn, la oss skrive en testtilfelle for lesing av input:

@Test offentlig ugyldig givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead () {Pipeline pipeline = MemPipeline.getInstance (); PCollection linjer = pipeline.readTextFile (INPUT_FILE_PATH); assertEquals (21, lines.asCollection () .getValue () .størrelse ()); }

I denne testen bekrefter vi at vi får forventet antall linjer når vi leser en tekstfil.

6. Trinn for databehandling

Etter å ha lest inngangsdataene, må vi behandle dem. Crunch API inneholder et antall underklasser av DoFn å håndtere vanlige databehandlingsscenarier:

  • FilterFn - filtrerer medlemmer av en samling basert på en boolsk tilstand
  • MapFn - tilordner hver inngangspost til nøyaktig en utgangspost
  • CombineFn - kombinerer et antall verdier til en enkelt verdi
  • Bli medFn - utfører sammenføyninger som indre sammenføyning, venstre ytre sammenføyning, høyre ytre sammenføyning og full ytre sammenføyning

La oss implementere følgende databehandlingslogikk ved å bruke disse klassene:

  1. Del hver linje i inndatafilen i ord
  2. Fjern stoppordene
  3. Tell de unike ordene

6.1. Del en linje med tekst i ord

Først av alt, la oss lage Tokenizer klasse for å dele en linje i ord.

Vi utvider DoFn klasse. Denne klassen har en abstrakt metode som kalles prosess. Denne metoden behandler inngangspostene fra a PC-samling og sender utgangen til en Emitter.

Vi må implementere splitterlogikken i denne metoden:

offentlig klasse Tokenizer utvider DoFn {privat statisk slutt Splitter SPLITTER = Splitter .onPattern ("\ s +") .omitEmptyStrings (); @ Overstyr offentlig ugyldig prosess (String line, Emitter emitter) {for (String word: SPLITTER.split (line)) {emitter.emit (word); }}} 

I implementeringen ovenfor har vi brukt Splitter klasse fra Guava-biblioteket for å trekke ut ord fra en linje.

Deretter la oss skrive en enhetstest for Tokenizer klasse:

@RunWith (MockitoJUnitRunner.class) offentlig klasse TokenizerUnitTest {@Mock private Emitter emitter; @Test offentlig ugyldig givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEitted () {Tokenizer splitter = new Tokenizer (); splitter.process ("hei verden", emitter); verifisere (emitter) .emit ("hei"); verifisere (emitter) .emit ("verden"); verifisereNoMoreInteractions (emitter); }}

Ovenstående test verifiserer at de riktige ordene returneres.

Til slutt, la oss dele linjene som er lest fra inngangstekstfilen ved hjelp av denne klassen.

De parallelDo Metode av PC-samling grensesnittet gjelder det gitte DoFn til alle elementene og returnerer et nytt PC-samling.

La oss kalle denne metoden på linjesamlingen og passere en forekomst av Tokenizer:

PC-samleord = lines.parallelDo (ny Tokenizer (), Writables.strings ()); 

Som et resultat får vi listen over ord i tekstfilen. Vi fjerner stoppordene i neste trinn.

6.2. Fjern stoppord

På samme måte som forrige trinn, la oss lage en StopWordFilter klasse for å filtrere ut stoppord.

Imidlertid vil vi utvide FilterFn i stedet for DoFn. FilterFn har en abstrakt metode kalt aksepterer. Vi må implementere filtreringslogikken i denne metoden:

offentlig klasse StopWordFilter utvider FilterFn {// Engelske stoppord, lånt fra Lucene. privat statisk slutt Sett STOP_WORDS = ImmutableSet .copyOf (new String [] {"a", "and", "are", "as", "at", "be", "but", "by", "for" , "hvis", "i", "inn", "er", "det", "nei", "ikke", "av", "på", "eller", "s", "slik", " t "," that "," the "," their "," then "," there "," these "," they "," this "," to "," was "," will "," with " }); @ Override public boolean accept (String word) {return! STOP_WORDS.contains (word); }}

Deretter la oss skrive enhetstesten for StopWordFilter klasse:

offentlig klasse StopWordFilterUnitTest {@Test offentlig ugyldighet givenFilter_whenStopWordPassed_thenFalseReturned () {FilterFn filter = new StopWordFilter (); assertFalse (filter.accept ("the")); assertFalse (filter.accept ("a")); } @Test offentlig ugyldig givenFilter_whenNonStopWordPassed_thenTrueReturned () {FilterFn filter = new StopWordFilter (); assertTrue (filter.accept ("Hello")); assertTrue (filter.accept ("World")); } @Test offentlig ugyldig gittWordCollection_whenFiltered_thenStopWordsRemoved () {PCollection words = MemPipeline .collectionOf ("This", "is", "a", "test", "phrase"); PCollection noStopWords = ord.filter (nytt StopWordFilter ()); assertEquals (ImmutableList.of ("This", "test", "setning"), Lists.newArrayList (noStopWords.materialize ())); }}

Denne testen bekrefter at filtreringslogikken utføres riktig.

Til slutt, la oss bruke StopWordFilter for å filtrere listen over ord generert i forrige trinn. De filter Metode av PC-samling grensesnittet gjelder det gitte FilterFn til alle elementene og returnerer en ny PC-samling.

La oss kalle denne metoden på ordsamlingen og passere en forekomst av StopWordFilter:

PCollection noStopWords = ord.filter (nytt StopWordFilter ());

Som et resultat får vi den filtrerte ordsamlingen.

6.3. Tell unike ord

Etter å ha fått den filtrerte ordsamlingen, vil vi telle hvor ofte hvert ord forekommer. PC-samling grensesnittet har en rekke metoder for å utføre vanlige aggregeringer:

  • min - returnerer minimumselementet i samlingen
  • maks - returnerer det maksimale elementet i samlingen
  • lengde - returnerer antall elementer i samlingen
  • telle - returnerer a PTable som inneholder antallet av hvert unike element i samlingen

La oss bruke telle metode for å få de unike ordene sammen med deres tellinger:

// Count-metoden bruker en serie Crunch-primitiver og returnerer // et kart over de unike ordene i inngangspCollection til deres tellinger. PTable teller = noStopWords.count ();

7. Spesifiser utdata

Som et resultat av de forrige trinnene har vi en ordtabell og deres antall. Vi vil skrive dette resultatet til en tekstfil. De Rørledning grensesnitt gir praktiske metoder for å skrive utdata:

ugyldig skriving (PC-samlingssamling, målmål); ugyldig skriving (PCollection samling, Target target, Target.WriteMode writeMode); void writeTextFile (PCollection collection, String pathName);

La oss derfor kalle writeTextFile metode:

pipeline.writeTextFile (teller, outputPath); 

8. Administrer utførelse av rørledninger

Alle trinnene så langt har nettopp definert datarørledningen. Ingen innspill er lest eller behandlet. Dette er fordi Crunch bruker lat utførelsesmodell.

Det kjører ikke MapReduce-jobbene før en metode som styrer jobbplanlegging og utførelse påkalles på rørledningsgrensesnittet:

  • løpe - utarbeider en utførelsesplan for å lage de nødvendige utgangene og utfører den deretter synkront
  • ferdig - kjører eventuelle gjenværende jobber som kreves for å generere utdata, og rydder opp eventuelle mellomliggende datafiler som er opprettet
  • runAsync - ligner på kjøremetoden, men kjøres på en ikke-blokkerende måte

La oss derfor kalle ferdig metode for å utføre rørledningen som MapReduce-jobber:

PipelineResult result = pipeline.done (); 

Ovennevnte uttalelse kjører MapReduce-jobbene for å lese inndata, behandle dem og skrive resultatet til utdatakatalogen.

9. Sette rørledningen sammen

Så langt har vi utviklet og enhetstest logikken for å lese inndata, behandle den og skrive til utdatafilen.

Deretter la oss sette dem sammen for å bygge hele datarørledningen:

public int run (String [] args) kaster Unntak {String inputPath = args [0]; Streng outputPath = args [1]; // Opprett et objekt for å koordinere oppretting og utføring av rørledninger. Rørledning = ny MRPipeline (WordCount.class, getConf ()); // Henvis til en gitt tekstfil som en samling av strenger. PCollection linjer = pipeline.readTextFile (inputPath); // Definer en funksjon som deler hver linje i en PC-samling av strenger i // en PC-samling som består av de enkelte ordene i filen. // Det andre argumentet angir serieiseringsformatet. PC-samleord = lines.parallelDo (ny Tokenizer (), Writables.strings ()); // Ta ordsamlingen og fjern kjente stoppord. PCollection noStopWords = ord.filter (nytt StopWordFilter ()); // Tellemetoden bruker en serie Crunch-primitiver og returnerer // et kart over de unike ordene i inngangspCollection til deres tellinger. PTable teller = noStopWords.count (); // Be rørledningen om å skrive de resulterende tellingene til en tekstfil. pipeline.writeTextFile (teller, outputPath); // Utfør rørledningen som en MapReduce. PipelineResult result = pipeline.done (); return result.succeeded ()? 0: 1; }

10. Konfigurasjon av Hadoop Launch

Datarørledningen er dermed klar.

Vi trenger imidlertid koden for å starte den. La oss derfor skrive hoved- metode for å starte applikasjonen:

offentlig klasse WordCount utvider konfigurert redskapsverktøy {offentlig statisk ugyldig hoved (String [] args) kaster Unntak {ToolRunner.run (ny konfigurasjon (), ny WordCount (), args); }

ToolRunner.run analyserer Hadoop-konfigurasjonen fra kommandolinjen og utfører MapReduce-jobben.

11. Kjør applikasjon

Den komplette søknaden er nå klar. La oss kjøre følgende kommando for å bygge den:

mvn-pakke 

Som et resultat av kommandoen ovenfor får vi den pakkede applikasjonen og en spesiell jobbkrukke i målkatalogen.

La oss bruke denne jobbburken til å utføre applikasjonen på Hadoop:

hadoop jar target / crunch-1.0-SNAPSHOT-job.jar 

Programmet leser inndatafilen og skriver resultatet til utdatafilen. Utdatafilen inneholder unike ord sammen med antallet som ligner på følgende:

[Legg til, 1] [Lagt til, 1] [Beundring, 1] [Innrømmelse, 1] [Tillatelse, 1]

I tillegg til Hadoop kan vi kjøre applikasjonen innen IDE, som et frittstående program eller som enhetstester.

12. Konklusjon

I denne opplæringen opprettet vi et databehandlingsprogram som kjører på MapReduce. Apache Crunch gjør det enkelt å skrive, teste og kjøre MapReduce-rørledninger i Java.

Som vanlig finner du full kildekode på Github.


$config[zx-auto] not found$config[zx-overlay] not found