ETL med Spring Cloud Data Flow

1. Oversikt

Spring Cloud Data Flow er et skybasert verktøykasse for å bygge datarørledninger og batchprosesser i sanntid. Spring Cloud Data Flow er klar til å brukes til en rekke bruksområder for databehandling som enkel import / eksport, ETL-behandling, streaming av hendelser og prediktiv analyse.

I denne opplæringen lærer vi et eksempel på sanntids Extract Transform and Load (ETL) ved hjelp av en strømrørledning som trekker ut data fra en JDBC-database, transformerer den til enkle POJOer og laster den inn i en MongoDB.

2. ETL og Event-Stream Processing

ETL - pakke ut, transformere og laste - ble ofte referert til som en prosess som batch-laster data fra flere databaser og systemer til et felles datalager. I dette datalageret er det mulig å utføre tung dataanalysebehandling uten å gå på kompromiss med systemets totale ytelse.

Imidlertid endrer nye trender måten dette gjøres på. ETL har fremdeles en rolle i overføring av data til datalager og datasjøer.

I dag kan dette gjøres med strømmer i en event-stream-arkitektur ved hjelp av Spring Cloud Data Flow.

3. Spring Cloud Data Flow

Med Spring Cloud Data Flow (SCDF) kan utviklere lage datarørledninger i to smaker:

  • Langvarige strømapplikasjoner i sanntid ved hjelp av Spring Cloud Stream
  • Kortvarige batch-oppgaveapplikasjoner ved bruk av Spring Cloud Task

I denne artikkelen vil vi dekke den første, en langvarig streamingapplikasjon basert på Spring Cloud Stream.

3.1. Spring Cloud Stream-applikasjoner

SCDF Stream-rørledninger består av trinn, hvorhvert trinn er en applikasjon bygget i Spring Boot-stil ved hjelp av Spring Cloud Stream mikro-rammeverk. Disse applikasjonene er integrert av en meldingsprogramvare som Apache Kafka eller RabbitMQ.

Disse applikasjonene er klassifisert i kilder, prosessorer og vasker. Sammenlignet med ETL-prosessen, kan vi si at kilden er "ekstraktet", prosessoren er "transformatoren" og vasken er "last" -delen.

I noen tilfeller kan vi bruke en applikasjonsstarter i ett eller flere trinn i rørledningen. Dette betyr at vi ikke trenger å implementere en ny applikasjon for et trinn, men i stedet konfigurere en eksisterende applikasjonsstarter som allerede er implementert.

En liste over applikasjonsstartere finner du her.

3.2. Spring Cloud Data Flow Server

Den siste delen av arkitekturen er Spring Cloud Data Flow Server. SCDF-serveren distribuerer applikasjonene og rørledningsstrømmen ved hjelp av Spring Cloud Deployer Specification. Denne spesifikasjonen støtter SCDF cloud-native smaken ved å distribuere til en rekke moderne kjøretider, som Kubernetes, Apache Mesos, Garn og Cloud Foundry.

Vi kan også kjøre strømmen som en lokal distribusjon.

Mer informasjon om SCDF-arkitekturen finner du her.

4. Miljøoppsett

Før vi begynner, må vi velg delene av denne komplekse distribusjonen. Den første delen å definere er SCDF Server.

For testing, Vi bruker SCDF Server Local for lokal utvikling. For distribusjonen av produksjonen kan vi senere velge en skyinnfødt kjøretid, som SCDF Server Kubernetes. Vi finner listen over serverdriftstider her.

La oss nå sjekke systemkravene for å kjøre denne serveren.

4.1. Systemkrav

For å kjøre SCDF-serveren, må vi definere og sette opp to avhengigheter:

  • meldingsprogramvaren, og
  • RDBMS.

For meldingsprogramvare, vi vil jobbe med RabbitMQ, og vi velger PostgreSQL som RDBMS for lagring av definisjoner for rørledningsstrømmer.

For å kjøre RabbitMQ, last ned den nyeste versjonen her og start en RabbitMQ-forekomst ved hjelp av standardkonfigurasjonen, eller kjør følgende Docker-kommando:

docker run - navn dataflow-rabbit -p 15672: 15672 -p 5672: 5672 -d rabbitmq: 3-management

Som det siste installasjonstrinnet, installer og kjør PostgreSQL RDBMS på standardporten 5432. Etter dette, opprett en database der SCDF kan lagre strømdefinisjoner ved hjelp av følgende skript:

OPPRETT DATABASE dataflyt;

4.2. Spring Cloud Data Flow Server Local

