// Thread-safe queue with independent head and tail locks.
//
// Enqueuers serialize on tail_lock and dequeuers on head_lock, so one
// producer and one consumer can make progress at the same time. A dummy node
// always sits at the head of the list; the first real element is head->next.

#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

typedef struct node {
    int value;
    struct node *next;
} node_t;

typedef struct queue {
    node_t *head;               // dummy node; first real element is head->next
    node_t *tail;               // last node of the list (the dummy when empty)
    pthread_mutex_t head_lock;  // held while reading or replacing head
    pthread_mutex_t tail_lock;  // held while appending at tail
} queue_t;

queue_t *q; // make the queue shared among all threads

/**
 * Abort with a diagnostic unless `ok` is nonzero.
 *
 * Used instead of assert() for calls that have side effects (locking,
 * allocation): assert() expands to nothing under NDEBUG, which would silently
 * remove the call itself.
 */
static void queue_require(int ok, const char *what) {
    if (!ok) {
        fprintf(stderr, "queue: %s failed\n", what);
        abort();
    }
}

/**
 * Allocate and initialize an empty queue.
 *
 * Returns a heap-allocated queue holding only the dummy node. Aborts the
 * process if allocation or mutex initialization fails. Release the queue with
 * queue_free().
 */
queue_t *queue_new(void) {
    queue_t *queue = malloc(sizeof *queue);
    queue_require(queue != NULL, "malloc");

    // insert a dummy head node
    node_t *dummy = malloc(sizeof *dummy);
    queue_require(dummy != NULL, "malloc");
    dummy->value = 0;  // never read, but keep the node fully initialized
    dummy->next = NULL;

    queue_require(pthread_mutex_init(&queue->head_lock, NULL) == 0,
                  "pthread_mutex_init");
    queue_require(pthread_mutex_init(&queue->tail_lock, NULL) == 0,
                  "pthread_mutex_init");

    queue->head = dummy;
    queue->tail = dummy;
    return queue;
}

/**
 * Report whether the queue currently holds no elements.
 *
 * Returns nonzero if empty, 0 otherwise. The answer may already be stale when
 * other threads are enqueueing or dequeueing concurrently.
 */
int queue_empty(queue_t *q) {
    assert(q != NULL);

    // Take head_lock so a concurrent dequeue cannot free the node we inspect.
    queue_require(pthread_mutex_lock(&q->head_lock) == 0, "pthread_mutex_lock");
    int empty = (q->head->next == NULL);
    queue_require(pthread_mutex_unlock(&q->head_lock) == 0,
                  "pthread_mutex_unlock");
    return empty;
}

/**
 * Thread-safe enqueue.
 *
 * Insert the given value at the back of the queue. Aborts the process if the
 * node cannot be allocated.
 */
void queue_enqueue(queue_t *q, int value) {
    assert(q != NULL);

    // Build the node before taking the lock to keep the critical section short.
    node_t *node = malloc(sizeof *node);
    queue_require(node != NULL, "malloc");
    node->value = value;
    node->next = NULL;

    queue_require(pthread_mutex_lock(&q->tail_lock) == 0, "pthread_mutex_lock");
    q->tail->next = node;
    q->tail = node;
    queue_require(pthread_mutex_unlock(&q->tail_lock) == 0,
                  "pthread_mutex_unlock");
}

/**
 * Thread-safe dequeue.
 *
 * Writes the dequeued value to the address given as the second argument and
 * returns 0 on success, or -1 if the queue was empty (in which case *value is
 * left untouched). `value` may be NULL to discard the element.
 */
int queue_dequeue(queue_t *q, int *value) {
    assert(q != NULL);

    queue_require(pthread_mutex_lock(&q->head_lock) == 0, "pthread_mutex_lock");
    assert(q->head != NULL);

    node_t *old_dummy = q->head;
    // NOTE(review): when the queue is empty, head and tail are the same node,
    // so this read of ->next is only ordered against enqueue's write by the
    // two *different* locks. Strict C11 treats that as a data race; making
    // `next` atomic would close it -- confirm whether that is required here.
    node_t *first = old_dummy->next;
    if (first == NULL) {
        queue_require(pthread_mutex_unlock(&q->head_lock) == 0,
                      "pthread_mutex_unlock");
        return -1; // queue was empty
    }

    if (value != NULL) {
        *value = first->value;
    }
    // The node that carried the value becomes the new dummy head.
    q->head = first;
    free(old_dummy);

    queue_require(pthread_mutex_unlock(&q->head_lock) == 0,
                  "pthread_mutex_unlock");
    return 0;
}

/**
 * Destroy a queue created by queue_new().
 *
 * Frees every remaining node (including the dummy), destroys both mutexes and
 * frees the queue itself. Passing NULL is a no-op. The caller must guarantee
 * that no other thread is still using the queue.
 */
void queue_free(queue_t *q) {
    if (q == NULL) {
        return;
    }

    node_t *node = q->head;
    while (node != NULL) {
        node_t *next = node->next;
        free(node);
        node = next;
    }

    pthread_mutex_destroy(&q->head_lock);
    pthread_mutex_destroy(&q->tail_lock);
    free(q);
}
/**
 * Abort with a diagnostic if a pthread call returned a nonzero error code.
 *
 * The call is evaluated as an ordinary function argument, so it still runs
 * when the program is built with NDEBUG (unlike a call placed inside assert()).
 */
static void expect_ok(int rc, const char *what) {
    if (rc != 0) {
        fprintf(stderr, "%s failed with error %d\n", what, rc);
        abort();
    }
}

/**
 * Reader thread body.
 *
 * `arg` points to an int identifying this reader. After a delay, drains the
 * shared queue and prints each value as "<reader id>: <value>" until a dequeue
 * reports that the queue is empty. Always returns NULL.
 */
void *reader_thread(void *arg) {
    int me = *((int *) arg);
    int val;

    // Writers start enqueueing after 1 s; waiting 2 s gives them a head start.
    // This is timing-based only and does not guarantee the writers are done.
    sleep(2);
    while (queue_dequeue(q, &val) == 0) {
        printf("%d: %d\n", me, val);
    }
    return NULL;
}

/**
 * Writer thread body.
 *
 * `arg` points to an int `from`. After a delay, enqueues the ten consecutive
 * values from, from + 1, ..., from + 9 on the shared queue. Always returns
 * NULL.
 */
void *writer_thread(void *arg) {
    int from = *((int *) arg);

    sleep(1);
    for (int i = from; i < from + 10; i++) {
        queue_enqueue(q, i);
    }
    return NULL;
}

/**
 * Demo driver: two writers and two readers share one queue.
 *
 * The thread arguments live on main's stack, which stays valid because main
 * joins every thread before returning.
 */
int main(int argc, char **argv) {
    (void) argc;
    (void) argv;

    q = queue_new();
    pthread_t threads[4];

    // start two writer threads
    int from1 = 0;
    expect_ok(pthread_create(&threads[0], NULL, writer_thread, &from1),
              "pthread_create");
    int from2 = 100;
    expect_ok(pthread_create(&threads[2], NULL, writer_thread, &from2),
              "pthread_create");

    // start two reader threads
    int reader1 = 1;
    expect_ok(pthread_create(&threads[1], NULL, reader_thread, &reader1),
              "pthread_create");
    int reader2 = 2;
    expect_ok(pthread_create(&threads[3], NULL, reader_thread, &reader2),
              "pthread_create");

    // wait for all threads to finish
    for (int i = 0; i < 4; i++) {
        expect_ok(pthread_join(threads[i], NULL), "pthread_join");
    }

    // TODO: queue_free(q);
    return 0;
}