X-Git-Url: https://wimlib.net/git/?a=blobdiff_plain;ds=sidebyside;f=src%2Fcompress_parallel.c;h=159b9ce54ab24e62ac08915bc9815d8956521900;hb=6979188381ba0bd7519afdd264e8d028326ab7dd;hp=4c2959068e989aa286e008c8a8e4cf17e3170c03;hpb=4e76a0562a165a38960b51c8df2180f84885065b;p=wimlib diff --git a/src/compress_parallel.c b/src/compress_parallel.c index 4c295906..159b9ce5 100644 --- a/src/compress_parallel.c +++ b/src/compress_parallel.c @@ -26,7 +26,6 @@ #endif #include -#include #include #include @@ -34,18 +33,19 @@ #include "wimlib/chunk_compressor.h" #include "wimlib/error.h" #include "wimlib/list.h" +#include "wimlib/threads.h" #include "wimlib/util.h" struct message_queue { struct list_head list; - pthread_mutex_t lock; - pthread_cond_t msg_avail_cond; - pthread_cond_t space_avail_cond; + struct mutex lock; + struct condvar msg_avail_cond; + struct condvar space_avail_cond; bool terminating; }; struct compressor_thread_data { - pthread_t thread; + struct thread thread; struct message_queue *chunks_to_compress_queue; struct message_queue *compressed_chunks_queue; struct wimlib_compressor *compressor; @@ -89,25 +89,19 @@ struct parallel_chunk_compressor { static int message_queue_init(struct message_queue *q) { - if (pthread_mutex_init(&q->lock, NULL)) { - ERROR_WITH_ERRNO("Failed to initialize mutex"); + if (!mutex_init(&q->lock)) goto err; - } - if (pthread_cond_init(&q->msg_avail_cond, NULL)) { - ERROR_WITH_ERRNO("Failed to initialize condition variable"); + if (!condvar_init(&q->msg_avail_cond)) goto err_destroy_lock; - } - if (pthread_cond_init(&q->space_avail_cond, NULL)) { - ERROR_WITH_ERRNO("Failed to initialize condition variable"); + if (!condvar_init(&q->space_avail_cond)) goto err_destroy_msg_avail_cond; - } INIT_LIST_HEAD(&q->list); return 0; err_destroy_msg_avail_cond: - pthread_cond_destroy(&q->msg_avail_cond); + condvar_destroy(&q->msg_avail_cond); err_destroy_lock: - pthread_mutex_destroy(&q->lock); + mutex_destroy(&q->lock); err: return WIMLIB_ERR_NOMEM; } @@ -116,19 +110,19 @@ static void message_queue_destroy(struct message_queue *q) { if (q->list.next != NULL) { - pthread_mutex_destroy(&q->lock); - pthread_cond_destroy(&q->msg_avail_cond); - pthread_cond_destroy(&q->space_avail_cond); + mutex_destroy(&q->lock); + condvar_destroy(&q->msg_avail_cond); + condvar_destroy(&q->space_avail_cond); } } static void message_queue_put(struct message_queue *q, struct message *msg) { - pthread_mutex_lock(&q->lock); + mutex_lock(&q->lock); list_add_tail(&msg->list, &q->list); - pthread_cond_signal(&q->msg_avail_cond); - pthread_mutex_unlock(&q->lock); + condvar_signal(&q->msg_avail_cond); + mutex_unlock(&q->lock); } static struct message * @@ -136,25 +130,25 @@ message_queue_get(struct message_queue *q) { struct message *msg; - pthread_mutex_lock(&q->lock); + mutex_lock(&q->lock); while (list_empty(&q->list) && !q->terminating) - pthread_cond_wait(&q->msg_avail_cond, &q->lock); + condvar_wait(&q->msg_avail_cond, &q->lock); if (!q->terminating) { msg = list_entry(q->list.next, struct message, list); list_del(&msg->list); } else msg = NULL; - pthread_mutex_unlock(&q->lock); + mutex_unlock(&q->lock); return msg; } static void message_queue_terminate(struct message_queue *q) { - pthread_mutex_lock(&q->lock); + mutex_lock(&q->lock); q->terminating = true; - pthread_cond_broadcast(&q->msg_avail_cond); - pthread_mutex_unlock(&q->lock); + condvar_broadcast(&q->msg_avail_cond); + mutex_unlock(&q->lock); } static int @@ -248,7 +242,7 @@ parallel_chunk_compressor_destroy(struct chunk_compressor *_ctx) message_queue_terminate(&ctx->chunks_to_compress_queue); for (i = 0; i < ctx->num_started_threads; i++) - pthread_join(ctx->thread_data[i].thread, NULL); + thread_join(&ctx->thread_data[i].thread); } message_queue_destroy(&ctx->chunks_to_compress_queue); @@ -475,15 +469,10 @@ new_parallel_chunk_compressor(int out_ctype, u32 out_chunk_size, ctx->num_started_threads < num_threads; ctx->num_started_threads++) { - ret = pthread_create(&ctx->thread_data[ctx->num_started_threads].thread, - NULL, - compressor_thread_proc, - &ctx->thread_data[ctx->num_started_threads]); - if (ret) { - errno = ret; - WARNING_WITH_ERRNO("Failed to create compressor thread %u of %u", - ctx->num_started_threads + 1, - num_threads); + if (!thread_create(&ctx->thread_data[ctx->num_started_threads].thread, + compressor_thread_proc, + &ctx->thread_data[ctx->num_started_threads])) + { ret = WIMLIB_ERR_NOMEM; if (ctx->num_started_threads >= 2) break;