Komme i gang med strømbehandling med Spring Cloud Data Flow

1. Introduksjon

Spring Cloud Data Flow er en sky-native programmerings- og driftsmodell for komponerbare datamikrotjenester.

Med Spring Cloud Data Flow, utviklere kan opprette og orkestrere datarørledninger for vanlige bruksområder som datainntak, sanntidsanalyse og dataimport / eksport.

Disse datarørledningene kommer i to smaker, streaming og batch datarørledninger.

I det første tilfellet forbrukes eller produseres en ubegrenset mengde data via meldingsprogramvare. Mens i det andre tilfellet den kortvarige oppgaven behandler et endelig datasett og deretter avsluttes.

Denne artikkelen vil fokusere på streamingbehandling.

2. Arkitektonisk oversikt

Nøkkelkomponentene denne typen arkitektur er applikasjoner, den Data Flow Server, og målet kjøretid.

I tillegg til disse nøkkelkomponentene har vi vanligvis også en Data Flow Shell og en meldingsformidler innenfor arkitekturen.

La oss se alle disse komponentene mer detaljert.

2.1. applikasjoner

En streaming-datarørledning inkluderer vanligvis forbrukende hendelser fra eksterne systemer, databehandling og polyglot-utholdenhet. Disse fasene blir ofte referert til som Kilde, Prosessor, og Synke i Vårsky terminologi:

  • Kilde: er applikasjonen som forbruker hendelser
  • Prosessor: bruker data fra Kilde, gjør noe behandling på den, og sender de behandlede dataene til neste applikasjon i rørledningen
  • Synke: enten forbruker fra en Kilde eller Prosessor og skriver dataene til ønsket utholdenhetslag

Disse applikasjonene kan pakkes på to måter:

  • Spring Boot uber-jar som er vert i et maven-arkiv, fil, http eller annen Spring-ressursimplementering (denne metoden vil bli brukt i denne artikkelen)
  • Docker

Mange kilder, prosessorer og vaskeapplikasjoner for vanlige brukstilfeller (f.eks. Jdbc, hdfs, http, router) er allerede levert og klare til bruk av Spring Cloud Data Flow team.

2.2. Kjøretid

Det er også behov for kjøretid for at disse applikasjonene skal kunne kjøres. De støttede kjøretidene er:

  • Cloud Foundry
  • Apache GARN
  • Kubernetes
  • Apache Mesos
  • Lokal server for utvikling (som vil bli brukt i denne artikkelen)

2.3. Data Flow Server

Komponenten som er ansvarlig for distribusjon av applikasjoner til en kjøretid er Data Flow Server. Det er en Data Flow Server kjørbar krukke levert for hver av målkjøringstidene.

De Data Flow Server er ansvarlig for tolking:

  • En stream DSL som beskriver den logiske dataflyten gjennom flere applikasjoner.
  • Et distribusjonsmanifest som beskriver kartleggingen av applikasjoner på kjøretiden.

2.4. Data Flow Shell

Data Flow Shell er en klient for Data Flow Server. Skallet lar oss utføre DSL-kommandoen som trengs for å samhandle med serveren.

Som et eksempel vil DSL for å beskrive datastrømmen fra en http-kilde til en jdbc-vask skrives som “http | jdbc ”. Disse navnene i DSL er registrert hos Data Flow Server og kartlegge på applikasjonsgjenstander som kan være vert i Maven- eller Docker-arkiver.

Våren tilbyr også et grafisk grensesnitt, kalt Flo, for å opprette og overvåke streaming datarørledninger. Imidlertid er bruken utenfor diskusjonen om denne artikkelen.

2.5. Meldingsformidler

Som vi har sett i eksemplet i forrige avsnitt, har vi brukt rørsymbolet i definisjonen av datastrømmen. Rørsymbolet representerer kommunikasjonen mellom de to applikasjonene via meldingsprogramvare.

Dette betyr at vi trenger en meldingsmegler som er i gang i målmiljøet.

De to mellommeldingsmeglerne som støttes, er:

  • Apache Kafka
  • RabbitMQ

Og så, nå som vi har en oversikt over de arkitektoniske komponentene - det er på tide å bygge vår første strømbehandlingsrørledning.

3. Installer en meldingsmegler

Som vi har sett trenger applikasjonene i rørledningen en meldingsprogramvare for å kommunisere. For formålet med denne artikkelen skal vi gå med RabbitMQ.

For fullstendige detaljer om installasjonen, kan du følge instruksjonene på det offisielle nettstedet.

