Guide til DelayQueue

1. Oversikt

I denne artikkelen ser vi på DelayQueue konstruere fra java.util.concurrent pakke. Dette er en blokkeringskø som kan brukes i produsent-forbrukerprogrammer.

Den har en veldig nyttig egenskap - når forbrukeren ønsker å ta et element fra køen, kan de bare ta det når forsinkelsen for det aktuelle elementet er utløpt.

2. Implementering Forsinket for elementer i DelayQueue

Hvert element vi ønsker å legge inn i DelayQueue trenger å implementere Forsinket grensesnitt. La oss si at vi ønsker å lage en DelayObject klasse. Forekomster av den klassen vil bli lagt inn i DelayQueue.

Vi passerer String data og delayInMilliseconds som og argumenter til sin konstruktør:

offentlig klasse DelayObject implementerer forsinket {private strengdata; privat lang starttid; public DelayObject (String data, long delayInMilliseconds) {this.data = data; this.startTime = System.currentTimeMillis () + delayInMilliseconds; }

Vi definerer en starttid - dette er en tid da elementet skal konsumeres fra køen. Deretter må vi implementere getDelay () metode - den skal returnere gjenværende forsinkelse assosiert med dette objektet i den gitte tidsenheten.

Derfor må vi bruke TimeUnit.convert () metode for å returnere den gjenværende forsinkelsen i riktig TimeUnit:

@Override public long getDelay (TimeUnit unit) {long diff = startTime - System.currentTimeMillis (); return unit.convert (diff, TimeUnit.MILLISECONDS); }

Når forbrukeren prøver å ta et element fra køen, blir DelayQueue vil utføre getDelay () for å finne ut om elementet kan returneres fra køen. Hvis den getDelay () metoden vil returnere null eller et negativt tall, betyr det at det kan hentes fra køen.

Vi må også implementere sammenligne med() metoden, fordi elementene i DelayQueue vil bli sortert etter utløpstiden. Gjenstanden som utløper først holdes i køen og elementet med høyest utløpstid holdes i halen av køen:

@ Override public int compareTo (Delayed o) {return Ints.saturatedCast (this.startTime - ((DelayObject) o). StartTime); }

3. DelayQueue Consumer og produsent

For å kunne teste vår DelayQueue vi trenger å implementere produsent- og forbrukerlogikk. Produsentklassen tar køen, antall elementer som skal produseres, og forsinkelsen av hver melding i millisekunder som argumenter.

Så når løpe() metoden blir påkalt, den setter elementer i køen, og sover i 500 millisekunder etter hvert sett:

offentlig klasse DelayQueueProducer implementerer Runnable {private BlockingQueue kø; private Integer numberOfElementsToProduc; private Integer delayOfEachProducedMessageMilliseconds; // standardkonstruktør @ Override public void run () {for (int i = 0; i <numberOfElementsToProduce; i ++) {DelayObject object = new DelayObject (UUID.randomUUID (). toString (), delayOfEachProducedMessageMilliseconds); System.out.println ("Sett objekt:" + objekt); prøv {queue.put (objekt); Tråd. Søvn (500); } fange (InterruptedException ie) {ie.printStackTrace (); }}}}

Forbrukerimplementeringen er veldig lik, men det holder også oversikt over antall meldinger som ble konsumert:

offentlig klasse DelayQueueConsumer implementerer Runnable {private BlockingQueue kø; private Integer numberOfElementsToTake; offentlig AtomicInteger numberOfConsumedElements = nytt AtomicInteger (); // standardkonstruktører @ Override public void run () {for (int i = 0; i <numberOfElementsToTake; i ++) {prøv {DelayObject object = queue.take (); numberOfConsumedElements.incrementAndGet (); System.out.println ("Forbrukeropptak:" + objekt); } fange (InterruptedException e) {e.printStackTrace (); }}}}

4. DelayQueue Brukstest

For å teste atferden til DelayQueue, vi lager en produsenttråd og en forbrukertråd.

Produsenten vil sette() to objekter på køen med 500 millisekunder forsinkelse. Testen hevder at forbrukeren konsumerte to meldinger:

@Test offentlig ugyldighet givenDelayQueue_whenProduceElement _thenShouldConsumeAfterGivenDelay () kaster InterruptedException {// gitt ExecutorService executor = Executors.newFixedThreadPool (2); BlockingQueue queue = new DelayQueue (); int numberOfElementsToProduc = 2; int delayOfEachProducedMessageMilliseconds = 500; DelayQueueConsumer forbruker = ny DelayQueueConsumer (kø, numberOfElementsToProducer); DelayQueueProducer produsent = ny DelayQueueProducer (kø, numberOfElementsToProducer, delayOfEachProducedMessageMilliseconds); // når executor.submit (produsent); executor.submit (forbruker); // deretter executor.awaitTermination (5, TimeUnit.SECONDS); executor.shutdown (); assertEquals (consumer.numberOfConsumedElements.get (), numberOfElementsToProduce); }

Vi kan se at å kjøre dette programmet vil gi følgende resultater:

Sett objekt: {data = '86046157-e8a0-49b2-9cbb-8326124bcab8', startTime = 1494069868007} Forbrukeropptak: {data = '86046157-e8a0-49b2-9cbb-8326124bcab8', startTime = 14940668 'd47927ef-18c7-449b-b491-5ff30e6795ed', startTime = 1494069868512} Forbrukeropptak: {data = 'd47927ef-18c7-449b-b491-5ff30e6795ed', startTime = 1494069868512}

Produsenten setter gjenstanden, og etter en stund forbrukes det første objektet som forsinkelsen har utløpt for.

Den samme situasjonen oppstod for det andre elementet.

5. Forbruker ikke i stand til å konsumere på gitt tid

La oss si at vi har en produsent som produserer et element som vil utløper om 10 sekunder:

int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = 10_000; DelayQueueConsumer forbruker = ny DelayQueueConsumer (kø, numberOfElementsToProducer); DelayQueueProducer produsent = ny DelayQueueProducer (kø, numberOfElementsToProducer, forsinkelseOfEachProducedMessageMilliseconds);

Vi starter testen, men den avsluttes etter 5 sekunder. På grunn av egenskapene til DelayQueue, forbrukeren vil ikke kunne konsumere meldingen fra køen fordi elementet ikke har utløpt ennå:

executor.submit (produsent); executor.submit (forbruker); executor.awaitTermination (5, TimeUnit.SECONDS); executor.shutdown (); assertEquals (consumer.numberOfConsumedElements.get (), 0);

Merk at forbrukerens numberOfConsumedElements har en verdi lik null.

6. Å produsere et element med umiddelbar utløp

Når implementeringene av Forsinket beskjed getDelay () metoden returnerer et negativt tall, det betyr at det gitte elementet allerede er utløpt. I denne situasjonen vil produsenten konsumere det elementet umiddelbart.

Vi kan teste situasjonen for å produsere et element med negativ forsinkelse:

int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = -10_000; DelayQueueConsumer forbruker = ny DelayQueueConsumer (kø, numberOfElementsToProducer); DelayQueueProducer produsent = ny DelayQueueProducer (kø, numberOfElementsToProducer, forsinkelseOfEachProducedMessageMilliseconds);

Når vi starter testsaken, vil forbrukeren forbruke elementet umiddelbart fordi det allerede er utløpt:

executor.submit (produsent); executor.submit (forbruker); executor.awaitTermination (1, TimeUnit.SECONDS); executor.shutdown (); assertEquals (consumer.numberOfConsumedElements.get (), 1);

7. Konklusjon

I denne artikkelen så vi på DelayQueue konstruere fra java.util.concurrent pakke.

Vi implementerte en Forsinket element som ble produsert og konsumert fra køen.

Vi utnyttet implementeringen av DelayQueue å konsumere elementer som var utløpt.

Implementeringen av alle disse eksemplene og kodebitene finnes i GitHub-prosjektet - som er et Maven-prosjekt, så det skal være enkelt å importere og kjøre som det er.


$config[zx-auto] not found$config[zx-overlay] not found