import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Random; public class ProducerConsumerMain { // the number of producer and consumer threads private static final int THREADS = 1000; // the number of items each producer adds and each consumer gets from the queue private static final int EACH = 1000; // the capacity of the queue private static final int CAPACITY = 10; public static void main(String[] argv) { var queue = new ProducerConsumerQueue(CAPACITY); var threads = new ArrayList(2 * THREADS); var toProduce = new ArrayList>(THREADS); var Random = new Random(); var consumers = new ArrayList(); for (int i = 0; i < THREADS; i++) { var data = new ArrayList(EACH); for (int j = 0; j < EACH; j++) { data.add(Random.nextInt()); } toProduce.add(data); threads.add(new TestProducer(data, queue)); var consumer = new TestConsumer(EACH, queue); threads.add(consumer); consumers.add(consumer); } threads.forEach(Thread::start); try { for (var t : threads) { t.join(); } } catch (InterruptedException ignored) {} var produced = toProduce.stream().flatMap(Collection::stream).sorted().toList(); var consumed = consumers.stream().flatMap(c -> c.getElements().stream()).sorted().toList(); System.out.println(produced.equals(consumed)); } public static class TestProducer extends Thread { private final List elements; private final ProducerConsumerQueue queue; public TestProducer(List elements, ProducerConsumerQueue queue) { this.elements = elements; this.queue = queue; } @Override public void run() { try { for (var e : elements) { queue.add(e); Thread.yield(); } } catch (InterruptedException e) { throw new RuntimeException(e); } } } public static class TestConsumer extends Thread { private final List elements = new ArrayList<>(); private final int toConsume; private final ProducerConsumerQueue queue; public TestConsumer(int toConsume, ProducerConsumerQueue queue) { this.toConsume = toConsume; this.queue = queue; } public List getElements() { return elements; } @Override public void run() { try { for (int i = 0; i < toConsume; i++) { elements.addLast(queue.get()); Thread.yield(); } } catch (InterruptedException e) { throw new RuntimeException(e); } } } }