Introduksjon til Apache Storm

1. Oversikt

Denne opplæringen vil være en introduksjon til Apache Storm, et distribuert sanntids beregningssystem.

Vi vil fokusere på og dekke:

  • Hva er egentlig Apache Storm og hvilke problemer det løser
  • Arkitekturen, og
  • Hvordan bruke det i et prosjekt

2. Hva er Apache Storm?

Apache Storm er gratis og åpen kildekode distribuert system for sanntidsberegninger.

Den gir feiltoleranse, skalerbarhet og garanterer databehandling, og er spesielt god til å behandle ubegrensede datastrømmer.

Noen gode brukssaker for Storm kan være behandling av kredittkortoperasjoner for svindeloppdaging eller behandling av data fra smarte hjem for å oppdage defekte sensorer.

Storm tillater integrering med forskjellige databaser og køsystemer som er tilgjengelige på markedet.

3. Maven avhengighet

Før vi bruker Apache Storm, må vi inkludere stormkjerneavhengigheten i prosjektet vårt:

 org.apache.storm storm-core 1.2.2 gitt 

Vi bør bare bruke gitt omfang hvis vi har tenkt å kjøre søknaden vår på Storm-klyngen.

For å kjøre applikasjonen lokalt, kan vi bruke en såkalt lokal modus som vil simulere Storm-klyngen i en lokal prosess, i så fall bør vi fjerne sørget for.

4. Datamodell

Apache Storms datamodell består av to elementer: tupler og strømmer.

4.1. Tuple

EN Tuple er en ordnet liste over navngitte felt med dynamiske typer. Dette betyr at vi ikke trenger å eksplisitt erklære felttypene.

Storm trenger å vite hvordan man skal serieisere alle verdiene som brukes i en tuple. Som standard kan den allerede serieisere primitive typer, Strenger og byte arrays.

Og siden Storm bruker Kryo-serialisering, må vi registrere serialisereren ved hjelp av Konfig for å bruke de tilpassede typene. Vi kan gjøre dette på en av to måter:

Først kan vi registrere klassen som skal serienummeres med sitt fulle navn:

Config config = new Config (); config.registerSerialization (User.class);

I et slikt tilfelle vil Kryo serieere klassen ved hjelp av FieldSerializer. Som standard vil dette serieisere alle ikke-forbigående felt i klassen, både private og offentlige.

Eller i stedet kan vi tilby både klassen som skal serieniseres, og den seriellisereren vi vil at Storm skal bruke for den klassen:

Config config = new Config (); config.registerSerialization (User.class, UserSerializer.class);

For å lage den tilpassede serialisereren, må vi utvide generisk klasse Serializer det har to metoder skrive og lese.

4.2. Strøm

EN Strøm er kjerneabstraksjonen i Storm-økosystemet. De Strøm er en ubegrenset sekvens av tupler.

Storms tillater behandling av flere strømmer parallelt.

Hver strøm har en ID som blir gitt og tildelt under erklæring.

5. Topologi

Logikken til sanntids Storm-applikasjonen er pakket inn i topologien. Topologien består av tuter og bolter.

5.1. Tut

Tut er kildene til bekkene. De avgir tupler til topologien.

Tuples kan leses fra forskjellige eksterne systemer som Kafka, Kestrel eller ActiveMQ.

Tut kan være pålitelig eller upålitelig. Pålitelig betyr at tuten kan svare at tupelen som ikke har blitt behandlet av Storm. Upålitelig betyr at tuten ikke svarer siden den skal bruke en fyr-og-glem-mekanisme for å avgi tuplene.

For å lage den tilpassede tuten, må vi implementere IRichSpout grensesnitt eller utvide en klasse som allerede implementerer grensesnittet, for eksempel et abstrakt BaseRichSpout klasse.

La oss lage en upålitelig tut:

offentlig klasse RandomIntSpout utvider BaseRichSpout {private Tilfeldig tilfeldig; private SpoutOutputCollector outputCollector; @ Overstyr offentlig tomrom åpent (Kartkart, TopologyContext topologyContext, SpoutOutputCollector tutOutputCollector) {random = new Random (); outputCollector = tutOutputCollector; } @ Override public void nextTuple () {Utils.sleep (1000); outputCollector.emit (nye verdier (random.nextInt (), System.currentTimeMillis ())); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (nye felt ("randomInt", "tidsstempel")); }}

Vår skikk RandomIntSpout vil generere tilfeldig heltall og tidsstempel hvert sekund.

5.2. Bolt

Bolter behandler tupler i strømmen. De kan utføre forskjellige operasjoner som filtrering, aggregering eller egendefinerte funksjoner.

Noen operasjoner krever flere trinn, og derfor må vi bruke flere bolter i slike tilfeller.

Å lage skikken Bolt, må vi implementere IRichBolt eller for enklere operasjoner IBasicBolt grensesnitt.

Det er også flere hjelperklasser tilgjengelig for implementering Bolt. I dette tilfellet vil vi bruke BaseBasicBolt:

public class PrintingBolt utvider BaseBasicBolt {@Override public void execute (Tuple tuple, BasicOutputCollector basicOutputCollector) {System.out.println (tuple); } @Override offentlig ugyldig erklæreOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {}}

Denne skikken UtskriftBolt vil ganske enkelt skrive ut alle tuplene til konsollen.

6. Lage en enkel topologi

La oss sette disse ideene sammen til en enkel topologi. Topologien vår vil ha en tut og tre bolter.

6.1. RandomNumberSpout

I begynnelsen vil vi lage en upålitelig tut. Det vil generere tilfeldige heltall fra området (0,100) hvert sekund:

offentlig klasse RandomNumberSpout utvider BaseRichSpout {private Random random; privat SpoutOutputCollector samler; @ Override public void open (Map map, TopologyContext topologyContext, SpoutOutputCollector tutOutputCollector) {random = new Random (); collector = tut OutputCollector; } @ Override public void nextTuple () {Utils.sleep (1000); int-operasjon = random.nextInt (101); lang tidsstempel = System.currentTimeMillis (); Verdier verdier = nye verdier (drift, tidsstempel); collector.emit (verdier); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("operation", "timestamp")); }}

6.2. FiltreringBolt

Deretter lager vi en bolt som filtrerer ut alle elementene med operasjon lik 0:

offentlig klasse FilteringBolt utvider BaseBasicBolt {@Override public void execute (Tuple tuple, BasicOutputCollector basicOutputCollector) {int operation = tuple.getIntegerByField ("operation"); hvis (operasjon> 0) {basicOutputCollector.emit (tuple.getValues ​​()); }} @ Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (nye felt ("operasjon", "tidsstempel")); }}

6.3. AggregatingBolt

Deretter la oss lage en mer komplisert Bolt som vil samle alle positive operasjoner fra hver dag.

For dette formålet vil vi bruke en bestemt klasse opprettet spesielt for å implementere bolter som fungerer på vinduer i stedet for å operere på enkle tupler: BaseWindowedBolt.

Windows er et essensielt konsept i strømbehandling, og deler de uendelige strømmer i endelige biter. Vi kan deretter bruke beregninger på hver del. Det er vanligvis to typer vinduer:

Tidsvinduer brukes til å gruppere elementer fra en gitt tidsperiode ved hjelp av tidsstempler. Tidsvinduer kan ha et annet antall elementer.

Tellevinduer brukes til å lage vinduer med en definert størrelse. I et slikt tilfelle vil alle vinduer ha samme størrelse og vinduet vil ikke slippes ut hvis det er færre elementer enn den definerte størrelsen.

Våre AggregatingBolt vil generere summen av alle positive operasjoner fra a tidsvindu sammen med begynnelses- og slutttidstempel:

offentlig klasse AggregatingBolt utvider BaseWindowedBolt {private OutputCollector outputCollector; @ Overstyr offentlig tomrom forberede (Map stormConf, TopologyContext context, OutputCollector collector) {this.outputCollector = collector; } @Override public void declareOutputFields (OutputFieldsDeclarer declarer) {declarer.declare (new Fields ("sumOfOperations", "beginningTimestamp", "endTimestamp")); } @Override public void execute (TupleWindow tupleWindow) {List tuples = tupleWindow.get (); tuples.sort (Comparator.comparing (dette :: getTimestamp)); int sumOfOperations = tuples.stream () .mapToInt (tuple -> tuple.getIntegerByField ("operation")) .sum (); Lang begynnelseTidsstempel = getTimestamp (tuples.get (0)); Long endTimestamp = getTimestamp (tuples.get (tuples.size () - 1)); Verdier verdier = nye verdier (sumOfOperations, beginningTimestamp, endTimestamp); outputCollector.emit (verdier); } privat Lang getTimestamp (Tuple tuple) {return tuple.getLongByField ("tidsstempel"); }}

