// To compile:  gcc -g3 -pthread THIS_FILE
//
// Bounded-buffer producer/consumer demo built on POSIX semaphores and a
// pthread mutex.  Run as:  ./a.out <NUM_PRODUCERS> <NUM_CONSUMERS>

#define _DEFAULT_SOURCE /* for drand48 */

#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define MaxItems 5
#define BufferSize 5

// circularBuf implements a circular buffer data structure.
// This is a slightly more efficient version of keeping a
// linked list for the valid tasks, and a linked list for
// the empty task slots, where the total number of tasks
// is BufferSize.
// The integers 'in' and 'out' are indexes into the circular buffer:
// 'in' is the next slot to be written and 'out' is the next slot to be read.
// The tasks from 'out' up to (but not including) 'in' are the valid tasks,
// and the slots from 'in' up to (but not including) 'out' are the empty
// task slots.
// The semaphore counts of the semaphores empty and full should always
// add to 'BufferSize', the total number of task slots,
// when the code is in equilibrium (when no thread is between its
// sem_wait() and the matching sem_post()).
sem_t empty;
sem_t full;
int in = 0;
int out = 0;
int circularBuf[BufferSize];
pthread_mutex_t mutex;

// Serialises calls to drand48() and rand(); POSIX does not require either
// of them to be thread-safe, and every worker thread calls them.
static pthread_mutex_t rng_mutex = PTHREAD_MUTEX_INITIALIZER;

// One worker thread together with the id that is handed to it.
// Each worker owns its own 'id' so no two threads share that storage.
struct worker {
    pthread_t thread;
    int id;
};

// Store 'item' in the slot at 'in' and advance 'in' (wrapping at BufferSize).
// Returns the index that was written.
// The caller must hold 'mutex' and must already have acquired 'empty'.
int insert_producer_item(int item)
{
    int index_in = in;  // Capture index_in now, inside critical section
    circularBuf[in] = item;
    in = (in + 1) % BufferSize;
    return index_in;
}

// Copy the item at 'out' into *item (an OUT parameter) and advance 'out'
// (wrapping at BufferSize).  Returns the index that was read.
// The caller must hold 'mutex' and must already have acquired 'full'.
int extract_producer_item(int *item)
{
    int index_out = out;  // Capture index_out now, inside critical section
    *item = circularBuf[out];
    out = (out + 1) % BufferSize;
    return index_out;
}

// Return a pseudo-random delay between 1 and 5 seconds.
static unsigned int random_delay_seconds(void)
{
    pthread_mutex_lock(&rng_mutex);
    int seconds = (int) ((drand48() * 5) + 1);
    pthread_mutex_unlock(&rng_mutex);
    return (unsigned int) seconds;
}

// Return the next rand() value, taken under rng_mutex.
static int random_item(void)
{
    pthread_mutex_lock(&rng_mutex);
    int item = rand();
    pthread_mutex_unlock(&rng_mutex);
    return item;
}

// sem_wait() that is restarted when it is interrupted by a signal (EINTR),
// so a worker never enters the critical section without owning a slot.
static void sem_wait_retry(sem_t *sem)
{
    while (sem_wait(sem) == -1 && errno == EINTR) {
        /* interrupted before acquiring the semaphore: try again */
    }
}

// Thread routine: produce MaxItems random items, sleeping 1-5 seconds
// before each one.  'pno' points to this producer's int id.
// Always returns NULL.
void *producer(void *pno)
{
    int id = *((int *) pno);

    for (int i = 0; i < MaxItems; i++) {
        sleep(random_delay_seconds());
        int item = random_item();  // Produce a random item

        sem_wait_retry(&empty);    // Wait for a free slot
        pthread_mutex_lock(&mutex);
        int index_in = insert_producer_item(item);
        pthread_mutex_unlock(&mutex);
        sem_post(&full);           // Announce one more valid item

        printf("Producer %d: Insert item %d at index %d\n",
               id, item, index_in);
    }
    return NULL;
}

