Apache Spark: Forskjeller mellom datarammer, datasett og RDD

1. Oversikt

Apache Spark er et raskt, distribuert databehandlingssystem. Det gjør databehandling i minnet og bruker hurtigbufring i minnet og optimalisert utførelse, noe som gir rask ytelse. Det gir API-er på høyt nivå for populære programmeringsspråk som Scala, Python, Java og R.

I denne raske opplæringen vil vi gå gjennom tre av Spark-grunnleggende konsepter: datarammer, datasett og RDD.

2. DataFrame

Spark SQL introduserte en tabelldataabstraksjon kalt DataFrame siden Spark 1.3. Siden den gang har det blitt en av de viktigste funksjonene i Spark. Denne API-en er nyttig når vi ønsker å håndtere strukturerte og semistrukturerte, distribuerte data.

I seksjon 3 vil vi diskutere Resilient Distributed Datasets (RDD). DataFrames lagrer data på en mer effektiv måte enn RDD, dette er fordi de bruker uforanderlige, in-memory, elastiske, distribuerte og parallelle evner til RDD, men de bruker også et skjema for dataene. DataFrames oversetter også SQL-kode til optimaliserte RDD-operasjoner på lavt nivå.

Vi kan lage DataFrames på tre måter:

  • Konvertering av eksisterende RDDer
  • Kjører SQL-spørringer
  • Laster inn eksterne data

Spark team introdusert SparkSession i versjon 2.0, forener den alle forskjellige sammenhenger og sikrer at utviklere ikke trenger å bekymre seg for å opprette forskjellige sammenhenger:

SparkSession session = SparkSession.builder () .appName ("TouristDataFrameExample") .master ("local [*]") .getOrCreate (); DataFrameReader dataFrameReader = session.read ();

Vi skal analysere Tourist.csv fil:

Datasettdata = dataFrameReader.option ("header", "true") .csv ("data / Tourist.csv");

Siden Spark 2.0 DataFrame ble en Datasett av typen Rad, slik at vi kan bruke en DataFrame som et alias for en Datasett.

Vi kan velge spesifikke kolonner som vi er interessert i. Vi kan også filtrere og gruppere etter en gitt kolonne:

data.select (col ("country"), col ("year"), col ("value")) .show (); data.filter (col ("country"). equalTo ("Mexico")) .show (); data.groupBy (col ("country")) .count () .show ();

3. Datasett

Et datasett er et sett med sterkt typede, strukturerte data. De gir den kjente objektorienterte programmeringsstilen pluss fordelene med typesikkerhet siden datasett kan sjekke syntaks og fange feil på kompileringstidspunktet.

Datasett er en utvidelse av DataFrame, slik at vi kan betrakte en DataFrame som en utypet visning av et datasett.

Spark-teamet ga ut Datasett API i Spark 1.6 og som de nevnte: "Målet med Spark Datasets er å tilby et API som lar brukerne enkelt uttrykke transformasjoner på objektdomener, samtidig som de gir ytelses- og robusthetsfordelene til Spark SQL-kjøringsmotoren".

Først må vi lage en klasse av typen TouristData:

