Integrering av våren med AWS Kinesis

1. Introduksjon

Kinesis er et verktøy for å samle, behandle og analysere datastrømmer i sanntid, utviklet hos Amazon. En av hovedfordelene er at den hjelper til med utvikling av hendelsesdrevne applikasjoner.

I denne veiledningen vil vi utforske noen få biblioteker som gjør det mulig for vårapplikasjonen å produsere og konsumere poster fra en Kinesis Stream. Kodeeksemplene viser den grunnleggende funksjonaliteten, men representerer ikke den produksjonsklare koden.

2. Forutsetning

Før vi går videre, må vi gjøre to ting.

Den første er å lage et vårprosjekt, da målet her er å samhandle med Kinesis fra et vårprosjekt.

Den andre er å lage en Kinesis Data Stream. Vi kan gjøre dette fra en nettleser i AWS-kontoen vår. Et alternativ for AWS CLI-fans blant oss er å bruke kommandolinjen. Fordi vi vil samhandle med det fra kode, må vi også ha AWS IAM-legitimasjon, tilgangsnøkkel og hemmelig nøkkel og regionen.

Alle produsentene våre vil lage dummy IP-adresseoppføringer, mens forbrukerne vil lese disse verdiene og føre dem opp i applikasjonskonsollen.

3. AWS SDK for Java

Det aller første biblioteket vi bruker er AWS SDK for Java. Fordelen er at den lar oss administrere mange deler av arbeidet med Kinesis Data Streams. Vi kan lese data, produsere data, opprette datastrømmer og hardføre datastrømmer. Ulempen er at for å ha produksjonsklar kode, må vi kode aspekter som omharding, feilhåndtering eller en demon for å holde forbrukeren i live.

3.1. Maven avhengighet

Amazon-kinesis-klientens Maven-avhengighet vil gi alt vi trenger for å ha fungerende eksempler. Vi legger det nå til pom.xml fil:

 com.amazonaws amazon-kinesis-client 1.11.2 

3.2. Våroppsett

La oss gjenbruke AmazonKinesis objektet som trengs for å samhandle med Kinesis Stream. Vi lager det som en @Bønne inne i vår @SpringBootApplication klasse:

@Bean offentlig AmazonKinesis buildAmazonKinesis () {BasicAWSCredentials awsCredentials = ny BasicAWSCredentials (accessKey, secretKey); returner AmazonKinesisClientBuilder.standard () .withCredentials (ny AWSStaticCredentialsProvider (awsCredentials)) .withRegion (Regions.EU_CENTRAL_1) .build (); }

Deretter la oss definere aws.access.key og aws.secret.key, nødvendig for den lokale maskinen, i application.properties:

aws.access.key = min-aws-tilgang-nøkkel-går-her aws.secret.key = min-aws-hemmelige-nøkkel-går-hit

Og vi vil lese dem ved hjelp av @Verdi kommentar:

@Value ("$ {aws.access.key}") privat streng tilgangsknapp; @Value ("$ {aws.secret.key}") privat streng secretKey;

For enkelhets skyld kommer vi til å stole på @ Planlagt metoder for å lage og konsumere poster.

3.3. Forbruker

De AWS SDK Kinesis Consumer bruker en pull-modell, noe som betyr at koden vår vil trekke poster fra skjærene i Kinesis datastrøm:

GetRecordsRequest recordsRequest = ny GetRecordsRequest (); recordsRequest.setShardIterator (shardIterator.getShardIterator ()); recordsRequest.setLimit (25); GetRecordsResult recordsResult = kinesis.getRecords (recordsRequest); while (! recordsResult.getRecords (). isEmpty ()) {recordsResult.getRecords (). stream () .map (record -> new String (record.getData (). array ())). forEach (System.out: : println); recordsRequest.setShardIterator (recordsResult.getNextShardIterator ()); recordsResult = kinesis.getRecords (recordsRequest); }

De GetRecordsRequest objekt bygger forespørselen om strømdata. I vårt eksempel har vi definert en grense på 25 poster per forespørsel, og vi fortsetter å lese til det ikke er noe mer å lese.

Vi kan også legge merke til at vi har brukt a for vår iterasjon GetShardIteratorResult gjenstand. Vi opprettet dette objektet inne i @PostConstrucmetode slik at vi begynner å spore poster med en gang:

privat GetShardIteratorResult shardIterator; @PostConstruct privat tomrom buildShardIterator () {GetShardIteratorRequest readShardsRequest = ny GetShardIteratorRequest (); readShardsRequest.setStreamName (IPS_STREAM); readShardsRequest.setShardIteratorType (ShardIteratorType.LATEST); readShardsRequest.setShardId (IPS_SHARD_ID); this.shardIterator = kinesis.getShardIterator (readShardsRequest); }

