Zadanie

Zbuduj aplikację zawierającą dwa komponenty (poprzez komponenty rozumiemy klasy lub zestaw klas realizujących daną funkcjonalność):

  • TaskProducer
  • TaskConsumer

Obydwa komponenty są uruchamiane są na starcie aplikacji i współdzielą globalną kolejkę, do której TaskProducer wrzuca wygenerowane, losowe zadania (obiekt typu Task), aż się dana kolejka zapełni (tzn. osiągnie konfigurowalny maksymalny limit). Jak kolejka się zapełni to TaskProducer czeka, aż jej wielkość spadnie do połowy i ponownie uruchamia produkcję.

W tym samym czasie TaskConsumer pobiera z kolejki zadanie (o ile jakieś zadanie w kolejce jest, jak nie ma to okresowo próbkuje kolejkę czekając na zadania) i jeśli jest zadanie to je realizuje, a wynik zadania wypisuje na konsolę.

Samo zadanie jest bardzo proste: dodaj do siebie dwie liczby (parametry wejściowe danego zadania) i wypisz na konsolę wynik dodawania.
Dodatkowo, należy stworzyć co najmniej 2 TaskProducer(s) pracujące w osobnych wątkach oraz co najmniej 4 TaskConsumer(s), także pracujące w osobnych wątkach.

Rozwiązanie

Na początek spójrzmy na rysunek poglądowy. Naszym krytycznym zasobem jest kolejka. To właśnie ten element będzie monitorem. Czyli będziemy go monitorować i ograniczać dostęp do jednego wątku w danej chwili. Pomożemy to uniknąć nam sytuacji gdzie np. dwóch konsumentów zacznie wykonywać to samo zadanie.

Na rozgrzewkę stwórzmy klasę Task, która będzie odpowiedzialna za wykonanie zadania, czyli sumowanie dwóch, wcześniej podanych liczb.

Task.java
package pc.michaladamski.com;

public class Task {
    private int x;
    private int y;

    public Task(int x, int y) {
        this.x = x;
        this.y = y;
    }

    public int execute() {
        return x + y;
    }
}

Obiekty tej klasy będę tworzone przez TaskProducer, a TaskConsumer będzie wykonywał na nich metodę execute

Wersja 1

Na początek najprostsza działająca wersja. Jeśli kolejka nie osiągnęła swojej maksymalnej pojemności to cały czas tworzymy i dodajemy do niej nowe zadania. W przeciwnym razie budzimy pozostałe wątki, a bieżący wątek dodajemy do puli wątków oczekujących na monitor. Metoda wait zwalnia monitor, który może być zajęty przez inny z wątków. Na przykład jeden z przed chwilą wybudzonych

TaskProducer.java
package pc.michaladamski.com;

import java.util.Deque;
import java.util.Random;

public class TaskProducer implements Runnable {
    private static final int BOUND = 1000;
    private Deque queue;
    private Random random = new Random();

