Introduksjon til Netflix Mantis

1. Oversikt

I denne artikkelen tar vi en titt på Mantis-plattformen utviklet av Netflix.

Vi vil utforske de viktigste Mantis-konseptene ved å opprette, kjøre og undersøke en strømbehandlingsjobb.

2. Hva er Mantis?

Mantis er en plattform for å bygge strømbehandlingsapplikasjoner (arbeidsplasser). Det gir en enkel måte å administrere distribusjon og livssyklus for jobber. Dessuten, det muliggjør ressurstildeling, oppdagelse og kommunikasjon mellom disse jobbene.

Derfor kan utviklere fokusere på faktisk forretningslogikk, samtidig som de har støtte fra en robust og skalerbar plattform for å kjøre applikasjoner med høyt volum, lav ventetid og ikke-blokkering.

En Mantis-jobb består av tre forskjellige deler:

  • de kilde, ansvarlig for å hente dataene fra en ekstern kilde
  • en eller fler trinn, ansvarlig for behandling av innkommende hendelsesstrømmer
  • og en synke som samler de behandlede dataene

La oss nå utforske hver av dem.

3. Oppsett og avhengigheter

La oss starte med å legge til mantis-kjøretid og jackson-databind avhengigheter:

 io.mantisrx mantis-runtime com.fasterxml.jackson.core jackson-databind 

Nå, for å sette opp jobbens datakilde, la oss implementere Mantis Kilde grensesnitt:

