Batchbehandling av Java EE 7

1. Introduksjon

Tenk deg at vi måtte fullføre oppgaver manuelt som å behandle lønnsslipp, beregne renter og generere regninger. Det ville bli ganske kjedelig, feilutsatt og en uendelig liste over manuelle oppgaver!

I denne opplæringen tar vi en titt på Java Batch Processing (JSR 352), en del av Jakarta EE-plattformen, og en god spesifikasjon for automatisering av oppgaver som disse. Det gir applikasjonsutviklere en modell for å utvikle robuste batchbehandlingssystemer slik at de kan fokusere på forretningslogikken.

2. Maven-avhengigheter

Siden JSR 352 bare er en spesifikasjon, må vi inkludere API og implementering, som jberet:

 javax.batch javax.batch-api 1.0.1 org.jberet jberet-core 1.0.2.Final org.jberet jberet-support 1.0.2.Final org.jberet jberet-se 1.0.2.Final 

Vi vil også legge til en database i minnet, slik at vi kan se på noen mer realistiske scenarier.

3. Nøkkelbegreper

JSR 352 introduserer noen få konsepter, som vi kan se på denne måten:

La oss først definere hvert stykke:

  • Fra venstre har vi JobOperator. Den administrerer alle aspekter av jobbbehandling som å starte, stoppe og starte på nytt
  • Deretter har vi Jobb. En jobb er en logisk samling av trinn; den innkapsler en hel batchprosess
  • En jobb vil inneholde mellom 1 og n Stegs. Hvert trinn er en uavhengig, sekvensiell arbeidsenhet. Et trinn er sammensatt av lesning inngang, behandling den innspillingen, og skriving produksjon
  • Og sist, men ikke minst, har vi JobRepository som lagrer løpende informasjon om jobbene. Det hjelper med å holde oversikt over jobber, deres tilstand og fullføringsresultater

Trinnene har litt mer detaljer enn dette, så la oss ta en titt på det neste. Først skal vi se på Klump trinn og deretter kl Batchlets.

4. Lage en del

Som nevnt tidligere er en bit et slags skritt. Vi bruker ofte en del for å uttrykke en operasjon som utføres om og om igjen, si over et sett med gjenstander. Det er omtrent som mellomliggende operasjoner fra Java Streams.

Når vi beskriver en del, må vi uttrykke hvor vi skal ta varene fra, hvordan de skal behandles og hvor de skal sendes etterpå.

4.1. Leseartikler

For å lese elementene må vi implementere dem ItemReader.

I dette tilfellet lager vi en leser som ganske enkelt sender ut tallene 1 til 10:

@Named offentlig klasse SimpleChunkItemReader utvider AbstractItemReader {private Integer [] tokens; privat antall teller; @Injiser JobContext jobContext; @ Override public Integer readItem () kaster unntak {if (count> = tokens.length) {return null; } jobContext.setTransientUserData (count); retur tokens [count ++]; } @ Override public void open (Serializable checkpoint) throw Exception {tokens = new Integer [] {1,2,3,4,5,6,7,8,9,10}; telle = 0; }}

Nå leser vi bare fra klassens interne tilstand her. Men selvfølgelig, readItem kunne hente fra en database, fra filsystemet eller en annen ekstern kilde.

Merk at vi lagrer noe av denne interne tilstanden ved hjelp av JobContext # setTransientUserData () som vil komme godt med senere.

Legg også merke til kontrollpunkt parameter. Vi henter det også igjen.

4.2. Behandler gjenstander

Selvfølgelig er grunnen til at vi klumper oss, at vi ønsker å utføre en slags operasjon på varene våre!

Hver gang vi kommer tilbake null fra en vareprosessor, slipper vi den varen fra batchen.

Så la oss her si at vi bare vil beholde partallene. Vi kan bruke en ItemProsessor som avviser de rare ved å returnere null:

@Named offentlig klasse SimpleChunkItemProcessor implementerer ItemProcessor {@Override public Integer processItem (Object t) {Integer item = (Integer) t; returvare% 2 == 0? element: null; }}

prosessProdukt vil bli ringt en gang for hvert element som vårt ItemReader avgir.

4.3. Skriveelementer

Til slutt vil jobben påkalle ItemWriter slik at vi kan skrive de forvandlede artiklene våre:

@Named offentlig klasse SimpleChunkWriter utvider AbstractItemWriter {List prosessert = ny ArrayList (); @Override public void writeItems (List items) kaster Unntak {items.stream (). Map (Integer.class :: cast) .forEach (prosessert :: add); }} 