For å kjøre SCDF Server Local, kan vi velge å starte serveren ved hjelp av docker-compose, eller vi kan starte det som et Java-program.

Her kjører vi SCDF Server Local som et Java-program. For å konfigurere applikasjonen må vi definere konfigurasjonen som Java-applikasjonsparametere. Vi trenger Java 8 i systemstien.

For å være vert for glassene og avhengighetene, må vi opprette en hjemmemappe for SCDF-serveren vår og laste ned SCDF Server Local-distribusjonen i denne mappen. Du kan laste ned den siste distribusjonen av SCDF Server Local her.

Vi må også lage en lib-mappe og sette en JDBC-driver der. Den siste versjonen av PostgreSQL-driveren er tilgjengelig her.

Til slutt, la oss kjøre den lokale SCDF-serveren:

$ java -Dloader.path = lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \ --spring.datasource.url = jdbc: postgresql: //127.0.0.1: 5432 / dataflow \ --spring.datasource.username = postgres_username \ --spring.datasource.password = postgres_password \ --spring.datasource.driver-class-name = org.postgresql.Driver \ --spring.rabbitmq.host = 127.0.0.1 \ --spring.rabbitmq.port = 5672 \ --spring.rabbitmq.username = gjest \ --spring.rabbitmq.password = gjest

Vi kan sjekke om den kjører ved å se på denne URL:

// localhost: 9393 / dashbord

4.3. Spring Cloud Data Flow Shell

SCDF-skallet er et kommandolinjeverktøy som gjør det enkelt å komponere og distribuere applikasjoner og rørledninger. Disse shell-kommandoene kjøres over Spring Cloud Data Flow Server REST API.

Last ned den nyeste versjonen av krukken til SCDF-hjemmemappen din, tilgjengelig her. Når det er gjort, kjør følgende kommando (oppdater versjonen etter behov):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar ____ ____ _ __ / ___ | _ __ _ __ (_) _ __ __ _ / ___ | | ___ _ _ __ | | \ ___ \ | '_ \ | '__ | | '_ \ / _` | | | | | / _ \ | | | | / _` | ___) | | _) | | | | | | | (_ | | | | ___ | | (_) | | _ | | (_ | | | ____ / | .__ / | _ | _ | _ | | _ | \ __, | \ ____ | _ | \ ___ / \ __, _ | \ __, _ | ____ | _ | _ __ | ___ / __________ | _ \ __ _ | | _ __ _ | ___ | | _____ __ \ \ \ \ \ \ | | | | / _ ` | __ / _` | | | _ | | / _ \ \ / \ / \ \ \ \ \ | | _ | | (_ | | || (_ | | | _ | | | (_) \ VV / / / / / / / | ____ / \ __, _ | \ __ \ __, _ | | _ | | _ | \ ___ / \ _ / \ _ / / _ / _ / _ / _ / _ / Velkommen til Spring Cloud Data Flow shell. For hjelp, trykk TAB eller skriv "help". dataflyt:>

Hvis i stedet for “dataflyt:> ” du får "server-ukjent:> ” på den siste linjen kjører du ikke SCDF-serveren på localhost. I dette tilfellet kjører du følgende kommando for å koble til en annen vert:

server-ukjent:> konfigurasjonsserver for dataflyt // {host}

Nå er Shell koblet til SCDF-serveren, og vi kan kjøre kommandoene våre.