offentlig klasse RandomLogSource implementerer kilde {@Override public Observable ring (Kontekstkontekst, Indeksindeks) {return Observable.just (Observable .interval (250, TimeUnit.MILLISECONDS) .map (this :: createRandomLogEvent)); } privat streng createRandomLogEvent (langt kryss) {// generere en tilfeldig loggoppføringsstreng ...}}

Som vi kan se, genererer det ganske enkelt tilfeldige loggoppføringer flere ganger per sekund.

4. Vår første jobb

La oss nå lage en Mantis-jobb som ganske enkelt samler logghendelser fra vår RandomLogSource. Senere vil vi legge til gruppe- og aggregeringstransformasjoner for et mer komplekst og interessant resultat.

Til å begynne med, la oss lage en LogEvent enhet:

offentlig klasse LogEvent implementerer JsonType {private Long index; privat strengnivå; privat strengmelding; // ...}

La oss så legge til vår TransformLogStage.

Det er et enkelt trinn som implementerer ScalarComputation-grensesnittet og deler en loggoppføring for å bygge en LogEvent. Det filtrerer også ut eventuelle feilformaterte strenger:

offentlig klasse TransformLogStage implementerer ScalarComputation {@Override public Observable call (Context context, Observable logEntry) {return logEntry .map (log -> log.split ("#")) .filter (parts -> parts.length == 3). kart (LogEvent :: ny); }}

4.1. Kjører jobben

På dette tidspunktet har vi nok byggesteiner til å sette sammen vår Mantis-jobb:

offentlig klasse LogCollectingJob utvider MantisJobProvider {@Override public Job getJobInstance () {return MantisJob .source (new RandomLogSource ()) .stage (new TransformLogStage (), new ScalarToScalar.Config ()) .sink (Sinks.eagerSub. LogEvent :: toJsonString))) .metadata (ny Metadata.Builder (). Build ()) .create (); }}

La oss se nærmere på jobben vår.

Som vi kan se, strekker det seg MantisJobProvider. Først henter den data fra vår RandomLogSource og bruker TransformLogStage til de hentede dataene. Til slutt sender den de behandlede dataene til den innebygde vasken som ivrig abonnerer på og leverer data over SSE.

La oss nå konfigurere jobben vår til å utføre lokalt ved oppstart:

@SpringBootApplication offentlig klasse MantisApplication implementerer CommandLineRunner {// ... @Override public void run (String ... args) {LocalJobExecutorNetworked.execute (new LogCollectingJob (). GetJobInstance ()); }}

La oss kjøre applikasjonen. Vi får se en loggmelding som:

... Serverer moderne HTTP SSE-server vask på port: 86XX

La oss nå koble til vasken ved hjelp av krølle:

$ curl localhost: 86XX data: {"index": 86, "level": "WARN", "message": "login login"} data: {"index": 87, "level": "ERROR", "message ":" bruker opprettet "} data: {" index ": 88," level ":" INFO "," message ":" bruker opprettet "} data: {" index ": 89," level ":" INFO ", "message": "påloggingsforsøk"} data: {"index": 90, "level": "INFO", "message": "bruker opprettet"} data: {"index": 91, "level": "FEIL "," message ":" bruker opprettet "} data: {" index ": 92," level ":" WARN "," message ":" login login "} data: {" index ": 93," level ": "INFO", "message": "bruker opprettet"} ...

4.2. Konfigurere vasken

Så langt har vi brukt den innebygde vasken til å samle inn behandlede data. La oss se om vi kan legge til mer fleksibilitet i vårt scenario ved å tilby en tilpasset vask.

Hva om vi for eksempel vil filtrere logger etter beskjed?

La oss lage en LogSink som implementerer Synke grensesnitt:

public class LogSink implementerer Sink {@Override public void call (Context context, PortRequest portRequest, Observable logEventObservable) {SelfDocumentingSink sink = new ServerSentEventsSink.Builder () .withEncoder (LogEvent :: toJsonString) .withPredicateM (filterBy). ; logEventObservable.subscribe (); sink.call (kontekst, portRequest, logEventObservable); } private Predicate filterByLogMessage () {return new Predicate ("filter by message", parameters -> {if (parameters! = null && parameters.containsKey ("filter")) {return logEvent -> logEvent.getMessage (). inneholder ( parameters.get ("filter"). get (0));} return logEvent -> true;}); }}

I denne vaskeimplementeringen konfigurerte vi et predikat som bruker filter parameter for å bare hente logger som inneholder tekstsettet i filter parameter:

$ curl localhost: 8874? filter = påloggingsdata: {"index": 93, "level": "FEIL", "message": "login login"} data: {"index": 95, "level": "INFO "," message ":" innloggingsforsøk "} data: {" index ": 97," level ":" FEIL "," message ":" påloggingsforsøk "} ...

Merk Mantis tilbyr også et kraftig spørringsspråk, MQL, som kan brukes til å spørre, transformere og analysere strømdata på en SQL-måte.

5. Scenekjetting

La oss anta at vi er interessert i å vite hvor mange FEIL, VARSLE, eller INFO loggoppføringer vi har i et gitt tidsintervall. For dette legger vi til to trinn til jobben vår og kjeder dem sammen.

5.1. Gruppering

For det første, la oss lage en GroupLogStage.

Dette stadiet er en ToGroupComputation implementering som mottar en LogEvent streame data fra det eksisterende TransformLogStage. Etter det grupperer den oppføringer etter loggenivå og sender dem til neste trinn:

offentlig klasse GroupLogStage implementerer ToGroupComputation {@ Override public Observable call (Context context, Observable logEvent) {return logEvent.map (log -> new MantisGroup (log.getLevel (), log)); } public static ScalarToGroup.Config config () {return new ScalarToGroup.Config () .description ("Group event data by level") .codec (JacksonCodecs.pojo (LogEvent.class)) .concurrentInput (); }}

Vi har også opprettet en egendefinert scenekonfigurasjon ved å gi en beskrivelse, kodeken som skal brukes til å serialisere utdataene, og tillot dette trinnets anropsmetode å kjøre samtidig ved å bruke concurrentInput ().

En ting å merke seg er at dette stadiet er horisontalt skalerbart. Det betyr at vi kan kjøre så mange tilfeller av dette stadiet som nødvendig. Også verdt å nevne når du er distribuert i en Mantis-klynge, denne fasen sender data til neste trinn, slik at alle hendelser som tilhører en bestemt gruppe, vil lande på samme arbeider i neste trinn.

5.2. Aggregerende

Før vi går videre og lager neste trinn, la oss først legge til en LogAggregate enhet:

offentlig klasse LogAggregate implementerer JsonType {private final Integer count; privat final Strengnivå; }

La oss nå lage den siste fasen i kjeden.

Dette trinnet implementeres GroupToScalarComputation og forvandler en strøm av logggrupper til en skalar LogAggregate. Den gjør dette ved å telle hvor mange ganger hver logg vises i strømmen. I tillegg har den også en LogAggregationDuration parameter, som kan brukes til å kontrollere størrelsen på aggregeringsvinduet:

offentlig klasse CountLogStage implementerer GroupToScalarComputation {privat int varighet; @ Override public void init (Context context) {duration = (int) context.getParameters (). Get ("LogAggregationDuration", 1000); } @ Override public Observable call (Context context, Observable mantisGroup) {return mantisGroup .window (duration, TimeUnit.MILLISECONDS) .flatMap (o -> o.groupBy (MantisGroup :: getKeyValue) .flatMap (group -> group.reduce (0, (count, value) -> count = count + 1) .map ((count) -> new LogAggregate (count, group.getKey ())))); } offentlig statisk GroupToScalar.Config config () {returner ny GroupToScalar.Config () .description ("sum events for a log level") .codec (JacksonCodecs.pojo (LogAggregate.class)) .withParameters (getParameters ()); } offentlig statisk liste getParameters () {Liste params = ny ArrayList (); params.add (ny IntParameter () .name ("LogAggregationDuration"). beskrivelse ("vindusstørrelse for aggregering i millisekunder") .validator (Validators.range (100, 10000)) .defaultValue (5000) .build ()); returparametre; }}

5.3. Konfigurer og kjør jobben

Det eneste du må gjøre nå er å konfigurere jobben vår:

offentlig klasse LogAggregationJob utvider MantisJobProvider {@Override public Job getJobInstance () {return MantisJob .source (new RandomLogSource ()) .stage (new TransformLogStage (), TransformLogStage.stageConfig ()) .stage (new GroupLogStage (), .con. )) .stage (new CountLogStage (), CountLogStage.config ()) .sink (Sinks.eagerSubscribe (Sinks.sse (LogAggregate :: toJsonString))) .metadata (new Metadata.Builder (). build ()) .create (); }}

Så snart vi kjører applikasjonen og utfører den nye jobben vår, kan vi se loggantallene blir hentet med noen sekunders mellomrom:

$ curl localhost: 8133 data: {"count": 3, "level": "ERROR"} data: {"count": 13, "level": "INFO"} data: {"count": 4, "level ":" WARN "} data: {" count ": 8," level ":" FEIL "} data: {" count ": 5," level ":" INFO "} data: {" count ": 7," level ":" WARN "} ...

6. Konklusjon

For å oppsummere, i denne artikkelen har vi sett hva Netflix Mantis er og hva den kan brukes til. Videre så vi på hovedkonseptene, brukte dem til å bygge jobber og utforsket tilpassede konfigurasjoner for forskjellige scenarier.

Som alltid er den komplette koden tilgjengelig på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found