Lecture Videos
  1  import java.util.concurrent.*;
  2  import java.util.concurrent.locks.*;
  3  
  /**
   * Producer/consumer demonstration built on an explicit {@link Lock} with two
   * {@link Condition}s.
   *
   * <p>One task writes the increasing integers 1, 2, 3, ... into a shared
   * bounded buffer and another task removes them. Each task pauses for a random
   * time after every operation, so either side may have to wait for the other.
   * Both tasks loop until their thread is interrupted; {@code main} itself never
   * interrupts them, so the program runs until it is killed.
   */
  public class ConsumerProducer {
    /** Exclusive upper bound, in milliseconds, of the random pause taken after each operation. */
    private static final int MAX_PAUSE_MILLIS = 10000;

    /** The single buffer shared by the producer and the consumer task. */
    private static final Buffer buffer = new Buffer();

    /**
     * Starts one producer and one consumer on a two-thread pool.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
      // Create a thread pool with two threads
      ExecutorService executor = Executors.newFixedThreadPool(2);
      executor.execute(new ProducerTask());
      executor.execute(new ConsumerTask());
      // shutdown() only stops the pool from accepting new tasks; the two
      // submitted tasks keep running.
      executor.shutdown();
    }

    /** A task that keeps adding increasing ints to the buffer. */
    private static class ProducerTask implements Runnable {
      @Override
      public void run() {
        try {
          int i = 1;
          while (true) {
            System.out.println("Producer writes " + i);
            buffer.write(i++); // Add a value to the buffer
            // Put the thread into sleep
            Thread.sleep((long) (Math.random() * MAX_PAUSE_MILLIS));
          }
        } catch (InterruptedException ex) {
          // Interruption is the stop request: restore the flag for the
          // executor's worker thread and leave the loop.
          Thread.currentThread().interrupt();
        }
      }
    }

    /** A task that keeps reading and deleting ints from the buffer. */
    private static class ConsumerTask implements Runnable {
      @Override
      public void run() {
        try {
          while (true) {
            System.out.println("\t\t\tConsumer reads " + buffer.read());
            // Put the thread into sleep
            Thread.sleep((long) (Math.random() * MAX_PAUSE_MILLIS));
          }
        } catch (InterruptedException ex) {
          // Interruption is the stop request: restore the flag for the
          // executor's worker thread and leave the loop.
          Thread.currentThread().interrupt();
        }
      }
    }

    /**
     * A bounded FIFO buffer of ints guarded by a lock and two conditions.
     *
     * <p>The lock and its conditions are per-instance, so separate buffers never
     * contend with or signal each other.
     */
    private static class Buffer {
      private static final int CAPACITY = 1; // buffer size

      private final java.util.Queue<Integer> queue = new java.util.ArrayDeque<>();

      /** Guards every access to {@link #queue}. */
      private final Lock lock = new ReentrantLock();

      /** Signalled after a value has been added. */
      private final Condition notEmpty = lock.newCondition();

      /** Signalled after a value has been removed. */
      private final Condition notFull = lock.newCondition();

      /**
       * Appends {@code value}, waiting while the buffer is full.
       *
       * @param value the value to store
       * @throws InterruptedException if the calling thread is interrupted while
       *     waiting for space; the value is not stored in that case
       */
      public void write(int value) throws InterruptedException {
        lock.lock(); // Acquire the lock
        try {
          // Loop, not "if": the condition must be re-checked after every wake-up.
          while (queue.size() >= CAPACITY) {
            System.out.println("Wait for notFull condition");
            notFull.await();
          }

          queue.offer(value);
          notEmpty.signal(); // Signal notEmpty condition
        } finally {
          lock.unlock(); // Release the lock
        }
      }

      /**
       * Removes and returns the oldest value, waiting while the buffer is empty.
       *
       * @return the value that was removed
       * @throws InterruptedException if the calling thread is interrupted while
       *     waiting for a value; nothing is removed in that case
       */
      public int read() throws InterruptedException {
        lock.lock(); // Acquire the lock
        try {
          // Loop, not "if": the condition must be re-checked after every wake-up.
          while (queue.isEmpty()) {
            System.out.println("\t\t\tWait for notEmpty condition");
            notEmpty.await();
          }

          int value = queue.remove();
          notFull.signal(); // Signal notFull condition
          return value;
        } finally {
          // Only release the lock here; returning from finally would discard
          // any exception thrown above.
          lock.unlock();
        }
      }
    }
  }