    public TaskProducer(Deque queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        boolean interrupted = false;
        while (!interrupted) {
            synchronized (queue) {
                try {
                    if (queue.size() < Application.CAPACITY) {
                        Task task = new Task(random.nextInt(BOUND), random.nextInt(BOUND));
                        queue.offer(task);
                    } else {
                        queue.wait();
                        queue.notifyAll();
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
    }
}
TaskConsumer.java

Nasz konsument najpierw sprawdza czy w kolejce jest jakieś zadania. Jeśli nie to budzi inne wątki, a sam zwalnia monitor i czeka w puli na to, aż inny wątek go wybudzi.

package pc.michaladamski.com;

import java.util.Deque;

public class TaskConsumer implements Runnable {
    private Deque queue;

    public TaskConsumer(Deque queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        boolean interrupted = false;
        while (!interrupted) {
            synchronized (queue) {
                try {
                    if (queue.isEmpty()) {
                        queue.notifyAll();
                        queue.wait();
                    } else {
                        System.out.println("Task execution result: " + queue.poll().execute());
                        if (queue.size() < Application.CAPACITY / 2) {
                            queue.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
    }
}

Pozostało nam stworzyć jeszcze klasę główną aplikacji. Klasa ta stworzy kolejkę, którą będą współdzielić wszystkie wątki. Następnie stworzy i wystartuje wszystkich producentów i konsumentów.

Application.java
package pc.michaladamski.com;

import java.util.ArrayDeque;
import java.util.Deque;

public class Application {
    public final static int CAPACITY = 20;
    public final static int PROD_COUNT = 120;
    public final static int CONS_COUNT = 4;

    public static void main(String[] args) {
        Deque queue = new ArrayDeque<>(CAPACITY);
        TaskProducer taskProducer = new TaskProducer(queue);
        TaskConsumer taskConsumer = new TaskConsumer(queue);
        runThreads(taskProducer, PROD_COUNT);
        runThreads(taskConsumer, CONS_COUNT);
    }

    public static void runThreads(Runnable runnable, int count) {
        for (int i = 0; i < count; i++) {
            Thread thread = new Thread(runnable);
            thread.start();
        }
    }
}

Refaktoring

Na początek powinniśmy poprawić jedną dosyć istotną rzeczy w klasie TaskConsumer. Popatrz na tę klasę i zastanów się co można by tam zoptymalizować? Wyobraź sobie, że wykonanie naszego zadania trwa znacznie dłużej. Nie jest to sumowanie dwóch liczba, a np. duży raport do wygenerowania. W obecnym rozwiązaniu zadanie jest wykonywane w sekcji krytycznej kodu. Sekcja to powinna ograniczać się do minimum. Pobranie zadania z kolejki wymaga synchronizacji. Jednak jego wykonanie już nie. Dlatego powinniśmy uruchamiać zadanie poza blokiem krytycznym.

TaskConsumer.java
    @Override
    public void run() {
        boolean interrupted = false;
        while (!interrupted) {
            Optional task = Optional.empty();
            synchronized (queue) {
                try {
                    if (queue.isEmpty()) {
                        queue.notifyAll();
                        queue.wait();
                    } else {
                        task = Optional.of(queue.poll());
                        if (queue.size() < Application.CAPACITY / 2) {
                            queue.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            task.ifPresent(t -> System.out.println("Task execution result: " + t.execute()));
        }
    }

Kod rozwiązania w wersji 1 znajdziesz w repozytorium github tutaj.

Wersja 2

Innym rozwiązaniem może być użycie kolekcji blokującej. W tym rozwiązaniu mechanizm synchronizacji jest już zaimplementowany bezpośrednio w kolekcji. W naszym przykładzie użyjemy java.util.concurrent.BlockingDeque

Application.java
package pc.michaladamski.com;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class Application {
    public final static int CAPACITY = 20;
    public final static int PROD_COUNT = 2;
    public final static int CONS_COUNT = 4;

    public static void main(String[] args) {
        BlockingDeque queue = new LinkedBlockingDeque<>(CAPACITY);
        TaskProducer taskProducer = new TaskProducer(queue);
        TaskConsumer taskConsumer = new TaskConsumer(queue);
        runThreads(taskProducer, PROD_COUNT);
        runThreads(taskConsumer, CONS_COUNT);
    }

    public static void runThreads(Runnable runnable, int count) {
        for (int i = 0; i < count; i++) {
            Thread thread = new Thread(runnable);
            thread.start();
        }
    }
}

W tym rozwiązaniu synchronizacja jest zawarta już bezpośrednio w kolejce. Także możemy już usunąć synchronizację z naszych klas producenta i konsumenta.

TaskProducer.java
package pc.michaladamski.com;
import java.util.Random;
import java.util.concurrent.BlockingDeque;

import static pc.michaladamski.com.Application.CAPACITY;

public class TaskProducer implements Runnable {
    private static final int BOUND = 1000;
    private BlockingDeque queue;
    private Random random = new Random();
    private volatile boolean produce = true;

    public TaskProducer(BlockingDeque queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        boolean interrupted = false;
        while (!interrupted) {
            Task task = new Task(random.nextInt(BOUND), random.nextInt(BOUND));
            try {
                if (produce) {
                    queue.put(task);
                    if (queue.remainingCapacity() == 0) {
                        produce = false;
                    }
                } else {
                    Thread.sleep(100);
                    produce = queue.remainingCapacity() >= CAPACITY / 2;
                }
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
    }
}
TaskConsumer.java
package pc.michaladamski.com;

import java.util.concurrent.BlockingDeque;

public class TaskConsumer implements Runnable {
    private BlockingDeque queue;

    public TaskConsumer(BlockingDeque queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        boolean interrupted = false;
        while (!interrupted) {
            Task task = null;
            try {
                task = queue.take();
                Thread.sleep(100);
            } catch (InterruptedException e) {
                interrupted = true;
            }
            System.out.println("Task execution result: " + task.execute());
        }
    }
}

Kod rozwiązania w wersji 2 znajdziesz w repozytorium github tutaj.

Zadanie: producent-konsument
Tagi:                        

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *