Reaktive systemer i Java

1. Introduksjon

I denne opplæringen vil vi forstå det grunnleggende om å lage reaktive systemer i Java ved hjelp av Spring og andre verktøy og rammer.

I prosessen vil vi diskutere hvordan reaktiv programmering bare er en driver for å skape et reaktivt system. Dette vil hjelpe oss med å forstå begrunnelsen for å lage reaktive systemer og forskjellige spesifikasjoner, biblioteker og standarder det har inspirert underveis.

2. Hva er reaktive systemer?

I løpet av de siste tiårene har teknologilandskapet sett flere forstyrrelser som har ført til en fullstendig transformasjon i måten vi ser verdi på teknologi. Databehandlingsverdenen før Internett kunne aldri ha forestilt seg hvordan de vil endre vår nåværende dag.

Med rekkevidden til internett for massene og den stadig utviklende opplevelsen det lover, må applikasjonsarkitekter være på tærne for å imøtekomme deres etterspørsel.

I utgangspunktet betyr dette at vi aldri kan utforme en applikasjon slik vi pleide å gjøre tidligere. EN svært responsiv applikasjon er ikke lenger en luksus, men en nødvendighet.

Det er også i møte med tilfeldige feil og uforutsigbar belastning. Timens behov er ikke bare å få riktig resultat, men å få det raskt! Det er ganske viktig å drive de fantastiske brukeropplevelsene vi lover å levere.

Dette er det som skaper behovet for en arkitektonisk stil som kan gi oss reaktive systemer.

2.1. Reaktivt manifest

Tilbake i året 2013, et team av utviklere, ledet av Jonas Boner, kom sammen for å definere et sett med hovedprinsipper i et dokument kjent som Reactive Manifesto. Dette var det som la grunnlaget for en arkitekturstil for å skape reaktive systemer. Siden den gang har dette manifestet samlet mye interesse fra utviklermiljøet.

I utgangspunktet foreskriver dette dokumentet oppskriften på at et reaktivt system skal være fleksibelt, løst koblet og skalerbart. Dette gjør slike systemer enkle å utvikle, tolerante for feil, og viktigst av alt svært responsive, og understøttelsen for utrolige brukeropplevelser.

Så hva er denne hemmelige oppskriften? Vel, det er knapt noen hemmelighet! Manifestet definerer de grunnleggende egenskapene eller prinsippene til et reaktivt system:

  • Mottakelig: Et reaktivt system skal gi rask og konsistent responstid og dermed en jevn kvalitet på tjenesten
  • Fleksibel: Et reaktivt system skal være responsivt i tilfelle tilfeldige feil gjennom replikering og isolasjon
  • Elastisk: Et slikt system skal være responsivt under uforutsigbar arbeidsbelastning gjennom kostnadseffektiv skalerbarhet
  • Meldingsdrevet: Det bør stole på asynkron melding som går mellom systemkomponenter

Disse prinsippene høres enkle og fornuftige ut, men er ikke alltid enklere å implementere i kompleks bedriftsarkitektur. I denne opplæringen vil vi utvikle et eksempelsystem i Java med tanke på disse prinsippene!

3. Hva er reaktiv programmering?

Før vi fortsetter, er det viktig å forstå forskjellen mellom reaktiv programmering og reaktive systemer. Vi bruker begge disse begrepene ganske ofte og misforstår lett det ene for det andre. Som vi har sett tidligere, er reaktive systemer et resultat av en bestemt arkitektonisk stil.

I motsetning, reaktiv programmering er et programmeringsparadigme hvor fokus er på å utvikle asynkrone og ikke-blokkerende komponenter. Kjernen i reaktiv programmering er en datastrøm som vi kan observere og reagere på, til og med også bruke mottrykk. Dette fører til ikke-blokkerende utførelse og dermed til bedre skalerbarhet med færre utførelsestråder.

Dette betyr ikke at reaktive systemer og reaktiv programmering utelukker hverandre. Faktisk er reaktiv programmering et viktig skritt mot å realisere et reaktivt system, men det er ikke alt!

3.1. Reaktive strømmer

Reactive Streams er et fellesskapsinitiativ som startet tilbake i år 2013 til gi en standard for asynkron strømbehandling med ikke-blokkerende mottrykk. Målet her var å definere et sett med grensesnitt, metoder og protokoller som kan beskrive de nødvendige operasjonene og enhetene.

Siden har det dukket opp flere implementeringer i flere programmeringsspråk som er i samsvar med spesifikasjonen for reaktive strømmer. Disse inkluderer Akka Streams, Ratpack og Vert.x for å nevne noen.

3.2. Reaktive biblioteker for Java

Et av de første målene bak reaktive strømmer var å til slutt bli inkludert som et offisielt Java-standardbibliotek. Som et resultat er spesifikasjonen for reaktive strømmer semantisk ekvivalent med Java Flow-biblioteket, introdusert i Java 9.

Bortsett fra det er det noen populære valg å implementere reaktiv programmering i Java:

  • Reactive Extensions: Populært kjent som ReactiveX, de gir API for asynkron programmering med observerbare strømmer. Disse er tilgjengelige for flere programmeringsspråk og plattformer, inkludert Java der det er kjent som RxJava
  • Prosjektreaktor: Dette er et annet reaktivt bibliotek, begrunnelse basert på spesifikasjonen for reaktive strømmer, rettet mot å bygge ikke-applikasjoner på JVM. Det skjer også å være grunnlaget for den reaktive stakken i vårens økosystem

4. En enkel applikasjon

For formålet med denne veiledningen vil vi utvikle en enkel applikasjon basert på mikrotjenestearkitektur med minimal frontend. Applikasjonsarkitekturen skal ha nok elementer til å lage et reaktivt system.

For vår applikasjon vil vi vedta end-to-end reaktiv programmering og andre mønstre og verktøy for å oppnå de grunnleggende egenskapene til et reaktivt system.

4.1. Arkitektur

Vi begynner med å definere en enkel applikasjonsarkitektur som ikke nødvendigvis viser egenskapene til reaktive systemer. Derfra vil vi gjøre de nødvendige endringene for å oppnå disse egenskapene en etter en.

La oss først begynne med å definere en enkel arkitektur:

Dette er en ganske enkel arkitektur som har en rekke mikrotjenester for å gjøre det lettere å bruke en handel der vi kan bestille. Det har også en frontend for brukeropplevelse, og all kommunikasjon skjer som REST over HTTP. Videre administrerer hver mikrotjeneste dataene sine i individuelle databaser, en praksis kjent som database per tjeneste.

Vi fortsetter og lager denne enkle applikasjonen i de følgende underavsnittene. Dette blir vårt base for å forstå feilene i denne arkitekturen og måter og midler for å vedta prinsipper og praksis slik at vi kan forvandle dette til et reaktivt system.

4.3. Lager Microservice

Inventory microservice vil være ansvarlig for å administrere en liste over produkter og deres nåværende lager. Det vil også tillate endring av lageret når ordrer behandles. Vi bruker Spring Boot med MongoDB for å utvikle denne tjenesten.

La oss begynne med å definere en kontroller for å avsløre noen endepunkter:

@GetMapping offentlig liste getAllProducts () {return productService.getProducts (); } @PostMapping public Order processOrder (@RequestBody Order order) {return productService.handleOrder (order); } @DeleteMapping public Order revertOrder (@RequestBody Order order) {return productService.revertOrder (order); }

og en tjeneste for å kapsle inn forretningslogikken vår:

@Transactional public Order handleOrder (Order order) {order.getLineItems () .forEach (l -> {Product> p = productRepository.findById (l.getProductId ()) .orElseThrow (() -> new RuntimeException ("Kunne ikke finne produktet: "+ l.getProductId ())); hvis (p.getStock ()> = l.getQuantity ()) {p.setStock (p.getStock () - l.getQuantity ()); productRepository.save ( p);} annet {kast ny RuntimeException ("Produktet er utsolgt:" + l.getProductId ());}}); returner order.setOrderStatus (OrderStatus.SUCCESS); } @Transactional public Order revertOrder (Order order) {order.getLineItems () .forEach (l -> {Product p = productRepository.findById (l.getProductId ()) .orElseThrow (() -> new RuntimeException ("Kunne ikke finne produktet: "+ l.getProductId ())); p.setStock (p.getStock () + l.getQuantity ()); productRepository.save (p);}); returner order.setOrderStatus (OrderStatus.SUCCESS); }

Merk at vi er vedvarende enhetene i en transaksjon, som sikrer at ingen inkonsekvente tilstander resulterer i tilfelle unntak.

Bortsett fra disse, må vi også definere domenenhetene, lagringsgrensesnittet og en rekke konfigurasjonsklasser som er nødvendige for at alt skal fungere skikkelig.

Men siden disse for det meste er kokeplate, vil vi unngå å gå gjennom dem, og de kan refereres til i GitHub-depotet som er gitt i den siste delen av denne artikkelen.

4.4. Shipping Microservice

Forsendelsesmikrotjenesten vil heller ikke være veldig forskjellig. Dette vil bli ansvarlig for å sjekke om en forsendelse kan genereres for bestillingen og opprett en hvis mulig.

Som før vil vi definere en kontroller for å avsløre våre endepunkter, faktisk bare et enkelt endepunkt:

@PostMapping offentlig bestillingsprosess (@RequestBody bestillingsordre) {retur shippingService.handleOrder (ordre); }

og en tjeneste for å kapsle inn forretningslogikken knyttet til bestillingsforsendelse:

public Order handleOrder (Order order) {LocalDate shippingDate = null; if (LocalTime.now (). isAfter (LocalTime.parse ("10:00")) && LocalTime.now (). isBefore (LocalTime.parse ("18:00"))) {shippingDate = LocalDate.now () .plusDays (1); } annet {kast ny RuntimeException ("Den nåværende tiden er utenfor grensene for å bestille."); } shipmentRepository.save (new Shipment () .setAddress (order.getShippingAddress ()) .setShippingDate (shippingDate)); returner order.setShippingDate (shippingDate) .setOrderStatus (OrderStatus.SUCCESS); }

Vår enkle frakttjeneste sjekker bare det gyldige tidsvinduet for å bestille. Vi unngår å diskutere resten av kjelekoden som før.

4.5. Bestill Microservice

Til slutt vil vi definere en bestillingsmikrotjeneste som vil være ansvarlig for å skape en ny ordre bortsett fra andre ting. Interessant, det vil også spille som en orkestrator-tjeneste der den vil kommunisere med varetjenesten og frakttjenesten for bestillingen.

La oss definere kontrolleren vår med de nødvendige sluttpunktene:

@PostMapping offentlig ordre opprette (@RequestBody ordreordre) {Bestillingsbehandlet bestilling = ordreservice.opprett bestilling (ordre); hvis (OrderStatus.FAILURE.equals (prosessertOrder.getOrderStatus ())) {kast nytt RuntimeException ("Behandling mislyktes, prøv igjen senere."); } returnert behandletBestilling; } @GetMapping offentlig liste getAll () {return orderService.getOrders (); }

Og en tjeneste for å kapsle inn forretningslogikken knyttet til ordrer:

public Order createOrder (Order order) {boolean suksess = true; Ordre lagretOrder = orderRepository.save (ordre); Bestill inventarResponse = null; prøv {inventoryResponse = restTemplate.postForObject (inventoryServiceUrl, order, Order.class); } fange (Unntak ex) {suksess = usann; } Bestill shippingResponse = null; prøv {shippingResponse = restTemplate.postForObject (shippingServiceUrl, order, Order.class); } fange (Unntak ex) {suksess = usann; HttpEntity deleteRequest = ny HttpEntity (rekkefølge); ResponseEntity deleteResponse = restTemplate.exchange (lagerServiceUrl, HttpMethod.DELETE, deleteRequest, Order.class); } hvis (suksess) {savedOrder.setOrderStatus (OrderStatus.SUCCESS); savedOrder.setShippingDate (shippingResponse.getShippingDate ()); } annet {savedOrder.setOrderStatus (OrderStatus.FAILURE); } returner orderRepository.save (savedOrder); } offentlig liste getOrders () {return orderRepository.findAll (); }

Håndteringen av bestillinger der vi orkestrerer anrop til varelager og leveringstjenester er langt fra ideell. Distribuert transaksjoner med flere mikrotjenester er et komplekst tema i seg selv og utenfor omfanget av denne opplæringen.

Imidlertid vil vi se senere i denne veiledningen hvordan et reaktivt system til en viss grad kan unngå behovet for distribuerte transaksjoner.

Som før vil vi ikke gå gjennom resten av kjelekoden. Imidlertid kan dette refereres til i GitHub-repoen.

4.6. Front-end