4. Den lokale dataflyt-serveren

For å øke hastigheten på prosessen med å generere applikasjonene våre, bruker vi Spring Initializr; med hjelpen, kan vi skaffe oss våre Vårstøvel applikasjoner på få minutter.

Når du har kommet til nettstedet, velger du bare a Gruppe og en Gjenstand Navn.

Når dette er gjort, klikker du på knappen Generer prosjekt for å starte nedlastingen av Maven-gjenstanden.

Etter at nedlastingen er fullført, pakker du ut prosjektet og importerer det som et Maven-prosjekt i din IDE du ønsker.

La oss legge til en Maven-avhengighet i prosjektet. Som vi trenger Dataflow Local Server biblioteker, la oss legge til vår-sky-starter-dataflow-server-lokal avhengighet:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

Nå må vi kommentere Vårstøvel hovedklasse med @EnableDataFlowServer kommentar:

@EnableDataFlowServer @ SpringBootApplication offentlig klasse SpringDataFlowServerApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowServerApplication.class, args); }} 

Det er alt. Våre Lokal dataflyt-server er klar til å bli henrettet:

mvn spring-boot: run

Søknaden starter opp på port 9393.

5. Dataflytskallet

Igjen, gå til Spring Initializr og velg en Gruppe og Gjenstand Navn.

Når vi har lastet ned og importert prosjektet, la oss legge til en vår-sky-dataflow-shell-avhengighet:

 org.springframework.cloud spring-cloud-dataflow-shell 

Nå må vi legge til @EnableDataFlowShell kommentar til Vårstøvel hovedklasse:

@EnableDataFlowShell @ SpringBootApplication offentlig klasse SpringDataFlowShellApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowShellApplication.class, args); }} 

Vi kan nå kjøre skallet:

mvn spring-boot: run

Etter at skallet er i gang, kan vi skrive inn hjelp kommandoen i ledeteksten for å se en komplett liste over kommandoer som vi kan utføre.

6. Kildesøknaden

På samme måte, på Initializr, oppretter vi nå en enkel applikasjon og legger til en Strøm kanin avhengighet kalt vår-sky-start-strøm-kanin:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 

Vi legger deretter til @EnableBinding (Source.class) kommentar til Vårstøvel hovedklasse:

@EnableBinding (Source.class) @SpringBootApplication public class SpringDataFlowTimeSourceApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowTimeSourceApplication.class, args); }}

Nå må vi definere kilden til dataene som må behandles. Denne kilden kan være en hvilken som helst potensiell endeløs arbeidsbelastning (internet-of-things-sensordata, 24/7 hendelsesbehandling, online transaksjonsdata inntatt).

I vår prøveapplikasjon produserer vi en hendelse (for enkelhets skyld et nytt tidsstempel) hvert 10. sekund med en Poller.

De @InboundChannelAdapter kommentar sender en melding til kildens utgangskanal, og bruker returverdien som nyttelast for meldingen:

@Bean @InboundChannelAdapter (verdi = Source.OUTPUT, poller = @Poller (fixedDelay = "10000", maxMessagesPerPoll = "1")) offentlig MessageSource timeMessageSource () {retur () -> MessageBuilder.withPayload (ny dato (). GetTime ()).bygge(); } 

Datakilden vår er klar.

7. Prosessorsøknaden

Neste - vi oppretter en applikasjon og legger til en Strøm kanin avhengighet.

Vi legger deretter til @EnableBinding (prosessor.klasse) kommentar til Vårstøvel hovedklasse:

@EnableBinding (Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowTimeProcessorApplication.class, args); }}

Deretter må vi definere en metode for å behandle dataene som kommer fra kildeapplikasjonen.

For å definere en transformator, må vi kommentere denne metoden med @Transformer kommentar:

@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform (Long timestamp) {DateFormat dateFormat = new SimpleDateFormat ("åååå / MM / dd tt: mm: åå"); String date = dateFormat.format (tidsstempel); returdato; }

Den konverterer et tidsstempel fra 'inngangskanalen' til en formatert dato som sendes til 'utgangskanalen'.

8. Vaskeapplikasjonen

Den siste applikasjonen du oppretter, er Sink-applikasjonen.

Igjen, gå til Spring Initializr og velg en Gruppe, en Gjenstand Navn. Etter å ha lastet ned prosjektet, la oss legge til en Strøm kanin avhengighet.

Deretter legger du til @EnableBinding (Sink.class) kommentar til Vårstøvel hovedklasse:

@EnableBinding (Sink.class) @SpringBootApplication public class SpringDataFlowLoggingSinkApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowLoggingSinkApplication.class, args); }}

Nå trenger vi en metode for å fange opp meldingene som kommer fra prosessorapplikasjonen.

For å gjøre dette må vi legge til @StreamListener (Sink.INPUT) kommentar til vår metode:

@StreamListener (Sink.INPUT) public void loggerSink (String date) {logger.info ("Mottatt:" + dato); }

Metoden skriver ganske enkelt ut tidsstempelet forvandlet i en formatert dato til en loggfil.

9. Registrer en Stream-app

Spring Cloud Data Flow Shell lar oss registrere en Stream-app i appregistret ved hjelp av appregister kommando.

Vi må oppgi et unikt navn, applikasjonstype og en URI som kan løses i appgjenstanden. For typen, spesifiser “kilde“, “prosessor“, Eller“synke“.

Når du gir en URI med maven-skjemaet, bør formatet være i samsvar med følgende:

maven: //: [: [:]]:

For å registrere Kilde, Prosessor og Synke applikasjoner som er opprettet tidligere, går du til Spring Cloud Data Flow Shell og gi følgende kommandoer fra ledeteksten:

app register --name time-source --type source --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-source: jar: 0.0.1-SNAPSHOT app register --name time -prosessor --typeprosessor --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-processor: jar: 0.0.1-SNAPSHOT app register --name logging-sink --type sink --uri maven: //com.baeldung.spring.cloud: spring-data-flow-logging-sink: jar: 0.0.1-SNAPSHOT 

10. Opprett og distribuer strømmen

For å opprette en ny streamdefinisjon, gå til Spring Cloud Data Flow Shell og utfør følgende skallkommando:

stream create --name time-to-log --definition 'time-source | tidsprosessor | logging-sink '

Dette definerer en strøm som heter time-to-log basert på DSL-uttrykk ‘Tidskilde | tidsprosessor | logging-sink '.

For å distribuere strømmen, kjør følgende skallkommando:

stream deploy --name time-to-log

De Data Flow Server løser tidskilde, tidsprosessor, og hogst-vask å maven koordinater og bruker disse til å starte tidskilde, tidsprosessor og hogst-vask applikasjoner av strømmen.

Hvis strømmen er riktig distribuert, ser du i Data Flow Server logger at modulene er startet og bundet sammen:

2016-08-24 12: 29: 10.516 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: distribuere app time-to-log.logging-sink instance 0 Logger vil være i PATH_TO_LOG / vår-sky-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink 2016-08-24 12: 29: 17.600 INFO 8096 --- [io-9393-exec-10] oscd spi.local.LocalAppDeployer: distribuere app time-to-log.time-processor instance 0 Logger vil være i PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034556862 / time-to-log.time-processor 2016-08-24 12: 29: 23.280 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: distribuere app time-to-log.time-source-forekomst 0 Logger vil være i PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034562861 / time-to-log.time-source

11. Gjennomgang av resultatet

I dette eksemplet sender kilden ganske enkelt gjeldende tidsstempel som en melding hvert sekund, prosessoren formaterer den og loggvasken sender ut det formaterte tidsstempelet ved hjelp av loggingsrammen.

Loggfilene ligger i katalogen som vises i Data Flow ServerLoggutdata, som vist ovenfor. For å se resultatet kan vi hale loggen:

hale -f PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink / stdout_0.log 2016-08-24 12: 40: 42.029 INFO 9488 --- [ r.time-to-log-1] scSpringDataFlowLoggingSinkApplication: Mottatt: 2016/08/24 11:40:01 2016-08-24 12: 40: 52.035 INFO 9488 --- [r.time-to-log-1 ] scSpringDataFlowLoggingSinkApplication: Mottatt: 2016/08/24 11:40:11 2016-08-24 12: 41: 02.030 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSinkApplication: Mottatt: 2016/08 / 24 11:40:21

12. Konklusjon

I denne artikkelen har vi sett hvordan man bygger en datarørledning for strømbehandling ved bruk av Spring Cloud Data Flow.

Vi så også rollen som Kilde, Prosessor og Synke applikasjoner inne i strømmen og hvordan du kobler og knytter denne modulen i en Data Flow Server gjennom bruk av Data Flow Shell.

Eksempelkoden finnes i GitHub-prosjektet.


$config[zx-auto] not found$config[zx-overlay] not found