3.4. Produsent

La oss nå se hvordan håndtere opprettelsen av poster for Kinesis-datastrømmen.

Vi setter inn data ved hjelp av a PutRecordsRequest gjenstand. For dette nye objektet legger vi til en liste som inneholder flere PutRecordsRequestEntry gjenstander:

Listeoppføringer = IntStream.range (1, 200) .mapToObj (ipSuffix -> {PutRecordsRequestEntry entry = new PutRecordsRequestEntry (); entry.setData (ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())) ; entry.setPartitionKey (IPS_PARTITION_KEY); returoppføring;}). collect (Collectors.toList ()); PutRecordsRequest createRecordsRequest = ny PutRecordsRequest (); createRecordsRequest.setStreamName (IPS_STREAM); createRecordsRequest.setRecords (oppføringer); kinesis.putRecords (createRecordsRequest);

Vi har opprettet en grunnleggende forbruker og en produsent av simulerte IP-poster. Alt som gjenstår å gjøre nå er å kjøre vårprosjektet og se IP-er oppført i applikasjonskonsollen vår.

4. KCL og KPL

Kinesis Client Library (KCL) er et bibliotek som forenkler forbruket av poster. Det er også et lag med abstraksjon over AWS SDK Java APIer for Kinesis Data Streams. Bak kulissene håndterer biblioteket belastningsbalansering i mange tilfeller, reagerer på feil i forekomsten, kontrollerer behandlede poster og reagerer på omharding.

Kinesis Producer Library (KPL) er et bibliotek som er nyttig for skriving til en Kinesis-datastrøm. Det gir også et lag med abstraksjon som sitter over AWS SDK Java APIer for Kinesis Data Streams. For bedre ytelse håndterer biblioteket automatisk batching, multi-threading og prøv på nytt logikk.

KCL og KPL har begge hovedfordelen at de er enkle å bruke, slik at vi kan fokusere på å produsere og konsumere poster.

4.1. Maven avhengigheter

De to bibliotekene kan tas med hver for seg i prosjektet vårt om nødvendig. For å inkludere KPL og KCL i vårt Maven-prosjekt, må vi oppdatere pom.xml-filen:

 com.amazonaws amazon-kinesis-producer 0.13.1 com.amazonaws amazon-kinesis-client 1.11.2 

4.2. Våroppsett

Den eneste vårforberedelsen vi trenger er å sørge for at vi har IAM-legitimasjonen for hånden. Verdiene for aws.access.key og aws.secret.key er definert i vår application.properties slik at vi kan lese dem med @Verdi når det trengs.

4.3. Forbruker

Først skal vi lage en klasse som implementerer IR-prosessor grensesnitt og definerer logikken vår for hvordan vi skal håndtere Kinesis datastrømsposter, som er å skrive dem ut i konsollen:

offentlig klasse IpProcessor implementerer IRecordProcessor {@Override public void initialize (InitializationInput initializationInput) {} @Override public void processRecords (ProcessRecordsInput processRecordsInput) {processRecordsInput.getRecords () .forEach (record -> System.out.println (record). () .array ()))); } @Override offentlig ugyldig nedleggelse (ShutdownInput shutdownInput) {}}

Det neste trinnet er å definere en fabrikklasse som implementerer IRecordProcessorFactory grensesnitt og returnerer en tidligere opprettet IpProcessor gjenstand:

offentlig klasse IpProcessorFactory implementerer IRecordProcessorFactory {@Override offentlig IRecordProcessor createProcessor () {returner ny IpProcessor (); }}

Og nå for siste trinn, vi bruker en Arbeider mot å definere forbrukerrørledningen. Vi trenger en KinesisClientLibConfiguration objekt som vil definere, om nødvendig, IAM Credentials og AWS Region.

Vi vil passere KinesisClientLibConfiguration, og vår IpProcessorFactory objekt, mot vårt Arbeider og start den deretter i en egen tråd. Vi holder denne logikken med å konsumere poster alltid i live ved bruk av Arbeider klasse, så vi leser kontinuerlig nye poster nå:

BasicAWSCredentials awsCredentials = nye BasicAWSCredentials (accessKey, secretKey); KinesisClientLibConfiguration consumerConfig = ny KinesisClientLibConfiguration (APP_NAME, IPS_STREAM, ny AWSStaticCredentialsProvider (awsCredentials), IPS_WORKER) .withRegionName (Regions.EU_CENTRAL_1.getName ()); endelig arbeiderarbeider = ny Worker.Builder () .recordProcessorFactory (ny IpProcessorFactory ()) .config (consumerConfig) .build (); CompletableFuture.runAsync (worker.run ());

4.4. Produsent

La oss nå definere KinesisProducerConfiguration objekt, og legger til IAM-legitimasjonen og AWS-regionen:

BasicAWSCredentials awsCredentials = nye BasicAWSCredentials (accessKey, secretKey); KinesisProducerConfiguration producerConfig = ny KinesisProducerConfiguration () .setCredentialsProvider (ny AWSStaticCredentialsProvider (awsCredentials)) .setVerifyCertificate (false) .setRegion (Regions.EU_CENTRAL_1.getName ()); this.kinesisProducer = ny KinesisProducer (producerConfig);

Vi inkluderer kinesisProducer objekt tidligere opprettet i en @ Planlagt jobbe og produsere poster for Kinesis-datastrømmen kontinuerlig:

IntStream.range (1, 200) .mapToObj (ipSuffix -> ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())). For hver (oppføring -> kinesisProducer.addUserRecord (IPS_STREAM, IPS_PARTITION_KEY, );

5. Spring Cloud Stream Binder Kinesis

Vi har allerede sett to biblioteker, begge opprettet utenfor vårens økosystem. Vi gjør det nå se hvordan Spring Cloud Stream Binder Kinesis kan forenkle livet vårt ytterligere mens du bygger på toppen av Spring Cloud Stream.

5.1. Maven avhengighet

Maven-avhengigheten vi trenger å definere i vår applikasjon for Spring Cloud Stream Binder Kinesis er:

 org.springframework.cloud spring-cloud-stream-binder-kinesis 1.2.1.RELEASE 

5.2. Våroppsett

Når du kjører på EC2, oppdages de nødvendige AWS-egenskapene automatisk, så det er ikke nødvendig å definere dem. Siden vi kjører eksemplene våre på en lokal maskin, må vi definere vår IAM-tilgangsnøkkel, hemmelige nøkkel og region for AWS-kontoen vår. Vi har også deaktivert automatisk registrering av CloudFormation-stakknavn for applikasjonen:

cloud.aws.credentials.access-key = my-aws-access-key cloud.aws.credentials.secret-key = my-aws-secret-key cloud.aws.region.static = eu-central-1 cloud.aws .stack.auto = falsk

Spring Cloud Stream er pakket med tre grensesnitt som vi kan bruke i vår stream-binding:

  • De Synke er for datainntak
  • De Kilde brukes til publisering av poster
  • De Prosessor er en kombinasjon av begge deler

Vi kan også definere våre egne grensesnitt hvis vi trenger det.

5.3. Forbruker

Å definere en forbruker er en todelt jobb. Først vil vi definere, i application.properties, datastrømmen som vi bruker:

spring.cloud.stream.bindings.input.destination = live-ips spring.cloud.stream.bindings.input.group = live-ips-group spring.cloud.stream.bindings.input.content-type = text / plain

Og la oss definere en vår @Komponent klasse. Kommentaren @EnableBinding (Sink.class) vil tillate oss å lese fra Kinesis-strømmen ved hjelp av metoden kommentert med @StreamListener (Sink.INPUT):

@EnableBinding (Sink.class) offentlig klasse IpConsumer {@StreamListener (Sink.INPUT) offentlig tomrom forbruker (String ip) {System.out.println (ip); }}

5.4. Produsent

Produsenten kan også deles i to. Først må vi definere strømegenskapene våre inne application.properties:

spring.cloud.stream.bindings.output.destination = live-ips spring.cloud.stream.bindings.output.content-type = text / plain

Og så vi legger til @EnableBinding (Source.class) på en vår @Komponent og lage nye testmeldinger hvert par sekunder:

@Component @EnableBinding (Source.class) offentlig klasse IpProducer {@Autowired private Source source; @Scheduled (fixedDelay = 3000L) private void produce () {IntStream.range (1, 200) .mapToObj (ipSuffix -> "192.168.0." + IpSuffix) .forEach (entry -> source.output (). Send ( MessageBuilder.withPayload (entry) .build ())); }}

Det er alt vi trenger for at Spring Cloud Stream Binder Kinesis skal fungere. Vi kan bare starte søknaden nå.

6. Konklusjon

I denne artikkelen har vi sett hvordan vi kan integrere vårprosjektet med to AWS-biblioteker for samhandling med en Kinesis Data Stream. Vi har også sett hvordan du bruker Spring Cloud Stream Binder Kinesis-biblioteket for å gjøre implementeringen enda enklere.

Kildekoden for denne artikkelen finner du på Github.


$config[zx-auto] not found$config[zx-overlay] not found