// To compile: gcc -g3 -pthread THIS_FILE // SEE: https://en.wikipedia.org/wiki/Thread_pool (with diagram) // ISSUE: If there can be an unlimited backlog of tasks. // So, during enqueue of tasks, we need to see if // there are too many tasks, and then remove the next task // (refuse to enqueue it: "PLEASE TRY AGAIN LATER") // producer-consumer solves the problem of being forced to // remove a task, but it does so by blocking a producer that // tries to enqueue. #define _DEFAULT_SOURCE /* for drand48 */ #include #include #include #include #include #include #define MaxItems 5 sem_t tasks_in_queue; pthread_mutex_t mutex; struct Node { int data; struct Node* next; }; struct LinkedList { struct Node* head; struct Node* tail; }; struct Node initial_node_array[1000]; struct LinkedList work_list; struct LinkedList free_list; // Function to push a new element at the end (tail) of the list void push(struct LinkedList* list, struct Node *new_node) { // Put data into the new node new_node->next = NULL; // New node will be the last node, so its next is NULL // If the list is empty, make both head and tail point to the new node if (list->head == NULL) { list->head = list->tail = new_node; } else { // Otherwise, add the new node at the end and update the tail list->tail->next = new_node; list->tail = new_node; } } // Function to pop the front element of the list struct Node * pop(struct LinkedList* list) { // If the list is empty if (list->head == NULL) { printf("Internal error: Stack underflow. List is empty.\n"); abort(); } // Store the current head and move the head to the next node struct Node* popped_data = list->head; list->head = list->head->next; // If the list becomes empty, set the tail to NULL as well if (list->head == NULL) { list->tail = NULL; } return popped_data; } void initialize_list(struct LinkedList* list, int list_length) { for (int i = 0; i < list_length; i++) { push(list, &initial_node_array[i]); } } void insert_producer_item(int item) { pthread_mutex_lock(&mutex); struct Node *node_item = pop(&free_list); node_item->data = item; push(&work_list, node_item); pthread_mutex_unlock(&mutex); } int extract_producer_item() { pthread_mutex_lock(&mutex); struct Node *node_item = pop(&work_list); int item = node_item ->data; push(&free_list, node_item); pthread_mutex_unlock(&mutex); return item; } void *producer(void *pno) { int item; for (int i = 0; i < MaxItems; i++) { int len = (int) ((drand48() * 5) + 1); sleep(len); item = rand(); // Produce a random item while (free_list.head == NULL) { printf("No more task slots for producer ^d to insert item %d\n" "We will sleep for 1 second and then try again.\n", *((int *)pno), item); sleep(1); } insert_producer_item(item); sem_post(&tasks_in_queue); printf("Producer %d: Inserted item %d\n", *((int *)pno), item); } return NULL; } void *threadpool_worker(void *cno) { for (int i = 0; i < MaxItems; i++) { sem_wait(&tasks_in_queue); // Wait if tasks_in_queue is 0 int item = extract_producer_item(&item); // item is an OUT variable int len = (int) ((drand48() * 5) + 1); sleep(len); printf("Consumer %d: Consumed item %d\n", *((int *)cno), item); } return NULL; } int main(int argc, char* argv[]) { if (argc < 3) { printf("Usage: %s \n", argv[0]); return 1; } int NUM_PRODUCERS = atoi(argv[1]); int NUM_THREADS_IN_POOL = atoi(argv[2]); initialize_list(&work_list, 0); // Each thread in the pool is allocated a 'struct Node'. // When the 'struct Node' is in the 'work_list', then // the corresponding thread will do the work. initialize_list(&free_list, NUM_THREADS_IN_POOL); pthread_t pthread_producer_array[NUM_PRODUCERS]; pthread_t pthread_threadpool_worker_array[NUM_THREADS_IN_POOL]; pthread_mutex_init(&mutex, NULL); sem_init(&tasks_in_queue, 0, 0); int thread_num_int[NUM_PRODUCERS > NUM_THREADS_IN_POOL ? NUM_PRODUCERS : NUM_THREADS_IN_POOL]; for (int i = 0; i < NUM_PRODUCERS; i++) { thread_num_int[i] = i+1; pthread_create(&pthread_producer_array[i], NULL, producer, (void *)&thread_num_int[i]); } for (int i = 0; i < NUM_THREADS_IN_POOL; i++) { thread_num_int[i] = i+1; pthread_create(&pthread_threadpool_worker_array[i], NULL, threadpool_worker, (void *)&thread_num_int[i]); } for (int i = 0; i < NUM_PRODUCERS; i++) { pthread_join(pthread_producer_array[i], NULL); } for (int i = 0; i < NUM_THREADS_IN_POOL; i++) { pthread_join(pthread_threadpool_worker_array[i], NULL); } return 0; }