MQTT-klient i Java

1. Oversikt

I denne veiledningen vil vi se hvordan vi kan legge til MQTT-meldinger i et Java-prosjekt ved hjelp av bibliotekene som tilbys av Eclipse Paho-prosjektet.

2. MQTT Primer

MQTT (MQ Telemetry Transport) er en meldingsprotokoll som ble opprettet for å imøtekomme behovet for en enkel og lett metode for å overføre data til / fra enheter med lav effekt, slik som de som brukes i industrielle applikasjoner.

Med den økte populariteten til IoT (Internet of Things) -enheter, har MQTT sett en økt bruk, noe som har ført til standardisering av OASIS og ISO.

Protokollen støtter et enkelt meldingsmønster, nemlig Publish-Subscribe-mønsteret: hver melding sendt av en klient inneholder et tilknyttet “emne” som brukes av megleren til å dirigere den til abonnerte klienter. Emnenavn kan være enkle strenger som “oljetemp”Eller en banelignende streng”motor / 1 / o / min“.

For å motta meldinger abonnerer en klient på ett eller flere emner ved å bruke det eksakte navnet eller en streng som inneholder et av de støttede jokertegnene ("#" for emner på flere nivåer og "+" for enkeltnivå ").

3. Prosjektoppsett

For å inkludere Paho-biblioteket i et Maven-prosjekt, må vi legge til følgende avhengighet:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

Den siste versjonen av Eclipse Paho Java-biblioteksmodulen kan lastes ned fra Maven Central.

4. Klientoppsett

Når du bruker Paho-biblioteket, er det første vi må gjøre for å sende og / eller motta meldinger fra en MQTT-megler å få en implementering av IMqttClient grensesnitt. Dette grensesnittet inneholder alle metodene som kreves av et program for å opprette en forbindelse til serveren, sende og motta meldinger.

Paho kommer ut av esken med to implementeringer av dette grensesnittet, en asynkron (MqttAsyncClient) og en synkron (MqttClient).I vårt tilfelle vil vi fokusere på den synkrone versjonen, som har enklere semantikk.

Oppsettet i seg selv er en totrinnsprosess: vi oppretter først en forekomst av MqttClient klasse og deretter kobler vi den til serveren vår. Følgende underavsnitt beskriver disse trinnene.

4.1. Opprette et nytt IMqttClient Forekomst

Følgende kodebit viser hvordan du lager en ny IMqttClient synkron forekomst:

String publisherId = UUID.randomUUID (). ToString (); IMqttClient publisher = new MqttClient ("tcp: //iot.eclipse.org: 1883", publisherId);

I dette tilfellet, vi bruker den enkleste konstruktøren tilgjengelig, som tar sluttpunktadressen til vår MQTT-megler og en klientidentifikator, som unikt identifiserer vår klient.

I vårt tilfelle brukte vi en tilfeldig UUID, så det genereres en ny klientidentifikator ved hver kjøring.

Paho tilbyr også flere konstruktører som vi kan bruke for å tilpasse utholdenhetsmekanismen som brukes til å lagre ukjente meldinger og / eller PlanlagtExecutorService brukes til å kjøre bakgrunnsoppgaver som kreves av protokollmotorimplementeringen.

Serverendepunktet vi bruker er en offentlig MQTT-megler som er vert for Paho-prosjektet, som lar alle med internettforbindelse teste klienter uten behov for godkjenning.

4.2. Koble til serveren

Vårt nyopprettede MqttClient forekomst er ikke koblet til serveren. Vi gjør det ved å kalle det koble() metode, eventuelt passerer en MqttConnectOptions forekomst som lar oss tilpasse noen aspekter av protokollen.

Spesielt kan vi bruke disse alternativene til å formidle tilleggsinformasjon som sikkerhetsopplysninger, øktgjenopprettingsmodus, tilkoblingsmodus og så videre.

De MqttConnectionOptions klasse eksponere disse alternativene som enkle egenskaper som vi kan sette ved bruk av vanlige settermetoder. Vi trenger bare å angi egenskapene som kreves for vårt scenario - de gjenværende antar standardverdier.

Koden som brukes til å opprette en forbindelse til serveren, ser vanligvis slik ut:

MqttConnectOptions-alternativer = nye MqttConnectOptions (); options.setAutomaticReconnect (true); options.setCleanSession (true); options.setConnectionTimeout (10); publisher.connect (opsjoner);

Her definerer vi våre tilkoblingsalternativer slik at:

  • Biblioteket vil automatisk prøve å koble til serveren igjen i tilfelle nettverksfeil
  • Det vil forkaste usendte meldinger fra en forrige kjøring
  • Tidsavbrudd for tilkobling er satt til 10 sekunder

5. Sende meldinger

Sende meldinger med en allerede tilkoblet MqttClient er veldig grei. Vi bruker en av publisere() metodevarianter for å sende nyttelasten, som alltid er et byteoppsett, til et gitt emne, ved å bruke ett av følgende tjenestekvalitetsalternativer:

  • 0 - "på det meste en gang" semantikk, også kjent som "fyr og glem". Bruk dette alternativet når tap av meldinger er akseptabelt, da det ikke krever noen form for bekreftelse eller utholdenhet
  • 1 - “minst en gang” semantikk. Bruk dette alternativet når tap av meldinger ikke er akseptabelt og abonnentene dine kan håndtere duplikater
  • 2 - “nøyaktig en gang” semantikk. Bruk dette alternativet når tap av meldinger ikke er akseptabelt og abonnentene dine kan ikke håndtere duplikater

I vårt prøveprosjekt, MotortemperaturSensor klasse spiller rollen som en mock-sensor som produserer en ny temperaturavlesning hver gang vi påkaller den anrop() metode.

Denne klassen implementerer Kan kalles grensesnitt slik at vi enkelt kan bruke den med en av ExecutorService implementeringer tilgjengelig i java.util.concurrent pakke:

offentlig klasse EngineTemperatureSensor implementerer Callable {// ... private medlemmer utelatt offentlig EngineTemperatureSensor (IMqttClient-klient) {this.client = client; } @ Override public Void call () kaster unntak {if (! Client.isConnected ()) {return null; } MqttMessage msg = readEngineTemp (); msg.setQos (0); msg.setRetained (true); client.publish (TOPIC, msg); return null; } privat MqttMessage readEngineTemp () {dobbel temp = 80 + rnd.nextDouble () * 20.0; byte [] nyttelast = String.format ("T:% 04.2f", temp) .getBytes (); returner ny MqttMessage (nyttelast); }}

De MqttMessage innkapsler selve nyttelasten, den etterspurte servicekvaliteten og beholdt flagg for meldingen. Dette flagget indikerer til megleren at den skal beholde denne meldingen til den blir konsumert av en abonnent.

Vi kan bruke denne funksjonen til å implementere en "sist kjent god" oppførsel, så når en ny abonnent kobler seg til serveren, vil den motta den beholdte meldingen med en gang.

6. Motta meldinger

For å motta meldinger fra MQTT-megleren, vi trenger å bruke en av abonnere() metode varianter, som lar oss spesifisere:

  • Ett eller flere emnefiltre for meldinger vi ønsker å motta
  • Den tilhørende QoS
  • Tilbakeringingsbehandleren for å behandle mottatte meldinger

I det følgende eksemplet viser vi hvordan du legger til en meldingslytter til en eksisterende IMqttClient forekomst for å motta meldinger fra et gitt emne. Vi bruker en CountDownLatch som en synkroniseringsmekanisme mellom tilbakeringingen vår og hovedutførelsestråden, og reduserer den hver gang en ny melding kommer.

I eksempelkoden har vi brukt en annen IMqttClient forekomst for å motta meldinger. Vi gjorde det bare for å tydeliggjøre hvilken klient som gjør hva, men dette er ikke en Paho-begrensning. Hvis du vil, kan du bruke den samme klienten til å publisere og motta meldinger:

CountDownLatch receivedSignal = ny CountDownLatch (10); subscriber.subscribe (EngineTemperatureSensor.TOPIC, (topic, msg) -> {byte [] nyttelast = msg.getPayload (); // ... nyttelasthåndtering utelatt receivedSignal.countDown ();}); receivedSignal.await (1, TimeUnit.MINUTES);

De abonnere() varianten brukt ovenfor tar en IMqttMessageListener forekomst som sitt andre argument.

I vårt tilfelle bruker vi en enkel lambda-funksjon som behandler nyttelasten og reduserer en teller. Hvis det ikke kommer nok meldinger i det angitte tidsvinduet (1 minutt), vises avvente() metoden vil kaste et unntak.

Når du bruker Paho, trenger vi ikke å bekrefte mottak av meldinger eksplisitt. Hvis tilbakeringingen returnerer normalt, antar Paho at det er vellykket forbruk og sender en bekreftelse til serveren.

Hvis tilbakeringingen kaster et Unntak, blir klienten stengt. Vær oppmerksom på at dette vil føre til tap av meldinger sendt med QoS-nivå på 0.

Meldinger sendt med QoS nivå 1 eller 2 sendes på nytt av serveren når klienten er tilkoblet på nytt og abonnerer på emnet igjen.

7. Konklusjon

I denne artikkelen demonstrerte vi hvordan vi kan legge til støtte for MQTT-protokollen i Java-applikasjonene våre ved hjelp av biblioteket som tilbys av Eclipse Paho-prosjektet.

Dette biblioteket håndterer alle protokolldetaljer på lavt nivå, slik at vi kan fokusere på andre aspekter av løsningen, samtidig som vi gir god plass til å tilpasse viktige aspekter av de interne funksjonene, for eksempel vedvaring av meldinger.

Koden vist i denne artikkelen er tilgjengelig på GitHub.