En guide til Java SynchronousQueue

1. Oversikt

I denne artikkelen ser vi på SynchronousQueue fra java.util.concurrent pakke.

Enkelt sagt, denne implementeringen lar oss utveksle informasjon mellom tråder på en trådsikker måte.

2. API-oversikt

De SynchronousQueue bare har to operasjoner som støttes: ta() og sette(), og begge blokkerer.

Når vi for eksempel vil legge til et element i køen, må vi ringe sette() metode. Denne metoden vil blokkere til en annen tråd kaller ta() metode, signaliserer at den er klar til å ta et element.

Selv om SynchronousQueue har et grensesnitt for en kø, bør vi tenke på det som et utvekslingspunkt for et enkelt element mellom to tråder, der den ene tråden avgir et element, og en annen tråd tar det elementet.

3. Implementere utleveringer ved hjelp av en delt variabel

For å se hvorfor SynchronousQueue kan være så nyttig, vi implementerer en logikk ved hjelp av en delt variabel mellom to tråder, og neste gang vil vi skrive om logikken ved hjelp av SynchronousQueue gjør koden vår mye enklere og mer lesbar.

La oss si at vi har to tråder - en produsent og en forbruker - og når produsenten setter en verdi av en delt variabel, vil vi signalisere det faktum til forbrukertråden. Deretter vil forbrukertråden hente en verdi fra en delt variabel.

Vi vil bruke CountDownLatch å koordinere disse to trådene, for å forhindre en situasjon når forbrukeren får tilgang til en verdi av en delt variabel som ikke var satt ennå.

Vi vil definere en delt stat variabel og a CountDownLatch som vil bli brukt til å koordinere behandlingen:

ExecutorService executor = Executors.newFixedThreadPool (2); AtomicInteger sharedState = nytt AtomicInteger (); CountDownLatch countDownLatch = ny CountDownLatch (1);

Produsenten vil lagre et tilfeldig heltall i delt stat variabel, og utfør countDown () metoden på countDownLatch, signaliserer til forbrukeren at den kan hente en verdi fra delt stat:

Kjørbar produsent = () -> {Heltall produsertElement = ThreadLocalRandom .current () .nextInt (); sharedState.set (produsertElement); countDownLatch.countDown (); };

Forbrukeren vil vente på countDownLatch bruker avvente() metode. Når produsenten signaliserer at variabelen ble satt, vil forbrukeren hente den fra delt stat:

Kjørbar forbruker = () -> {prøv {countDownLatch.await (); Heltall consumedElement = sharedState.get (); } fange (InterruptedException ex) {ex.printStackTrace (); }};

Sist men ikke minst, la oss starte programmet vårt:

executor.execute (produsent); executor.execute (forbruker); executor.awaitTermination (500, TimeUnit.MILLISECONDS); executor.shutdown (); assertEquals (countDownLatch.getCount (), 0);

Det vil gi følgende utdata:

Å lagre et element: -1507375353 til utvekslingspunktet forbrukte et element: -1507375353 fra utvekslingspunktet

Vi kan se at dette er mye kode for å implementere en så enkel funksjonalitet som å bytte et element mellom to tråder. I neste avsnitt vil vi prøve å gjøre det bedre.

4. Implementere utleveringer ved hjelp av SynchronousQueue

La oss nå implementere den samme funksjonaliteten som i forrige avsnitt, men med en SynchronousQueue. Det har en dobbel effekt fordi vi kan bruke den til å utveksle tilstand mellom tråder og for å koordinere den handlingen slik at vi ikke trenger å bruke noe annet enn SynchronousQueue.

For det første vil vi definere en kø:

ExecutorService executor = Executors.newFixedThreadPool (2); SynchronousQue kø = ny SynchronousQueue ();

Produsenten vil kalle en sette() metode som vil blokkere til en annen tråd tar et element fra køen:

Kjørbar produsent = () -> {Heltall produsertElement = ThreadLocalRandom .current () .nextInt (); prøv {queue.put (producedElement); } fange (InterruptedException ex) {ex.printStackTrace (); }};

Forbrukeren vil ganske enkelt hente elementet ved hjelp av ta() metode:

Kjørbar forbruker = () -> {prøv {Integer consumedElement = queue.take (); } fange (InterruptedException ex) {ex.printStackTrace (); }};

Deretter starter vi programmet:

executor.execute (produsent); executor.execute (forbruker); executor.awaitTermination (500, TimeUnit.MILLISECONDS); executor.shutdown (); assertEquals (queue.size (), 0);

Det vil gi følgende utdata:

Å lagre et element: 339626897 til utvekslingspunktet forbrukte et element: 339626897 fra utvekslingspunktet

Vi kan se at a SynchronousQueue brukes som et utvekslingspunkt mellom trådene, noe som er mye bedre og mer forståelig enn forrige eksempel som brukte den delte tilstanden sammen med en CountDownLatch.

5. Konklusjon

I denne raske opplæringen så vi på SynchronousQueue konstruere. Vi opprettet et program som utveksler data mellom to tråder ved hjelp av delt tilstand, og omskrev deretter programmet for å utnytte SynchronousQueue konstruere. Dette fungerer som et utvekslingspunkt som koordinerer produsenten og forbrukertråden.

Implementeringen av alle disse eksemplene og kodebitene finnes i GitHub-prosjektet - dette er et Maven-prosjekt, så det skal være enkelt å importere og kjøre som det er.


$config[zx-auto] not found$config[zx-overlay] not found