Spring Batch ved hjelp av Partitioner

1. Oversikt

I vår forrige introduksjon til Spring Batch introduserte vi rammeverket som et batch-prosesseringsverktøy. Vi har også utforsket konfigurasjonsdetaljene og implementeringen for en enkeltgjenget, enkeltprosessjobbutføring.

For å implementere en jobb med noen parallell behandling, er det gitt en rekke alternativer. På et høyere nivå er det to moduser for parallell behandling:

  1. Enkeltprosess, flertrådet
  2. Multiprosess

I denne raske artikkelen vil vi diskutere partisjonering av Steg, som kan implementeres for både enkeltprosess- og flerprosessjobber.

2. Partisjonere et trinn

Spring Batch med partisjonering gir oss muligheten til å dele utførelsen av en Steg:

Partisjonering Oversikt

Ovenstående bilde viser en implementering av en Jobb med en partisjonert Steg.

Det er en Steg kalt “Master”, hvis henrettelse er delt inn i noen “Slave” trinn. Disse slaver kan ta plassen til en mester, og utfallet vil fortsatt være uendret. Både mester og slave er forekomster av Steg. Slaver kan være eksterne tjenester eller bare utføre tråder lokalt.

Om nødvendig kan vi overføre data fra mesteren til slaven. Metadataene (dvs. JobRepository), sørger for at hver slave bare blir henrettet en gang i en enkelt henrettelse av Jobb.

Her er sekvensdiagrammet som viser hvordan alt fungerer:

Partisjoneringstrinn

Som vist, Partisjon Trinn kjører henrettelsen. De PartitionHandler er ansvarlig for å splitte arbeidet til "Master" i "Slaver". Den høyre Steg er slaven.

3. Maven POM

Maven-avhengighetene er de samme som nevnt i vår forrige artikkel. Det vil si Spring Core, Spring Batch og avhengigheten av databasen (i vårt tilfelle, SQLite).

4. Konfigurasjon

I vår innledende artikkel så vi et eksempel på å konvertere noen økonomiske data fra CSV til XML-fil. La oss utvide det samme eksemplet.

Her konverterer vi den økonomiske informasjonen fra fem CSV-filer til tilsvarende XML-filer, ved hjelp av en implementering med flere tråder.

Vi kan oppnå dette med en enkelt Jobb og Steg oppdeling. Vi har fem tråder, en for hver av CSV-filene.

La oss først og fremst lage en jobb:

@Bean (name = "partitionerJob") offentlig Job partitionerJob () kaster UnexpectedInputException, MalformedURLException, ParseException {return jobs.get ("partitioningJob") .start (partitionStep ()) .build (); }

Som vi kan se, dette Jobb starter med Partisjonering Trinn. Dette er vårt hovedtrinn som vil bli delt inn i forskjellige slave-trinn:

@Bean public Step partitionStep () kaster UnexpectedInputException, MalformedURLException, ParseException {return steps.get ("partitionStep") .partitioner ("slaveStep", partitioner ()) .step (slaveStep ()) .taskExecutor (taskExecutor (). (); }

Her oppretter vi Partisjonering Trinn ved hjelp av StepBuilderFactory. For det må vi gi informasjon om SlaveTrinn og Partisjonering.

De Partisjonering er et grensesnitt som gir muligheten til å definere et sett med inngangsverdier for hver av slaverne. Med andre ord, logikk for å dele oppgaver i respektive tråder går her.

La oss lage en implementering av den, kalt CustomMultiResourcePartitioner, der vi legger inn- og utdatafilnavnene i ExecutionContext å videreføre til hvert slave trinn:

offentlig klasse CustomMultiResourcePartitioner implementerer Partitioner {@Override public Map partition (int gridSize) {Map map = new HashMap (gridSize); int i = 0, k = 1; for (Ressursressurs: ressurser) {ExecutionContext context = new ExecutionContext (); Assert.state (resource.exists (), "Ressurs eksisterer ikke:" + ressurs); context.putString (keyname, resource.getFilename ()); context.putString ("opFileName", "output" + k +++ ". xml"); map.put (PARTITION_KEY + i, kontekst); i ++; } retur kart; }}

Vi lager også bønnen for denne klassen, hvor vi gir kildekatalogen for inndatafiler:

@Bean offentlig CustomMultiResourcePartitioner partitioner () {CustomMultiResourcePartitioner partitioner = ny CustomMultiResourcePartitioner (); Ressurs [] ressurser; prøv {resources = resoursePatternResolver .getResources ("file: src / main / resources / input / *. csv"); } catch (IOException e) {throw new RuntimeException ("I / O problems when resolving" + "the input file pattern.", e); } partitioner.setResources (ressurser); retur partisjonering; }

Vi vil definere slave-trinnet, akkurat som ethvert annet trinn med leseren og forfatteren. Leseren og forfatteren vil være den samme som vi så i vårt innledende eksempel, bortsett fra at de vil motta filnavnparameter fra StepExecutionContext.

Vær oppmerksom på at disse bønnene må skrittvises, slik at de kan motta stepExecutionContext params, ved hvert trinn. Hvis de ikke ville være trinnvis, vil bønnene deres opprettes i utgangspunktet og aksepterer ikke filnavnene på trinnnivå:

@StepScope @Bean public FlatFileItemReader itemReader (@Value ("# {stepExecutionContext [fileName]}") Strengfilnavn) kaster UnexpectedInputException, ParseException {FlatFileItemReader reader = new FlatFileItemReader (); DelimitedLineTokenizer tokenizer = ny DelimitedLineTokenizer (); String [] tokens = {"brukernavn", "brukerid", "transaksjonsdato", "beløp"}; tokenizer.setNames (tokens); reader.setResource (ny ClassPathResource ("input /" + filnavn)); DefaultLineMapper lineMapper = ny DefaultLineMapper (); lineMapper.setLineTokenizer (tokenizer); lineMapper.setFieldSetMapper (ny RecordFieldSetMapper ()); reader.setLinesToSkip (1); reader.setLineMapper (lineMapper); returleser; } 
@Bean @StepScope public ItemWriter itemWriter (Marshaller marshaller, @Value ("# {stepExecutionContext [opFileName]}") Strengfilnavn) kaster MalformedURLException {StaxEventItemWriter itemWriter = ny StaxEventItemWriter (); itemWriter.setMarshaller (marshaller); itemWriter.setRootTagName ("transactionRecord"); itemWriter.setResource (ny ClassPathResource ("xml /" + filnavn)); returvareWriter; }

Mens vi nevner leseren og forfatteren i slave-trinnet, kan vi sende argumentene som null, fordi disse filnavnene ikke vil bli brukt, da de vil motta filnavnene fra stepExecutionContext:

@Bean public Step slaveStep () kaster UnexpectedInputException, MalformedURLException, ParseException {return steps.get ("slaveStep"). Chunk (1) .reader (itemReader (null)) .writer (itemWriter (marshaller (), null)) .build (); }

5. Konklusjon

I denne opplæringen diskuterte vi hvordan du implementerer en jobb med parallell behandling ved hjelp av Spring Batch.

Som alltid er den komplette implementeringen av dette eksemplet tilgjengelig på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found