// Thread routine: consume MaxItems items, sleeping 1-5 seconds before
// each one.  'cno' points to this consumer's int id.
// Always returns NULL.
void *consumer(void *cno)
{
    int id = *((int *) cno);

    for (int i = 0; i < MaxItems; i++) {
        sleep(random_delay_seconds());

        sem_wait_retry(&full);     // Wait for a valid item
        pthread_mutex_lock(&mutex);
        int item;
        int index_out = extract_producer_item(&item);  // item is an OUT variable
        pthread_mutex_unlock(&mutex);
        sem_post(&empty);          // Announce one more free slot

        printf("Consumer %d: Remove item %d from index %d\n",
               id, item, index_out);
    }
    return NULL;
}

// Parse 'text' as a strictly positive decimal int.
// Returns 0 and stores the value in *count on success, -1 otherwise.
static int parse_thread_count(const char *text, int *count)
{
    char *end = NULL;

    errno = 0;
    long value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE) {
        return -1;
    }
    if (value < 1 || value > INT_MAX) {
        return -1;
    }
    *count = (int) value;
    return 0;
}

// Start 'count' threads running 'routine', numbered 1..count.
// Returns the heap-allocated worker array, or NULL after printing a
// diagnostic.  On failure the array is deliberately not freed: threads
// that were already started still read their id from it, so the caller
// is expected to terminate the process.
static struct worker *start_workers(int count, void *(*routine)(void *))
{
    struct worker *workers = calloc((size_t) count, sizeof *workers);
    if (workers == NULL) {
        perror("calloc");
        return NULL;
    }

    for (int i = 0; i < count; i++) {
        workers[i].id = i + 1;
        int err = pthread_create(&workers[i].thread, NULL, routine,
                                 &workers[i].id);
        if (err != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(err));
            return NULL;
        }
    }
    return workers;
}

// Join all 'count' threads in 'workers' and release the array.
// Returns 0 if every join succeeded, -1 otherwise.
static int join_workers(struct worker *workers, int count)
{
    int status = 0;

    for (int i = 0; i < count; i++) {
        int err = pthread_join(workers[i].thread, NULL);
        if (err != 0) {
            fprintf(stderr, "pthread_join: %s\n", strerror(err));
            status = -1;
        }
    }
    free(workers);
    return status;
}

// Entry point.  argv[1] is the number of producer threads and argv[2] the
// number of consumer threads; the two must currently be equal because every
// producer inserts exactly MaxItems items and every consumer removes exactly
// MaxItems items.
// Exit status: 0 on success, 1 on bad arguments, 2 when the counts differ,
// 3 when a threading primitive fails.
int main(int argc, char *argv[])
{
    if (argc < 3) {
        printf("Usage: %s <NUM_PRODUCERS> <NUM_CONSUMERS>\n", argv[0]);
        return 1;
    }

    int num_producers = 0;
    int num_consumers = 0;
    if (parse_thread_count(argv[1], &num_producers) != 0 ||
        parse_thread_count(argv[2], &num_consumers) != 0) {
        fprintf(stderr, "%s: thread counts must be positive integers\n",
                argv[0]);
        return 1;
    }
    if (num_producers != num_consumers) {
        printf("NUM_PRODUCERS != NUM_CONSUMERS; NOT YET HANDLED BY THIS CODE!\n");
        return 2;
    }

    int err = pthread_mutex_init(&mutex, NULL);
    if (err != 0) {
        fprintf(stderr, "pthread_mutex_init: %s\n", strerror(err));
        return 3;
    }
    // All slots start out empty and none are full.
    if (sem_init(&empty, 0, BufferSize) != 0 || sem_init(&full, 0, 0) != 0) {
        perror("sem_init");
        return 3;
    }

    // Returning from main() terminates any threads that are still running,
    // so a failed start-up cannot leave workers blocked forever.
    struct worker *producers = start_workers(num_producers, producer);
    if (producers == NULL) {
        return 3;
    }
    struct worker *consumers = start_workers(num_consumers, consumer);
    if (consumers == NULL) {
        return 3;
    }

    int status = 0;
    if (join_workers(producers, num_producers) != 0) {
        status = 3;
    }
    if (join_workers(consumers, num_consumers) != 0) {
        status = 3;
    }

    sem_destroy(&full);
    sem_destroy(&empty);
    pthread_mutex_destroy(&mutex);
    return status;
}