Det første vi må gjøre i Shell er å importere applikasjonsstarterne. Finn den nyeste versjonen her for RabbitMQ + Maven i Spring Boot 2.0.x, og kjør følgende kommando (igjen, oppdater versjonen, her “Darwin-SR1", etter behov):

$ dataflow:> appimport --uri //bit.ly/Darwin-SR1-stream-applications-rabbit-maven

For å sjekke de installerte applikasjonene kjører du følgende Shell-kommando:

$ dataflyt:> appliste

Som et resultat bør vi se en tabell som inneholder alle de installerte applikasjonene.

SCDF tilbyr også et grafisk grensesnitt, kalt Flo, som vi kan få tilgang til via denne adressen: // localhost: 9393 / dashbord. Imidlertid er bruken ikke innenfor rammen av denne artikkelen.

5. Å komponere en ETL-rørledning

La oss nå lage strømrørledningen vår. For å gjøre dette bruker vi JDBC Source-applikasjonsstarter for å hente ut informasjon fra vår relasjonsdatabase.

Vi oppretter også en tilpasset prosessor for å transformere informasjonsstrukturen og en tilpasset vask for å laste dataene våre til en MongoDB.

5.1. Extract - Klargjøre en relasjonsdatabase for utvinning

La oss lage en database med navnet crm og et bord med navnet på kunde:

OPPRETT DATABASIS crm;
OPPRETT TABELL-kunde (id bigint IKKE NULL, importert boolsk DEFAULT falsk, kundenavn tegn varierer (50), PRIMÆR NØKKEL (id))

Merk at vi bruker et flagg importert, som lagrer hvilken post som allerede er importert. Vi kan også lagre denne informasjonen i en annen tabell, om nødvendig.

La oss nå sette inn noen data:

INSERT INTO customer (id, customer_name, importert) VALUES (1, 'John Doe', false);

5.2. Transform - Kartlegging JDBC Felter til MongoDB Feltstruktur

For transformasjonstrinnet vil vi gjøre en enkel oversettelse av feltet Kundenavn fra kildetabellen til et nytt felt Navn. Andre transformasjoner kan gjøres her, men la oss holde eksemplet kort.

For å gjøre dette oppretter vi et nytt prosjekt med navnet kundetransformere. Den enkleste måten å gjøre dette på er å bruke Spring Initializr-nettstedet til å lage prosjektet. Når du har kommet til nettstedet, velger du et gruppe- og et gjenstandsnavn. Vi bruker com.kunde og kundetransformasjon, henholdsvis.

Når dette er gjort, klikker du på knappen "Generer prosjekt" for å laste ned prosjektet. Pakk deretter ut prosjektet og importer det til din favoritt IDE, og legg til følgende avhengighet til pom.xml:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 

Nå er vi klar til å begynne å kode feltnavnkonvertering. For å gjøre dette, oppretter vi Kunde klasse for å fungere som adapter. Denne klassen vil motta Kundenavn via setName () metode og vil sende ut verdien via getName metode.

De @JsonProperty merknader vil gjøre transformasjonen mens deerialisering fra JSON til Java:

offentlig klasse kunde {privat Lang id; privat strengnavn; @JsonProperty ("customer_name") public void setName (strengnavn) {this.name = name; } @JsonProperty ("navn") offentlig String getName () {returnavn; } // Getters og Setters}

Prosessoren trenger å motta data fra en inngang, utføre transformasjonen og binde utfallet til en utgangskanal. La oss lage en klasse for å gjøre dette:

importer org.springframework.cloud.stream.annotation.EnableBinding; importer org.springframework.cloud.stream.messaging.Processor; importer org.springframework.integration.annotation.Transformer; @EnableBinding (Processor.class) offentlig klasse CustomerProcessorConfiguration {@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) offentlig kunde convertToPojo (Kundens nyttelast) {retur nyttelast; }}

I koden ovenfor kan vi observere at transformasjonen skjer automatisk. Inndata mottar dataene når JSON og Jackson deserialiserer dem til en Kunde objektet ved hjelp av sett metoder.

Det motsatte er for utdataene, dataene serialiseres til JSON ved hjelp av metoder.

5.3. Last - Vask i MongoDB

På samme måte som transformasjonstrinnet, vi lager et nytt maven-prosjekt, nå med navnet kunde-mongodb-synke. Igjen, få tilgang til Spring Initializr, for gruppen velger com.kunde, og velg Artifact kunde-mongodb-vask. Skriv deretter inn MongoDB i avhengighets søkeboksen og laste ned prosjektet.

Deretter pakker du ut og importerer det til din favoritt IDE.

Deretter legger du til samme ekstra avhengighet som i kundetransformere prosjekt.

Nå skal vi lage en til Kunde klasse, for å motta innspill i dette trinnet:

importer org.springframework.data.mongodb.core.mapping.Document; @Document (collection = "customer") offentlig klasse kunde {privat Lang id; privat strengnavn; // Getters og Setters}

For å synke Kunde, oppretter vi en lytterklasse som vil lagre kundenheten ved hjelp av CustomerRepository:

@EnableBinding (Sink.class) offentlig klasse CustomerListener {@Autowired private CustomerRepository repository; @StreamListener (Sink.INPUT) offentlig ugyldig lagring (kundekunde) {repository.save (kunde); }}

Og CustomerRepository, i dette tilfellet, er en MongoRepository fra vårdata:

importer org.springframework.data.mongodb.repository.MongoRepository; importer org.springframework.stereotype.Repository; @Repository offentlig grensesnitt CustomerRepository utvider MongoRepository {} 

5.4. Stream Definition

Nå, begge tilpassede applikasjonene er klare til å registreres på SCDF Server. For å oppnå dette, kompilerer du begge prosjektene ved hjelp av Maven-kommandoen mvn installere.

Vi registrerer dem deretter ved hjelp av Spring Cloud Data Flow Shell:

appregister --navn kundetransform - type prosessor --uri maven: //com.customer: customer-transform: 0.0.1-SNAPSHOT
appregister --navn kunde-mongodb-vask - type vask --uri maven: //com.customer: customer-mongodb-sink: jar: 0.0.1-SNAPSHOT

Til slutt, la oss sjekke om programmene er lagret på SCDF, kjør applikasjonslistekommandoen i skallet:

appliste

Som et resultat bør vi se begge applikasjonene i den resulterende tabellen.

5.4.1. Stream Pipeline Domain-Specific Language - DSL

En DSL definerer konfigurasjonen og dataflyten mellom applikasjonene. SCDF DSL er enkel. I det første ordet definerer vi navnet på applikasjonen, etterfulgt av konfigurasjonene.

Syntaksen er også en Unix-inspirert rørledningssyntaks, som bruker loddrette bjelker, også kjent som "rør", for å koble til flere applikasjoner:

http --port = 8181 | Logg

Dette oppretter et HTTP-program servert i port 8181 som sender eventuell mottatt kroppsnyttelast til en logg.

La oss nå se hvordan du lager DSL-strømdefinisjonen for JDBC-kilden.

5.4.2. JDBC Source Stream Definition

Nøkkelkonfigurasjonene for JDBC Source er spørsmål og Oppdater.spørsmål velger uleste poster mens Oppdater vil endre et flagg for å forhindre at gjeldende poster blir lest på nytt.

Vi definerer også JDBC-kilden til avstemning i en fast forsinkelse på 30 sekunder og maksimalt 1000 rader. Til slutt definerer vi konfigurasjonene av tilkoblingen, som driver, brukernavn, passord og tilkoblings-URL:

jdbc --query = 'VELG id, kundenavn FRA public.customer WHERE importert = false' --update = 'UPDATE public.customer SET importert = true WHERE id i (: id)' --max-rows-per-poll = 1000 - fast forsinkelse = 30 --time-unit = SECONDS - driver-class-name = org.postgresql.Driver --url = jdbc: postgresql: // localhost: 5432 / crm --username = postgres - passord = postgres

Flere JDBC-kildekonfigurasjonsegenskaper finner du her.

5.4.3. Kunde MongoDB Sink Stream Definition

Da vi ikke definerte tilkoblingskonfigurasjonene i application.properties av kunde-mongodb-vaskkonfigurerer vi gjennom DSL-parametere.

Vår søknad er fullt basert på MongoDataAutoConfiguration. Du kan sjekke de andre mulige konfigurasjonene her. I utgangspunktet vil vi definere vår.data.mongodb.uri:

customer-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main

5.4.4. Opprett og distribuer strømmen

Først, for å lage den endelige strømdefinisjonen, går du tilbake til Shell og utfører følgende kommando (uten linjeskift er de nettopp satt inn for lesbarhet):

stream create --name jdbc-to-mongodb --definition "jdbc --query = 'VELG id, kundenavn FRA public.customer WHERE importert = false' - fast-delay = 30 --max-rader-per-avstemning = 1000 --update = 'UPDATE kundesett importert = true WHERE id in (: id)' --time-unit = SECONDS --password = postgres - driver-class-name = org.postgresql.Driver --username = postgres --url = jdbc: postgresql: // localhost: 5432 / crm | customer-transform | customer-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main " 

Denne strømmen DSL definerer en strøm som heter jdbc-til-mongodb. Neste, vi distribuerer strømmen ved navn:

stream deploy --name jdbc-to-mongodb 

Til slutt bør vi se plasseringene til alle tilgjengelige logger i loggutgangen:

Logger vil være i {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink Logger vil være i {PATH_TO_LOG} / spring-cloud-deployer / jdbc-to-mongodb /jdbc-to-mongodb.customer-transform Logger vil være i {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6. Konklusjon

I denne artikkelen har vi sett et fullstendig eksempel på en ETL-datarørledning ved hjelp av Spring Cloud Data Flow.

Mest bemerkelsesverdig så vi konfigurasjonene til en applikasjonsstarter, opprettet en ETL-strømrørledning ved hjelp av Spring Cloud Data Flow Shell og implementerte tilpassede applikasjoner for å lese, transformere og skrive data.

Som alltid kan eksempelkoden bli funnet i GitHub-prosjektet.


$config[zx-auto] not found$config[zx-overlay] not found