offentlig klasse TouristData {privat strengregion; privat strengland; privat strengår; private String-serien; privat dobbel verdi; private String fotnoter; privat streng kilde; // ... getters og setters}

For å tilordne hver av postene våre til den spesifiserte typen, må vi bruke en koder. Kodere oversetter mellom Java-objekter og Sparks interne binære format:

// Initiering av SparkSession og datainnlasting DatasettresponsWithSelectedColumns = data.select (col ("region"), col ("country"), col ("year"), col ("series"), col ("value"). Cast ("dobbel"), kol ("fotnoter"), kol ("kilde")); Datasett typedDataset = responseWithSelectedColumns .as (Encoders.bean (TouristData.class));

Som med DataFrame, kan vi filtrere og gruppere etter bestemte kolonner:

typedDataset.filter ((FilterFunction) record -> record.getCountry () .equals ("Norway")) .show (); typedDataset.groupBy (typedDataset.col ("country")) .count () .show ();

Vi kan også utføre operasjoner som å filtrere etter kolonne som samsvarer med et bestemt område eller beregne summen av en bestemt kolonne, for å få den totale verdien av den:

typedDataset.filter ((FilterFunction) post -> record.getYear ()! = null && (Long.valueOf (record.getYear ())> 2010 && Long.valueOf (record.getYear ()) record.getValue ()! = null && record.getSeries () .contains ("spend")) .groupBy ("country") .agg (sum ("value")) .show ();

4. RDDs

Resilient Distributed Dataset eller RDD er Sparks primære programmeringsabstraksjon. Det representerer en samling av elementer som er: uforanderlig, spenstig og distribuert.

En RDD innkapsler et stort datasett, Spark distribuerer automatisk dataene som finnes i RDDer over klyngen vår og parallelliserer operasjonene vi utfører på dem.

Vi kan bare opprette RDDer gjennom drift av data i stabil lagring eller operasjoner på andre RDDer.

Feiltoleranse er viktig når vi håndterer store datasett og dataene distribueres på klyngemaskiner. RDD er elastiske på grunn av Sparks innebygde mekanisme for gjenoppretting av feil. Spark stoler på det faktum at RDDs husker hvordan de ble opprettet, slik at vi enkelt kan spore tilbake linjen for å gjenopprette partisjonen.

Det er to typer operasjoner vi kan gjøre på RDDer: Transformasjoner og handlinger.

4.1. Transformasjoner

Vi kan bruke transformasjoner til en RDD for å manipulere dataene. Etter at denne manipulasjonen er utført, får vi en helt ny RDD, siden RDD er uforanderlige gjenstander.

Vi vil sjekke hvordan du implementerer Map and Filter, to av de vanligste transformasjonene.

Først må vi lage en JavaSparkContext og last inn dataene som en RDD fra Tourist.csv fil:

SparkConf conf = new SparkConf (). SetAppName ("uppercaseCountries") .setMaster ("local [*]"); JavaSparkContext sc = ny JavaSparkContext (conf); JavaRDD-turister = sc.textFile ("data / Tourist.csv");

Deretter la oss bruke kartfunksjonen for å få navnet på landet fra hver post og konvertere navnet til store bokstaver. Vi kan lagre dette nylig genererte datasettet som en tekstfil på disken:

JavaRDD upperCaseCountries = tourist.map (linje -> {String [] kolonner = line.split (COMMA_DELIMITER); returner kolonner [1] .toUpperCase ();}). Distinkt (); upperCaseCountries.saveAsTextFile ("data / output / uppercase.txt");

Hvis vi bare vil velge et bestemt land, kan vi bruke filterfunksjonen på vår opprinnelige turist RDD:

JavaRDD-turisterInMexico = turister .filter (linje -> linje.split (COMMA_DELIMITER) [1] .ekvivalenter ("Mexico")); touristInMexico.saveAsTextFile ("data / output / touristInMexico.txt");

4.2. Handlinger

Handlinger returnerer en endelig verdi eller lagrer resultatene på platen etter å ha gjort noe beregning av dataene.

To av de gjentatte handlingene i Spark er Count and Reduce.

La oss telle de totale landene på CSV-filen vår:

// Initialisering av gnisttekst og datainnlasting av JavaRDD-land = tourist.map (linje -> {String [] kolonner = line.split (COMMA_DELIMITER); retur kolonner [1];}). Distinkt (); Long numberOfCountries = country.count ();

Nå beregner vi de totale utgiftene etter land. Vi må filtrere postene som inneholder utgifter i beskrivelsen.

I stedet for å bruke en JavaRDD, vi bruker en JavaPairRDD. Et par RDD er en type RDD som kan lagre nøkkelverdipar. La oss sjekke det neste:

JavaRDD-turisterExpenditure = turister .filter (linje -> linje.split (COMMA_DELIMITER) [3]. Inneholder ("utgifter")); JavaPairRDD spendPairRdd = touristExpenditure .mapToPair (line -> {String [] columns = line.split (COMMA_DELIMITER); returner ny Tuple2 (kolonner [1], Double.valueOf (kolonner [6]));}); Liste totalByCountry = spendPairRdd .reduceByKey ((x, y) -> x + y) .collect ();

5. Konklusjon

For å oppsummere, bør vi bruke DataFrames eller datasett når vi trenger domenespesifikke API-er, vi trenger høyt nivå uttrykk som aggregering, sum eller SQL-spørsmål. Eller når vi ønsker typesikkerhet på kompileringstidspunktet.

På den annen side bør vi bruke RDD når data er ustrukturert og vi ikke trenger å implementere et bestemt skjema eller når vi trenger transformasjoner og handlinger på lavt nivå.

Som alltid er alle kodeeksemplene tilgjengelige på GitHub.