La oss også legge til et brukergrensesnitt for å gjøre diskusjonen komplett. Brukergrensesnittet vil være basert på Angular og vil være en enkel applikasjon på én side.

Vi må lage en enkel komponent i Angular for å håndtere opprette og hente ordrer. Av spesiell betydning er den delen der vi kaller API-en vår for å opprette ordren:

createOrder () {let headers = new HttpHeaders ({'Content-Type': 'application / json'}); let options = {headers: headers} this.http.post ('// localhost: 8080 / api / orders', this.form.value, options). abonner ((respons) => {this.response = respons}, (feil) => {this.error = error})}

Ovennevnte kodebit forventer at bestillingsdata blir fanget i et skjema og tilgjengelig innenfor komponentens omfang. Angular tilbyr fantastisk støtte for å lage enkle til komplekse former ved hjelp av reaktive og maldrevne former.

Også viktig er den delen hvor vi får tidligere opprettet bestillinger:

getOrders () {this.previousOrders = this.http.get ('' // localhost: 8080 / api / orders '')}

Vær oppmerksom på at Angular HTTP-modulen er asynkron i naturen og returnerer dermed RxJS Observerbars. Vi kan håndtere responsen etter vårt syn ved å føre dem gjennom et asynkronrør:

Dine bestillinger så langt:

  • Bestillings-ID: {{order.id}}, Bestillingsstatus: {{order.orderStatus}}, Bestillingsmelding: {{order.responseMessage}}

Selvfølgelig vil Angular kreve maler, stiler og konfigurasjoner for å fungere, men disse kan refereres til i GitHub-depotet. Vær oppmerksom på at vi har samlet alt i en enkelt komponent her, som ideelt sett ikke er noe vi bør gjøre.

Men for denne opplæringen er disse bekymringene ikke i omfang.

4.7. Distribuere applikasjonen

Nå som vi har opprettet alle individuelle deler av applikasjonen, hvordan skal vi gå frem for å distribuere dem? Vi kan alltid gjøre dette manuelt. Men vi bør være forsiktige med at det snart kan bli kjedelig.

For denne opplæringen bruker vi Docker Compose til bygge og distribuere applikasjonen vår på en Docker Machine. Dette vil kreve at vi legger til en standard Dockerfil i hver tjeneste og oppretter en Docker Compose-fil for hele applikasjonen.

La oss se hvordan dette docker-compose.yml filen ser ut:

versjon: '3' tjenester: frontend: build: ./frontend porter: - "80:80" ordretjeneste: build: ./ordre- service porter: - "8080: 8080" varetjeneste: build: ./inventory -service porter: - "8081: 8081" shipping-service: build: ./shipping-service porter: - "8082: 8082"

Dette er en ganske standard definisjon av tjenester i Docker Compose og krever ingen spesiell oppmerksomhet.

4.8. Problemer med denne arkitekturen

Nå som vi har en enkel applikasjon på plass med flere tjenester som interagerer med hverandre, kan vi diskutere problemene i denne arkitekturen. Det er det vi vil prøve å ta opp i de følgende avsnittene og til slutt komme til staten der vi ville ha forvandlet søknaden vår til et reaktivt system!

Selv om dette programmet er langt fra en programvare av produksjonsgrad, og det er flere problemer, vil vi fokusere på problemene som gjelder motivasjonene for reaktive systemer:

  • Feil i verken lagertjeneste eller frakttjeneste kan ha en kaskadeffekt
  • Samtalene til eksterne systemer og databaser er blokkerende
  • Distribusjonen kan ikke håndtere feil og svingende belastninger automatisk

5. Reaktiv programmering

Blokkerer ofte anrop i et hvilket som helst program føre til at kritiske ressurser bare venter på at ting skal skje. Disse inkluderer databasesamtaler, samtaler til webtjenester og filsystemanrop. Hvis vi kan frigjøre gjennomføringstråder fra denne ventetiden og tilby en mekanisme for å sirkle tilbake når resultatene er tilgjengelige, vil det gi mye bedre ressursutnyttelse.

Dette er hva adopsjonen av det reaktive programmeringsparadigmet gjør for oss. Selv om det er mulig å bytte til et reaktivt bibliotek for mange av disse samtalene, er det kanskje ikke mulig for alt. For oss, heldigvis, gjør Spring det mye enklere å bruke reaktiv programmering med MongoDB og REST APIer:

Spring Data Mongo har støtte for reaktiv tilgang gjennom MongoDB Reactive Streams Java Driver. Det gir ReactiveMongoTemplate og ReactiveMongoRepository, som begge har omfattende kartleggingsfunksjonalitet.

Spring WebFlux gir nettstrukturen for reaktiv stabel for Spring, som muliggjør ikke-blokkerende kode og mottrykk for reaktive strømmer. Den utnytter reaktoren som sitt reaktive bibliotek. Videre gir det WebClient for å utføre HTTP-forespørsler med mottrykk fra Reactive Streams. Den bruker Reactor Netty som HTTP-klientbibliotek.

5.1. Lager Service

Vi begynner med å endre sluttpunktene våre for å gi ut reaktive utgivere:

@GetMapping public Flux getAllProducts () {return productService.getProducts (); }
@PostMapping offentlig Mono processOrder (@RequestBody Order order) {return productService.handleOrder (order); } @DeleteMapping offentlig Mono revertOrder (@RequestBody Order order) {return productService.revertOrder (order); }

Åpenbart må vi også gjøre nødvendige endringer i tjenesten:

@Transactional public Mono handleOrder (Order order) {return Flux.fromIterable (order.getLineItems ()) .flatMap (l -> productRepository.findById (l.getProductId ())) .flatMap (p -> {int q = order. getLineItems (). stream () .filter (l -> l.getProductId (). er lik (p.getId ())) .findAny (). get () .getQuantity (); hvis (p.getStock ()> = q) {p.setStock (p.getStock () - q); return productRepository.save (p);} else {return Mono.error (new RuntimeException ("Produktet er utsolgt:" + p.getId ()) );}}). deretter (Mono.just (order.setOrderStatus ("SUKSESS"))); } @Transactional public Mono revertOrder (Order order) {return Flux.fromIterable (order.getLineItems ()) .flatMap (l -> productRepository.findById (l.getProductId ())) .flatMap (p -> {int q = order .getLineItems (). stream () .filter (l -> l.getProductId (). tilsvarer (p.getId ())) .findAny (). get () .getQuantity (); p.setStock (p.getStock ( ) + q); returner produktRepository.save (p);}). deretter (Mono.just (order.setOrderStatus ("SUCCESS"))); }

5.2. Frakttjeneste

På samme måte endrer vi endepunktet for frakttjenesten vår:

@PostMapping offentlig monoprosess (@RequestBody Bestill ordre) {retur shippingService.handleOrder (ordre); }

Og tilsvarende endringer i tjenesten for å utnytte reaktiv programmering:

offentlig monohåndtakOrder (bestillingsordre) {retur Mono.just (ordre) .flatMap (o -> {LocalDate shippingDate = null; hvis (LocalTime.now (). isAfter (LocalTime.parse ("10:00")) && LocalTime .now (). isBefore (LocalTime.parse ("18:00"))) {shippingDate = LocalDate.now (). plusDays (1);} ellers {return Mono.error (new RuntimeException ("The current time is off grensene for å legge inn bestilling. "));} return shipmentRepository.save (new Shipment () .setAddress (order.getShippingAddress ()) .setShippingDate (shippingDate));}) .map (s -> order.setShippingDate (s. getShippingDate ()) .setOrderStatus (OrderStatus.SUCCESS)); }

5.3. Bestill service

Vi må gjøre lignende endringer i endepunktene til ordretjenesten:

@PostMapping offentlig mono opprette (@RequestBody Order order) {return orderService.createOrder (order) .flatMap (o -> {if (OrderStatus.FAILURE.equals (o.getOrderStatus ())) {return Mono.error (new RuntimeException ( "Behandling av ordre mislyktes, prøv igjen senere." + O.getResponseMessage ()));} ellers {return Mono.just (o);}}); } @GetMapping public Flux getAll () {return orderService.getOrders (); }

Endringene i tjenesten vil være mer involvert, ettersom vi må bruke våren WebClient for å påkalle lagrene og sende reaktive endepunkter:

public Mono createOrder (Order order) {return Mono.just (order) .flatMap (orderRepository :: save) .flatMap (o -> {return webClient.method (HttpMethod.POST) .uri (inventoryServiceUrl) .body (BodyInserters.fromValue (o)) .exchange ();}) .onErrorResume (err -> {return Mono.just (order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .flatMap (o -> {if (! OrderStatus.FAILURE.equals (o.getOrderStatus ())) {return webClient.method (HttpMethod.POST) .uri (shippingServiceUrl) .body (BodyInserters.fromValue (o)) .exchange ();} else { returner Mono.just (o);}}) .onErrorResume (err -> {return webClient.method (HttpMethod.POST) .uri (inventoryServiceUrl) .body (BodyInserters.fromValue (order)) .hent () .bodyToMono (Order .class) .map (o -> o.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .map (o -> {if (! OrderStatus.FAILURE.equals (o.getOrderStatus ( ))) {return order.setShippingDate (o.getShippingDate ()) .setOrderStatus (OrderStatus.SUCCESS);} else {return order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (o.getResponseMessage ()); }}) .flatMap (orderRepository :: lagre); } public Flux getOrders () {return orderRepository.findAll (); }

Dette slags orkestrering med reaktive API-er er ingen enkel øvelse og ofte feilutsatt så vel som vanskelig å feilsøke. Vi får se hvordan dette kan forenkles i neste avsnitt.

5.4. Front-end

Nå som API-ene våre er i stand til å streame hendelser når de skjer, er det ganske naturlig at vi også kan utnytte det i vår front-end. Heldigvis støtter Angular EventSource, grensesnittet for server-sendte hendelser.

La oss se hvordan vi kan trekke og behandle alle våre tidligere bestillinger som en strøm av hendelser:

getOrderStream () {return Observable.create ((observer) => {let eventSource = new EventSource ('// localhost: 8080 / api / orders') eventSource.onmessage = (event) => {let json = JSON.parse ( event.data) this.orders.push (json) this._zone.run (() => {observer.next (this.orders)})} eventSource.onerror = (error) => {if (eventSource.readyState = == 0) {eventSource.close () this._zone.run (() => {observer.complete ()})} annet {this._zone.run (() => {observer.error ('EventSource error: '+ feil)})}}})}

6. Meldingsdrevet arkitektur

Det første problemet vi skal løse er relatert til service-til-tjeneste-kommunikasjon. Akkurat nå, disse kommunikasjonene er synkrone, noe som gir flere problemer. Disse inkluderer kaskadefeil, kompleks orkestrering og distribuerte transaksjoner for å nevne noen.

En åpenbar måte å løse dette problemet på er å gjøre disse kommunikasjonene asynkrone. EN meldingsmegler for å legge til rette for all tjeneste-til-tjeneste-kommunikasjon kan gjøre susen for oss. Vi bruker Kafka som vår meldingsmegler og Spring for Kafka for å produsere og konsumere meldinger:

Vi bruker et enkelt emne for å produsere og konsumere bestillingsmeldinger med forskjellige ordrestatuser for tjenester å reagere på.

La oss se hvordan hver tjeneste må endres.

6.1. Lager Service

La oss begynne med å definere meldingsprodusenten for vår varetjeneste:

@Autowired private KafkaTemplate kafkaTemplate; public void sendMessage (Order order) {this.kafkaTemplate.send ("orders", order); }

Deretter må vi definere en meldingsforbruker for varetjeneste for å reagere på forskjellige meldinger om emnet:

@KafkaListener (topics = "orders", groupId = "lager") offentlig ugyldig forbruker (Order order) kaster IOException {if (OrderStatus.RESERVE_INVENTORY.equals (order.getOrderStatus ())) {productService.handleOrder (order) .doOnSuccess () o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_SUCCESS));}) .doOnError (e -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_FAILURE) (setResponse)) }).abonnere(); } annet hvis (OrderStatus.REVERT_INVENTORY.equals (order.getOrderStatus ())) {productService.revertOrder (order) .doOnSuccess (o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_REVERT_Server)) e -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_REVERT_FAILURE) .setResponseMessage (e.getMessage ()));}). subscribe (); }}