Vær oppmerksom på at det i dette tilfellet er trygt å få det første elementet i listen direkte. Det er fordi hvert vindu beregnes ved hjelp av tidsstempel felt av Tuple, det må være minst ett element i hvert vindu.

6.4. FileWritingBolt

Til slutt lager vi en bolt som tar alle elementene med sumOfOperations større enn 2000, serierer du dem og skriver dem til filen:

public class FileWritingBolt utvider BaseRichBolt {public static Logger logger = LoggerFactory.getLogger (FileWritingBolt.class); privat BufferedWriter-forfatter; private String filePath; private ObjectMapper objectMapper; @ Overstyr offentlig tomromopprydding () {prøv {writer.close (); } fange (IOException e) {logger.error ("Kunne ikke lukke forfatteren!"); }} @ Override public void prepare (Map map, TopologyContext topologyContext, OutputCollector outputCollector) {objectMapper = new ObjectMapper (); objectMapper.setVisibility (PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); prøv {writer = new BufferedWriter (new FileWriter (filePath)); } catch (IOException e) {logger.error ("Kunne ikke åpne en fil for skriving.", e); }} @Override public void execute (Tuple tuple) {int sumOfOperations = tuple.getIntegerByField ("sumOfOperations"); lang begynnelsestimpel = tuple.getLongByField ("begynnelsestimpel"); long endTimestamp = tuple.getLongByField ("endTimestamp"); hvis (sumOfOperations> 2000) {AggregatedWindow aggregatedWindow = new AggregatedWindow (sumOfOperations, beginningTimestamp, endTimestamp); prøv {writer.write (objectMapper.writeValueAsString (aggregatedWindow)); skribent.newLine (); writer.flush (); } fange (IOException e) {logger.error ("Kunne ikke skrive data til fil.", e); }}} // offentlig konstruktør og andre metoder}

Merk at vi ikke trenger å erklære utdataene, da dette vil være den siste bolten i vår topologi

6.5. Kjører topologien

Til slutt kan vi trekke alt sammen og kjøre vår topologi:

public static void runTopology () {TopologyBuilder builder = new TopologyBuilder (); Tut tilfeldig = ny RandomNumberSpout (); builder.setSpout ("randomNumberSpout"); Boltfiltrering = ny FilteringBolt (); builder.setBolt ("filteringBolt", filtrering) .shuffleGrouping ("randomNumberSpout"); Boltaggregering = ny AggregatingBolt () .withTimestampField ("tidsstempel") .withLag (BaseWindowedBolt.Duration.seconds (1)) .withWindow (BaseWindowedBolt.Duration.seconds (5)); builder.setBolt ("aggregatingBolt", aggregating) .shuffleGrouping ("filteringBolt"); Streng filePath = "./src/main/resources/data.txt"; Bolt-fil = ny FileWritingBolt (filePath); builder.setBolt ("fileBolt", file) .shuffleGrouping ("aggregatingBolt"); Config config = new Config (); config.setDebug (false); LocalCluster-klynge = ny LocalCluster (); cluster.submitTopology ("Test", config, builder.createTopology ()); }

For å få dataene til å flyte gjennom hvert stykke i topologien, må vi indikere hvordan de skal kobles sammen. shuffleGroup tillater oss å oppgi disse dataene for filtreringBolt kommer fra randomNumberSpout.

For hver Bolt, må vi legge til shuffleGroup som definerer kilden til elementer for denne bolten. Elementkilden kan være en Tut eller en annen Bolt. Og hvis vi setter den samme kilden for mer enn en bolt, kilden vil sende ut alle elementene til hver av dem.

I dette tilfellet vil vår topologi bruke LocalCluster å kjøre jobben lokalt.

7. Konklusjon

I denne opplæringen introduserte vi Apache Storm, et distribuert sanntids beregningssystem. Vi opprettet en tut, noen bolter og trakk dem sammen til en komplett topologi.

Og som alltid kan alle kodeeksemplene finnes på GitHub.