Hvor lang er det gjenstander? I et øyeblikk vil vi definere størrelsen på en del, som vil bestemme størrelsen på listen som sendes til writeItems.

4.4. Definere en del i en jobb

Nå setter vi alt dette sammen i en XML-fil ved hjelp av JSL eller Job Specification Language. Merk at vi vil liste opp leseren, prosessoren, chunkeren, og også en klumpstørrelse:

Størrelsen er hvor ofte fremdriften i bunnen er forpliktet til jobbmagasinet, som er viktig for å garantere ferdigstillelse, hvis en del av systemet mislykkes.

Vi må plassere denne filen i META-INF / batch-jobber for.krukke filer og i WEB-INF / klasser / META-INF / batch-jobber til .krig filer.

Vi ga jobben vår id “SimpleChunk”, så la oss prøve det i en enhetstest.

Nå utføres jobber asynkront, noe som gjør dem vanskelige å teste. I eksemplet må du sjekke ut vår BatchTestHelper som måler og venter til jobben er fullført:

@Test offentlig ugyldighet givenChunk_thenBatch_completesWithSuccess () kaster unntak {JobOperator jobOperator = BatchRuntime.getJobOperator (); Lang kjøringId = jobOperator.start ("simpleChunk", nye egenskaper ()); JobExecution jobExecution = jobOperator.getJobExecution (executionId); jobExecution = BatchTestHelper.keepTestAlive (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); } 

Så det er det biter er. La oss ta en titt på batchlets.

5. Lage en Batchlet

Ikke alt passer pent inn i en iterativ modell. For eksempel kan vi ha en oppgave som vi bare trenger påkalle en gang, løp til ferdigstillelse, og returner en utgangsstatus.

Kontrakten for en batchlet er ganske enkel:

@Named offentlig klasse SimpleBatchLet utvider AbstractBatchlet {@Override public String prosess () kaster Unntak {return BatchStatus.COMPLETED.toString (); }}

Som det er JSL:

Og vi kan teste det med samme tilnærming som før:

@Test offentlig ugyldighet givenBatchlet_thenBatch_completeWithSuccess () kaster unntak {JobOperator jobOperator = BatchRuntime.getJobOperator (); Lang kjøringId = jobOperator.start ("simpleBatchLet", nye egenskaper ()); JobExecution jobExecution = jobOperator.getJobExecution (executionId); jobExecution = BatchTestHelper.keepTestAlive (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

Så vi har sett på et par forskjellige måter å implementere trinn på.

La oss nå se på mekanismer for markering og garanti for fremgang.

6. Egendefinert kontrollpunkt

Svikt vil sikkert skje midt i en jobb. Skal vi bare starte på nytt, eller kan vi på en eller annen måte starte der vi slapp?

Som navnet antyder, sjekkpunkter hjelpe oss med jevnlig å sette et bokmerke i tilfelle feil.

Som standard er slutten på prosessering av klumper et naturlig kontrollpunkt.

Vi kan imidlertid tilpasse det med vårt eget Sjekkpunktalgoritme:

@Named offentlig klasse CustomCheckPoint utvider AbstractCheckpointAlgorithm {@Inject JobContext jobContext; @Override offentlig boolsk isReadyToCheckpoint () kaster unntak {int counterRead = (Integer) jobContext.getTransientUserData (); retur tellerLes% 5 == 0; }}

Husker du tellingen vi plasserte i forbigående data tidligere? Her, vi kan trekke det ut med JobContext # getTransientUserDataå si at vi ønsker å forplikte oss til hvert 5. nummer som behandles.

Uten dette ville en forpliktelse skje på slutten av hver del, eller i vårt tilfelle hvert tredje tall.

Og så matcher vi det med kasse-algoritme direktivet i XML under klumpen vår:

La oss teste koden, igjen og merke oss at noen av kjeleplattetrinnene er gjemt bort i BatchTestHelper:

@Test offentlig ugyldighet givenChunk_whenCustomCheckPoint_thenCommitCountIsThree () kaster unntak {// ... start jobb og vent på fullføring jobOperator.getStepExecutions (executId) .stream () .map (BatchTestHelper :: getCommitCount) .forEach (count -> assertEquals .longValue ())); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

Så vi kan forvente en forpliktelse på 2 siden vi har ti elementer og konfigurert forpliktelsene til å være hvert 5. element. Men, rammeverket forplikter enda en siste lesning til slutt for å sikre at alt er behandlet, og det er det som bringer oss opp til 3.

Deretter, la oss se på hvordan du håndterer feil.

7. Unntakshåndtering

Som standard, jobboperatøren vil merke jobben vår som MISLAGT i tilfelle et unntak.

La oss bytte vareleser for å sikre at den mislykkes:

@ Override public Integer readItem () kaster unntak {if (tokens.hasMoreTokens ()) {String tempTokenize = tokens.nextToken (); kaste nye RuntimeException (); } returner null; }

Og så test:

@Test offentlig ugyldig nårChunkError_thenBatch_CompletesWithFailed () kaster Unntak {// ... start jobb og vent på fullføring assertEquals (jobExecution.getBatchStatus (), BatchStatus.FAILED); }

Men vi kan overstyre denne standardoppførselen på flere måter:

  • skip-limit angir antall unntak dette trinnet vil ignorere før det mislykkes
  • prøv på nytt angir antall ganger jobboperatøren skal prøve trinnet før det mislykkes
  • hoppbar-unntak-klasse angir et sett med unntak som prosessering av deler vil ignorere

Så vi kan redigere jobben vår slik at den ignorerer RuntimeException, i tillegg til noen få andre, bare for illustrasjon:

Og nå vil koden vår passere:

@Test offentlig ugyldighet givenChunkError_thenErrorSkipped_CompletesWithSuccess () kaster unntak {// ... start jobb og vent på fullføring jobOperator.getStepExecutions (executId) .stream () .map (BatchTestHelper :: getProcessSkipCount) .forEachert (skipCount - skipCount - .longValue ())); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8. Utføre flere trinn

Vi nevnte tidligere at en jobb kan ha et hvilket som helst antall trinn, så la oss se det nå.

8.1. Avfyring av neste trinn

Som standard, hvert trinn er det siste trinnet i jobben.

For å kunne utføre neste trinn i en batchjobb, må vi spesifikt spesifisere ved å bruke neste attributt innenfor trinndefinisjonen:

Hvis vi glemmer dette attributtet, blir ikke neste trinn i rekkefølge utført.

Og vi kan se hvordan dette ser ut i API:

@Test offentlig ugyldighet givenTwoSteps_thenBatch_CompleteWithSuccess () kaster Unntak {// ... start jobb og vent på fullføring assertEquals (2, jobOperator.getStepExecutions (executId) .size ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8.2. Strømmer

En trinnsekvens kan også kapsles inn i en strømme. Når flyten er ferdig, er det hele flyten som overgår til utførelseselementet. Også elementer i flyten kan ikke overgå til elementer utenfor strømmen.

Vi kan si, utføre to trinn inne i en strøm, og deretter ha den flytovergangen til et isolert trinn:

Og vi kan fortsatt se hver trinnkjøring uavhengig:

@Test offentlig ugyldighet givenFlow_thenBatch_CompleteWithSuccess () kaster Unntak {// ... start jobb og vent på fullføring assertEquals (3, jobOperator.getStepExecutions (executId) .size ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8.3. Avgjørelser

Vi har også hvis / annet støtte i form av avgjørelser. Avgjørelser gir en tilpasset måte å bestemme en sekvens på mellom trinn, strømmer og splittelser.

Som trinn fungerer det på overgangselementer som f.eks neste som kan lede eller avslutte jobben.

La oss se hvordan jobben kan konfigureres:

Noen beslutning elementet må konfigureres med en klasse som implementeres Avgjør. Dens jobb er å returnere en beslutning som en String.

Hver neste innsiden beslutning er som en sak i en bytte om uttalelse.

8.4. Splitter

Splitter er nyttige siden de tillater oss å utføre strømmer samtidig:

Selvfølgelig, dette betyr at bestillingen ikke er garantert.

La oss bekrefte at de fortsatt kjører. Strømningstrinnene vil bli utført i en vilkårlig rekkefølge, men det isolerte trinnet vil alltid være sist:

@Test offentlig ugyldig givenSplit_thenBatch_CompletesWithSuccess () kaster Unntak {// ... start jobb og vent på ferdigstillelse List stepExecutions = jobOperator.getStepExecutions (executId); assertEquals (3, stepExecutions.size ()); assertEquals ("splitJobSequenceStep3", stepExecutions.get (2) .getStepName ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

9. Partisjonere en jobb

Vi kan også konsumere batchegenskapene i Java-koden vår som er definert i jobben vår.

De kan omfatte på tre nivåer - jobben, trinnet og batch-artefakten.

La oss se noen eksempler på hvordan de konsumerte.

Når vi vil konsumere eiendommene på jobbnivå:

@Injiser JobContext jobContext; ... jobProperties = jobContext.getProperties (); ...

Dette kan også konsumeres på trinnnivå:

@Injiser StepContext stepContext; ... stepProperties = stepContext.getProperties (); ...

Når vi vil konsumere egenskapene på batch-artefaktnivå:

@Inject @BatchProperty (name = "name") privat streng nameString;

Dette kommer godt med partisjoner.

Se, med splitt kan vi kjøre flyter samtidig. Men vi kan også skillevegg et skritt inn n sett med elementer eller angi separate innganger, slik at vi kan dele opp arbeidet på flere tråder.

For å forstå hvilket arbeidssegment hver partisjon skal gjøre, kan vi kombinere egenskaper med partisjoner:

10. Stopp og start på nytt

Nå, det er det for å definere jobber. La oss nå snakke et øyeblikk om å administrere dem.

Vi har allerede sett i enhetstestene at vi kan få en forekomst av JobOperator fra BatchRuntime:

JobOperator jobOperator = BatchRuntime.getJobOperator ();

Og så kan vi starte jobben:

Lang kjøringId = jobOperator.start ("simpleBatchlet", nye egenskaper ());

Vi kan imidlertid også stoppe jobben:

jobOperator.stop (executionId);

Og til slutt kan vi starte jobben på nytt:

executId = jobOperator.restart (executId, nye egenskaper ());

La oss se hvordan vi kan stoppe en løpende jobb:

@Test offentlig ugyldighet givenBatchLetStarted_whenStopped_thenBatchStopped () kaster Unntak {JobOperator jobOperator = BatchRuntime.getJobOperator (); Lang kjøringId = jobOperator.start ("simpleBatchLet", nye egenskaper ()); JobExecution jobExecution = jobOperator.getJobExecution (executionId); jobOperator.stop (executId); jobExecution = BatchTestHelper.keepTestStopped (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.STOPPED); }

Og hvis en gruppe er STOPPET, så kan vi starte den på nytt:

@Test offentlig ugyldig givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess () {// ... start og stopp jobbet assertEquals (jobExecution.getBatchStatus (), BatchStatus.STOPPED); executId = jobOperator.restart (jobExecution.getExecutionId (), nye egenskaper ()); jobExecution = BatchTestHelper.keepTestAlive (jobOperator.getJobExecution (executionId)); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

11. Henter jobber

Når en batchjobb sendes inn da batch runtime oppretter en forekomst av JobExecution for å spore det.

For å få tak i JobExecution for en utførelses-ID, kan vi bruke JobOperator # getJobExecution (executionId) metode.

Og, StepExecution gir nyttig informasjon for sporing av trinnets utførelse.

For å få tak i StepExecution for en utførelses-ID, kan vi bruke JobOperator # getStepExecutions (executId) metode.

Og fra det kan vi få flere beregninger om trinnet via StepExecution # getMetrics:

@Test offentlig ugyldighet givenChunk_whenJobStarts_thenStepsHaveMetrics () kaster Unntak {// ... start jobb og vent på ferdigstillelse assertTrue (jobOperator.getJobNames (). Inneholder ("simpleChunk")); assertTrue (jobOperator.getParameters (executionId) .isEmpty ()); StepExecution stepExecution = jobOperator.getStepExecutions (executId) .get (0); Kart metricTest = BatchTestHelper.getMetricsMap (stepExecution.getMetrics ()); assertEquals (10L, metricTest.get (Metric.MetricType.READ_COUNT) .longValue ()); assertEquals (5L, metricTest.get (Metric.MetricType.FILTER_COUNT) .longValue ()); assertEquals (4L, metricTest.get (Metric.MetricType.COMMIT_COUNT) .longValue ()); assertEquals (5L, metricTest.get (Metric.MetricType.WRITE_COUNT) .longValue ()); // ... og mange flere! }

12. Ulemper

JSR 352 er kraftig, selv om den mangler på en rekke områder:

  • Det ser ut til å være mangel på lesere og forfattere som kan behandle andre formater som JSON
  • Det er ingen støtte fra generiske legemidler
  • Partisjonering støtter bare ett trinn
  • API-et tilbyr ikke noe som støtter planlegging (selv om J2EE har en egen planleggingsmodul)
  • På grunn av sin asynkrone natur kan testing være en utfordring
  • API er ganske ordentlig

13. Konklusjon

I denne artikkelen så vi på JSR 352 og lærte om biter, batchlets, splitt, strømmer og mye mer. Likevel har vi knapt skrapet overflaten.

Demokoden kan som alltid finnes på GitHub.