Dette betyr også at vi trygt kan slippe noen av de overflødige endepunktene fra kontrolleren vår nå. Disse endringene er tilstrekkelige for å oppnå asynkron kommunikasjon i applikasjonen vår.

6.2. Frakttjeneste

Endringene i frakttjenesten er relativt lik det vi gjorde tidligere med varetjenesten. Meldingsprodusenten er den samme, og meldingsforbrukeren er spesifikk for fraktlogikken:

@KafkaListener (topics = "orders", groupId = "shipping") offentlig ugyldig forbruker (Order order) kaster IOException {if (OrderStatus.PREPARE_SHIPPING.equals (order.getOrderStatus ())) {shippingService.handleOrder (order) .doOnSuccess () o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.SHIPPING_SUCCESS) .setShippingDate (o.getShippingDate ()))}} .doOnError (e -> {orderProducer.sendMessage (order.setOrderStatUSSett) (e.getMessage ()))}}. abonner (); }}

Vi kan trygt slippe alle endepunktene i kontrolleren vår nå, ettersom vi ikke lenger trenger dem.

6.3. Bestill service

Endringene i bestillingstjenesten vil være litt mer involvert, ettersom det var her vi gjorde hele orkestrasjonen tidligere.

Likevel forblir meldingsprodusenten uendret, og meldingsforbruker tar på seg tjenestespesifikk logikk:

