//------------------------------------------------------------------------ // File Name: ProducerConsumer.java // Description: A handy pattern that illustrates producer/consumer. // // This file demonstrates the producer/consumer pattern in a multithreaded // application. The number of producers and consumer may be set by // changing the constants shown below. // // Modification History // Date Developer Defect Tag Description // ------ --------- ------- ----------- ---------------------------------- // 020997 Dan Becker Created. //------------------------------------------------------------------------ import java.io.IOException; import java.util.Vector; public class ProducerConsumer { final static int MAX_CONSUMERS = 5; final static int MAX_PRODUCERS = 4; final static int MAX_PRODUCTION = 1000; public static void main(String args[]) { System.out.println( "Synchronized Producer/Consumer Example" ); System.out.println( "Producer count: " + MAX_PRODUCERS + ", Producer production: " + MAX_PRODUCTION + ", Consumer count: " + MAX_CONSUMERS + "." ); System.out.flush(); // Create the single shared resource. SharedResource sharedResource = new SharedResource(); // Create a group of consumers. Vector consumerList = new Vector(); for ( int i = 0; i < MAX_CONSUMERS; i++ ) { Consumer consumer = new Consumer( sharedResource, "C" + i ); consumer.setDaemon( true ); // ensure that when main exits, the thread dies. consumer.start(); // start greedy consumption consumerList.addElement( consumer ); } // Create a group of producers. Vector producerList = new Vector(); for ( int i = 0; i < MAX_PRODUCERS; i++ ) { Producer producer = new Producer( sharedResource, "P" + i, MAX_PRODUCTION ); producer.setDaemon( true ); // ensure that when main exits, the thread dies. producer.start(); // start production producerList.addElement( producer ); } // Wait for all producers to end. try { for ( int i = 0; i < MAX_PRODUCERS; i++ ) ((Producer) producerList.elementAt( i )).join(); } catch (InterruptedException e) { } // Kill off all producers for ( int i = 0; i < MAX_PRODUCERS; i++ ) ((Producer)producerList.elementAt( i )).stop(); // Kill off all consumers for ( int i = 0; i < MAX_CONSUMERS; i++ ) ((Consumer)consumerList.elementAt( i )).stop(); // Print final results for all consumers. int total = 0; for ( int i = 0; i < MAX_CONSUMERS; i++ ) { System.out.println( "Consumer " + ((Consumer)consumerList.elementAt( i )).getConsName() + " consumed " + ((Consumer)consumerList.elementAt( i )).getConsumption() + " objects." ); total += ((Consumer)consumerList.elementAt( i )).getConsumption(); } System.out.println( "Consumption total: " + total + " items." ); } } class Producer extends Thread { // Constructor public Producer( SharedResource s, String name, int productionMax ) { this.sharedResource = s; this.producerName = name; this.productionMax = productionMax; } public void run() { for (int i = 0; i < productionMax; i++) { sharedResource.put( producerName, new String( producerName + i ) ); // try { sleep((int)(Math.random() * 50)); } catch (InterruptedException e) { } } } // Data private SharedResource sharedResource; private String producerName; private int productionMax; } class Consumer extends Thread { // Constructor public Consumer( SharedResource s, String name ) { sharedResource = s; consumerName = name; consumption = 0; } public void run() { Object value = null; while( true ) { // Get until stopped value = sharedResource.get( consumerName ); consumption++; } } public int getConsumption() { return consumption; } public String getConsName() { return consumerName; } // Data private SharedResource sharedResource; private String consumerName; private int consumption; } class SharedResource { public synchronized Object get( String who ) { // Don't do the wait if we can do the get if ( isAvailable == false ) { // This wait causes alternation between gets and puts. do { // System.out.println( "Consumer " + who + " is waiting for get." ); try { wait(); } catch (InterruptedException e) { } // Check after wait in case another consumer releases us. } while ( isAvailable == false ); } // System.out.println( "Consumer " + who + " does a get of " + contents + "." ); isAvailable = false; notifyAll(); // notify scheduler to pick another thread for running return contents; } public synchronized void put( String who, Object value ) { // Don't do the wait if we can put. if ( isAvailable == true ) { // This wait causes alternation between gets and puts. do { // System.out.println( "Producer " + who + " is waiting for put." ); try { wait(); } catch (InterruptedException e) { } // Check after wait in case another producer releases us. } while ( isAvailable == true ); } // System.out.println( "Producer " + who + " does a put of " + value + "." ); contents = value; isAvailable = true; notifyAll(); // notify scheduler to pick another thread for running } private Object contents; private boolean isAvailable = false; }