Integrering av våren med AWS Kinesis
1. Introduksjon
Kinesis er et verktøy for å samle, behandle og analysere datastrømmer i sanntid, utviklet hos Amazon. En av hovedfordelene er at den hjelper til med utvikling av hendelsesdrevne applikasjoner.
I denne veiledningen vil vi utforske noen få biblioteker som gjør det mulig for vårapplikasjonen å produsere og konsumere poster fra en Kinesis Stream. Kodeeksemplene viser den grunnleggende funksjonaliteten, men representerer ikke den produksjonsklare koden.
2. Forutsetning
Før vi går videre, må vi gjøre to ting.
Den første er å lage et vårprosjekt, da målet her er å samhandle med Kinesis fra et vårprosjekt.
Den andre er å lage en Kinesis Data Stream. Vi kan gjøre dette fra en nettleser i AWS-kontoen vår. Et alternativ for AWS CLI-fans blant oss er å bruke kommandolinjen. Fordi vi vil samhandle med det fra kode, må vi også ha AWS IAM-legitimasjon, tilgangsnøkkel og hemmelig nøkkel og regionen.
Alle produsentene våre vil lage dummy IP-adresseoppføringer, mens forbrukerne vil lese disse verdiene og føre dem opp i applikasjonskonsollen.
3. AWS SDK for Java
Det aller første biblioteket vi bruker er AWS SDK for Java. Fordelen er at den lar oss administrere mange deler av arbeidet med Kinesis Data Streams. Vi kan lese data, produsere data, opprette datastrømmer og hardføre datastrømmer. Ulempen er at for å ha produksjonsklar kode, må vi kode aspekter som omharding, feilhåndtering eller en demon for å holde forbrukeren i live.
3.1. Maven avhengighet
Amazon-kinesis-klientens Maven-avhengighet vil gi alt vi trenger for å ha fungerende eksempler. Vi legger det nå til pom.xml fil:
com.amazonaws amazon-kinesis-client 1.11.2
3.2. Våroppsett
La oss gjenbruke AmazonKinesis objektet som trengs for å samhandle med Kinesis Stream. Vi lager det som en @Bønne inne i vår @SpringBootApplication klasse:
@Bean offentlig AmazonKinesis buildAmazonKinesis () {BasicAWSCredentials awsCredentials = ny BasicAWSCredentials (accessKey, secretKey); returner AmazonKinesisClientBuilder.standard () .withCredentials (ny AWSStaticCredentialsProvider (awsCredentials)) .withRegion (Regions.EU_CENTRAL_1) .build (); }
Deretter la oss definere aws.access.key og aws.secret.key, nødvendig for den lokale maskinen, i application.properties:
aws.access.key = min-aws-tilgang-nøkkel-går-her aws.secret.key = min-aws-hemmelige-nøkkel-går-hit
Og vi vil lese dem ved hjelp av @Verdi kommentar:
@Value ("$ {aws.access.key}") privat streng tilgangsknapp; @Value ("$ {aws.secret.key}") privat streng secretKey;
For enkelhets skyld kommer vi til å stole på @ Planlagt metoder for å lage og konsumere poster.
3.3. Forbruker
De AWS SDK Kinesis Consumer bruker en pull-modell, noe som betyr at koden vår vil trekke poster fra skjærene i Kinesis datastrøm:
GetRecordsRequest recordsRequest = ny GetRecordsRequest (); recordsRequest.setShardIterator (shardIterator.getShardIterator ()); recordsRequest.setLimit (25); GetRecordsResult recordsResult = kinesis.getRecords (recordsRequest); while (! recordsResult.getRecords (). isEmpty ()) {recordsResult.getRecords (). stream () .map (record -> new String (record.getData (). array ())). forEach (System.out: : println); recordsRequest.setShardIterator (recordsResult.getNextShardIterator ()); recordsResult = kinesis.getRecords (recordsRequest); }
De GetRecordsRequest objekt bygger forespørselen om strømdata. I vårt eksempel har vi definert en grense på 25 poster per forespørsel, og vi fortsetter å lese til det ikke er noe mer å lese.
Vi kan også legge merke til at vi har brukt a for vår iterasjon GetShardIteratorResult gjenstand. Vi opprettet dette objektet inne i @PostConstrucmetode slik at vi begynner å spore poster med en gang:
privat GetShardIteratorResult shardIterator; @PostConstruct privat tomrom buildShardIterator () {GetShardIteratorRequest readShardsRequest = ny GetShardIteratorRequest (); readShardsRequest.setStreamName (IPS_STREAM); readShardsRequest.setShardIteratorType (ShardIteratorType.LATEST); readShardsRequest.setShardId (IPS_SHARD_ID); this.shardIterator = kinesis.getShardIterator (readShardsRequest); }
3.4. Produsent
La oss nå se hvordan håndtere opprettelsen av poster for Kinesis-datastrømmen.
Vi setter inn data ved hjelp av a PutRecordsRequest gjenstand. For dette nye objektet legger vi til en liste som inneholder flere PutRecordsRequestEntry gjenstander:
Listeoppføringer = IntStream.range (1, 200) .mapToObj (ipSuffix -> {PutRecordsRequestEntry entry = new PutRecordsRequestEntry (); entry.setData (ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())) ; entry.setPartitionKey (IPS_PARTITION_KEY); returoppføring;}). collect (Collectors.toList ()); PutRecordsRequest createRecordsRequest = ny PutRecordsRequest (); createRecordsRequest.setStreamName (IPS_STREAM); createRecordsRequest.setRecords (oppføringer); kinesis.putRecords (createRecordsRequest);
Vi har opprettet en grunnleggende forbruker og en produsent av simulerte IP-poster. Alt som gjenstår å gjøre nå er å kjøre vårprosjektet og se IP-er oppført i applikasjonskonsollen vår.
4. KCL og KPL
Kinesis Client Library (KCL) er et bibliotek som forenkler forbruket av poster. Det er også et lag med abstraksjon over AWS SDK Java APIer for Kinesis Data Streams. Bak kulissene håndterer biblioteket belastningsbalansering i mange tilfeller, reagerer på feil i forekomsten, kontrollerer behandlede poster og reagerer på omharding.
Kinesis Producer Library (KPL) er et bibliotek som er nyttig for skriving til en Kinesis-datastrøm. Det gir også et lag med abstraksjon som sitter over AWS SDK Java APIer for Kinesis Data Streams. For bedre ytelse håndterer biblioteket automatisk batching, multi-threading og prøv på nytt logikk.
KCL og KPL har begge hovedfordelen at de er enkle å bruke, slik at vi kan fokusere på å produsere og konsumere poster.
4.1. Maven avhengigheter
De to bibliotekene kan tas med hver for seg i prosjektet vårt om nødvendig. For å inkludere KPL og KCL i vårt Maven-prosjekt, må vi oppdatere pom.xml-filen:
com.amazonaws amazon-kinesis-producer 0.13.1 com.amazonaws amazon-kinesis-client 1.11.2
4.2. Våroppsett
Den eneste vårforberedelsen vi trenger er å sørge for at vi har IAM-legitimasjonen for hånden. Verdiene for aws.access.key og aws.secret.key er definert i vår application.properties slik at vi kan lese dem med @Verdi når det trengs.
4.3. Forbruker
Først skal vi lage en klasse som implementerer IR-prosessor grensesnitt og definerer logikken vår for hvordan vi skal håndtere Kinesis datastrømsposter, som er å skrive dem ut i konsollen:
offentlig klasse IpProcessor implementerer IRecordProcessor {@Override public void initialize (InitializationInput initializationInput) {} @Override public void processRecords (ProcessRecordsInput processRecordsInput) {processRecordsInput.getRecords () .forEach (record -> System.out.println (record). () .array ()))); } @Override offentlig ugyldig nedleggelse (ShutdownInput shutdownInput) {}}
Det neste trinnet er å definere en fabrikklasse som implementerer IRecordProcessorFactory grensesnitt og returnerer en tidligere opprettet IpProcessor gjenstand:
offentlig klasse IpProcessorFactory implementerer IRecordProcessorFactory {@Override offentlig IRecordProcessor createProcessor () {returner ny IpProcessor (); }}
Og nå for siste trinn, vi bruker en Arbeider mot å definere forbrukerrørledningen. Vi trenger en KinesisClientLibConfiguration objekt som vil definere, om nødvendig, IAM Credentials og AWS Region.
Vi vil passere KinesisClientLibConfiguration, og vår IpProcessorFactory objekt, mot vårt Arbeider og start den deretter i en egen tråd. Vi holder denne logikken med å konsumere poster alltid i live ved bruk av Arbeider klasse, så vi leser kontinuerlig nye poster nå:
BasicAWSCredentials awsCredentials = nye BasicAWSCredentials (accessKey, secretKey); KinesisClientLibConfiguration consumerConfig = ny KinesisClientLibConfiguration (APP_NAME, IPS_STREAM, ny AWSStaticCredentialsProvider (awsCredentials), IPS_WORKER) .withRegionName (Regions.EU_CENTRAL_1.getName ()); endelig arbeiderarbeider = ny Worker.Builder () .recordProcessorFactory (ny IpProcessorFactory ()) .config (consumerConfig) .build (); CompletableFuture.runAsync (worker.run ());
4.4. Produsent
La oss nå definere KinesisProducerConfiguration objekt, og legger til IAM-legitimasjonen og AWS-regionen:
BasicAWSCredentials awsCredentials = nye BasicAWSCredentials (accessKey, secretKey); KinesisProducerConfiguration producerConfig = ny KinesisProducerConfiguration () .setCredentialsProvider (ny AWSStaticCredentialsProvider (awsCredentials)) .setVerifyCertificate (false) .setRegion (Regions.EU_CENTRAL_1.getName ()); this.kinesisProducer = ny KinesisProducer (producerConfig);
Vi inkluderer kinesisProducer objekt tidligere opprettet i en @ Planlagt jobbe og produsere poster for Kinesis-datastrømmen kontinuerlig:
IntStream.range (1, 200) .mapToObj (ipSuffix -> ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())). For hver (oppføring -> kinesisProducer.addUserRecord (IPS_STREAM, IPS_PARTITION_KEY, );
5. Spring Cloud Stream Binder Kinesis
Vi har allerede sett to biblioteker, begge opprettet utenfor vårens økosystem. Vi gjør det nå se hvordan Spring Cloud Stream Binder Kinesis kan forenkle livet vårt ytterligere mens du bygger på toppen av Spring Cloud Stream.
5.1. Maven avhengighet
Maven-avhengigheten vi trenger å definere i vår applikasjon for Spring Cloud Stream Binder Kinesis er:
org.springframework.cloud spring-cloud-stream-binder-kinesis 1.2.1.RELEASE
5.2. Våroppsett
Når du kjører på EC2, oppdages de nødvendige AWS-egenskapene automatisk, så det er ikke nødvendig å definere dem. Siden vi kjører eksemplene våre på en lokal maskin, må vi definere vår IAM-tilgangsnøkkel, hemmelige nøkkel og region for AWS-kontoen vår. Vi har også deaktivert automatisk registrering av CloudFormation-stakknavn for applikasjonen:
cloud.aws.credentials.access-key = my-aws-access-key cloud.aws.credentials.secret-key = my-aws-secret-key cloud.aws.region.static = eu-central-1 cloud.aws .stack.auto = falsk
Spring Cloud Stream er pakket med tre grensesnitt som vi kan bruke i vår stream-binding:
- De Synke er for datainntak
- De Kilde brukes til publisering av poster
- De Prosessor er en kombinasjon av begge deler
Vi kan også definere våre egne grensesnitt hvis vi trenger det.
5.3. Forbruker
Å definere en forbruker er en todelt jobb. Først vil vi definere, i application.properties, datastrømmen som vi bruker:
spring.cloud.stream.bindings.input.destination = live-ips spring.cloud.stream.bindings.input.group = live-ips-group spring.cloud.stream.bindings.input.content-type = text / plain
Og la oss definere en vår @Komponent klasse. Kommentaren @EnableBinding (Sink.class) vil tillate oss å lese fra Kinesis-strømmen ved hjelp av metoden kommentert med @StreamListener (Sink.INPUT):
@EnableBinding (Sink.class) offentlig klasse IpConsumer {@StreamListener (Sink.INPUT) offentlig tomrom forbruker (String ip) {System.out.println (ip); }}
5.4. Produsent
Produsenten kan også deles i to. Først må vi definere strømegenskapene våre inne application.properties:
spring.cloud.stream.bindings.output.destination = live-ips spring.cloud.stream.bindings.output.content-type = text / plain
Og så vi legger til @EnableBinding (Source.class) på en vår @Komponent og lage nye testmeldinger hvert par sekunder:
@Component @EnableBinding (Source.class) offentlig klasse IpProducer {@Autowired private Source source; @Scheduled (fixedDelay = 3000L) private void produce () {IntStream.range (1, 200) .mapToObj (ipSuffix -> "192.168.0." + IpSuffix) .forEach (entry -> source.output (). Send ( MessageBuilder.withPayload (entry) .build ())); }}
Det er alt vi trenger for at Spring Cloud Stream Binder Kinesis skal fungere. Vi kan bare starte søknaden nå.
6. Konklusjon
I denne artikkelen har vi sett hvordan vi kan integrere vårprosjektet med to AWS-biblioteker for samhandling med en Kinesis Data Stream. Vi har også sett hvordan du bruker Spring Cloud Stream Binder Kinesis-biblioteket for å gjøre implementeringen enda enklere.
Kildekoden for denne artikkelen finner du på Github.