@KafkaListener (topics = "orders", groupId = "orders") offentlig ugyldig forbruker (Order order) kaster IOException {if (OrderStatus.INITIATION_SUCCESS.equals (order.getOrderStatus ())) {orderRepository.findById (order.getId () ) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.RESERVE_INVENTORY)); returner o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) : lagre). abonner (); } annet hvis ("INVENTORY-SUCCESS" .equals (order.getOrderStatus ())) {orderRepository.findById (order.getId ()) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.PREPARE_SHIPPING)) ; returner o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save) .subscribe (); } annet hvis ("SHIPPING-FAILURE" .equals (order.getOrderStatus ())) {orderRepository.findById (order.getId ()) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.REVERT_INVENTORY)) ; returner o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save) .subscribe (); } annet {orderRepository.findById (order.getId ()) .map (o -> {return o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save ) .abonnere(); }}

De forbruker her reagerer bare på bestillingsmeldinger med forskjellige ordrestatuser. Dette er det som gir oss koreografien mellom forskjellige tjenester.

Til slutt må bestillingstjenesten vår også endres for å støtte denne koreografien:

public Mono createOrder (Order order) {return Mono.just (order) .flatMap (orderRepository :: save) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.INITIATION_SUCCESS)); return o;}). onErrorResume (err -> {return Mono.just (order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .flatMap (orderRepository :: save); }

Merk at dette er langt enklere enn tjenesten vi måtte skrive med reaktive endepunkter i den siste delen. Asynkron koreografi resulterer ofte i langt enklere kode, selv om det koster endelig konsistens og kompleks feilsøking og overvåking. Som vi kanskje gjetter, vil frontenderen vår ikke lenger få den endelige statusen for bestillingen umiddelbart.

7. Containerorkestreringstjeneste

Den siste brikken i puslespillet som vi ønsker å løse, er relatert til distribusjon.

Det vi ønsker i applikasjonen er god overflødighet og en tendens til å skalere opp eller ned, avhengig av behovet automatisk.

Vi har allerede oppnådd containerisering av tjenester gjennom Docker og administrerer avhengigheter mellom dem gjennom Docker Compose. Selv om dette er fantastiske verktøy i seg selv, hjelper de oss ikke med å oppnå det vi ønsker.

Derfor, vi trenger en container orkestreringstjeneste som kan ivareta redundans og skalerbarhet i applikasjonen vår. Mens det er flere alternativer, inkluderer en av de populære Kubernetes. Kubernetes gir oss en skyleverandør-agnostisk måte å oppnå svært skalerbar distribusjon av containeriserte arbeidsmengder.

Kubernetes pakker containere som Docker inn i Pods, som er den minste distribusjonsenheten. Videre kan vi bruke Deployment for å beskrive ønsket tilstand erklærende.

Distribusjon skaper ReplicaSets, som internt er ansvarlig for å ta opp belgene. Vi kan beskrive et minimum antall identiske bøtter som skal kjøre når som helst. Dette gir redundans og dermed høy tilgjengelighet.

La oss se hvordan vi kan definere en Kubernetes-distribusjon for applikasjonene våre:

apiVersion: apps / v1 type: Distribusjonsmetadata: navn: lager-distribusjon spesifikasjon: replikaer: 3 velger: matchLabels: navn: lager-distribusjonsmal: metadata: etiketter: navn: inventar-distribusjon spesifikasjon: containere: - navn: lagerbilde: inventar-service-async: siste porter: - containerPort: 8081 --- apiVersion: apps / v1 type: Implementeringsmetadata: navn: shipping-distribusjon spec: replikaer: 3 velger: matchLabels: navn: shipping-distribusjonsmal: metadata: labels : navn: shipping-deployment spec: containere: - navn: shipping image: shipping-service-async: siste porter: - containerPort: 8082 --- apiVersion: apps / v1 type: Implementeringsmetadata: navn: order-deployment spec: replikaer : 3 velger: matchLabels: navn: ordre-distribusjonsmal: metadata: etiketter: navn: ordre-distribusjon spesifikasjon: containere: - navn: ordrebilde: ordre-service-async: siste porter: - containerPort: 8080

Her erklærer vi vår distribusjon for å opprettholde tre identiske kopier av pods når som helst. Selv om dette er en god måte å legge til overflødighet, kan det hende det ikke er tilstrekkelig for varierende belastning. Kubernetes tilbyr en annen ressurs kjent som Horizontal Pod Autoscaler som kan skalere antall pods i en distribusjon basert på observerte beregninger som CPU-utnyttelse.

Vær oppmerksom på at vi nettopp har dekket skalerbarhetsaspektene til applikasjonen som er vert i en Kubernetes-klynge. Dette betyr ikke nødvendigvis at selve den underliggende klyngen er skalerbar. Å lage en Kubernetes-klynge med høy tilgjengelighet er en ikke-triviell oppgave og utenfor omfanget av denne opplæringen.

8. Resulterende reaktivt system

Nå som vi har gjort flere forbedringer i arkitekturen vår, er det kanskje på tide å evaluere dette mot definisjonen av et reaktivt system. Vi holder evalueringen mot de fire egenskapene til et reaktivt system vi diskuterte tidligere i opplæringen:

  • Mottakelig: Vedtakelsen av det reaktive programmeringsparadigmet bør hjelpe oss med å oppnå ikke-blokkering fra ende til ende og dermed en responsiv applikasjon
  • Fleksibel: Kubernetes distribusjon med ReplicaSet av ønsket antall pods skal gi motstandsdyktighet mot tilfeldige feil
  • Elastisk: Kubernetes klynge og ressurser bør gi oss den nødvendige støtten for å være elastiske i møte med uforutsigbare belastninger
  • Meldingsdrevet: Å ha all service-til-tjenestekommunikasjon håndtert asynkront gjennom en Kafka-megler, bør hjelpe oss her

Selv om dette ser ganske lovende ut, er det langt fra over. For å være ærlig, søken etter et virkelig reaktivt system bør være en kontinuerlig øvelse av forbedringer. Vi kan aldri forebygge alt som kan mislykkes i en svært kompleks infrastruktur, der applikasjonen vår bare er en liten del.

Et reaktivt system vil således krever pålitelighet fra alle deler som utgjør helheten. Rett fra det fysiske nettverket til infrastrukturtjenester som DNS, bør alle falle i kø for å hjelpe oss med å nå det endelige målet.

Ofte er det kanskje ikke mulig for oss å administrere og gi de nødvendige garantiene for alle disse delene. Og det er her en administrert skyinfrastruktur hjelper til med å lindre smertene våre. Vi kan velge mellom en rekke tjenester som IaaS (Infeastrure-as-a-Service), BaaS (Backend-as-a-Service) og PaaS (Platform-as-a-Service) for å delegere ansvaret til eksterne parter. Dette gir oss ansvaret for søknaden vår så langt som mulig.

9. Konklusjon

I denne opplæringen gikk vi gjennom det grunnleggende om reaktive systemer, og hvordan sammenligner det med reaktiv programmering. Vi opprettet en enkel applikasjon med flere mikrotjenester og fremhevet problemene vi har tenkt å løse med et reaktivt system.

Videre gikk vi videre og introduserte reaktiv programmering, meldingsbasert arkitektur og containerorkestreringstjeneste i arkitekturen for å realisere et reaktivt system.

Til slutt diskuterte vi den resulterende arkitekturen og hvordan den forblir en reise mot det reaktive systemet! Denne opplæringen introduserer oss ikke for alle verktøy, rammer eller mønstre som kan hjelpe oss med å skape et reaktivt system, men det introduserer oss for reisen.

Som vanlig kan kildekoden for denne artikkelen finnes på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found