#include "xpress.h"
#ifdef ENABLE_MULTITHREADED_COMPRESSION
-#include <semaphore.h>
#include <pthread.h>
#endif
#ifdef ENABLE_MULTITHREADED_COMPRESSION
struct shared_queue {
- sem_t filled_slots;
- sem_t empty_slots;
- pthread_mutex_t lock;
+ unsigned size;
unsigned front;
unsigned back;
+ unsigned filled_slots;
void **array;
- unsigned size;
+ pthread_mutex_t lock;
+ pthread_cond_t msg_avail_cond;
+ pthread_cond_t space_avail_cond;
};
static int shared_queue_init(struct shared_queue *q, unsigned size)
q->array = CALLOC(sizeof(q->array[0]), size);
if (!q->array)
return WIMLIB_ERR_NOMEM;
-
- sem_init(&q->filled_slots, 0, 0);
- sem_init(&q->empty_slots, 0, size);
- pthread_mutex_init(&q->lock, NULL);
+ q->filled_slots = 0;
q->front = 0;
q->back = size - 1;
q->size = size;
+ pthread_mutex_init(&q->lock, NULL);
+ pthread_cond_init(&q->msg_avail_cond, NULL);
+ pthread_cond_init(&q->space_avail_cond, NULL);
return 0;
}
static void shared_queue_destroy(struct shared_queue *q)
{
- sem_destroy(&q->filled_slots);
- sem_destroy(&q->empty_slots);
- pthread_mutex_destroy(&q->lock);
FREE(q->array);
+ pthread_mutex_destroy(&q->lock);
+ pthread_cond_destroy(&q->msg_avail_cond);
+ pthread_cond_destroy(&q->space_avail_cond);
}
static void shared_queue_put(struct shared_queue *q, void *obj)
{
- sem_wait(&q->empty_slots);
pthread_mutex_lock(&q->lock);
+ while (q->filled_slots == q->size)
+ pthread_cond_wait(&q->space_avail_cond, &q->lock);
q->back = (q->back + 1) % q->size;
q->array[q->back] = obj;
+ q->filled_slots++;
- sem_post(&q->filled_slots);
+ pthread_cond_broadcast(&q->msg_avail_cond);
pthread_mutex_unlock(&q->lock);
}
static void *shared_queue_get(struct shared_queue *q)
{
- sem_wait(&q->filled_slots);
+ void *obj;
+
pthread_mutex_lock(&q->lock);
+ while (q->filled_slots == 0)
+ pthread_cond_wait(&q->msg_avail_cond, &q->lock);
- void *obj = q->array[q->front];
+ obj = q->array[q->front];
q->array[q->front] = NULL;
q->front = (q->front + 1) % q->size;
+ q->filled_slots--;
- sem_post(&q->empty_slots);
+ pthread_cond_broadcast(&q->space_avail_cond);
pthread_mutex_unlock(&q->lock);
return obj;
}