]> wimlib.net Git - wimlib/blobdiff - src/compress_parallel.c
Use native Windows threads on Windows
[wimlib] / src / compress_parallel.c
index 4c2959068e989aa286e008c8a8e4cf17e3170c03..159b9ce54ab24e62ac08915bc9815d8956521900 100644 (file)
@@ -26,7 +26,6 @@
 #endif
 
 #include <errno.h>
-#include <pthread.h>
 #include <stdlib.h>
 #include <string.h>
 
 #include "wimlib/chunk_compressor.h"
 #include "wimlib/error.h"
 #include "wimlib/list.h"
+#include "wimlib/threads.h"
 #include "wimlib/util.h"
 
 struct message_queue {
        struct list_head list;
-       pthread_mutex_t lock;
-       pthread_cond_t msg_avail_cond;
-       pthread_cond_t space_avail_cond;
+       struct mutex lock;
+       struct condvar msg_avail_cond;
+       struct condvar space_avail_cond;
        bool terminating;
 };
 
 struct compressor_thread_data {
-       pthread_t thread;
+       struct thread thread;
        struct message_queue *chunks_to_compress_queue;
        struct message_queue *compressed_chunks_queue;
        struct wimlib_compressor *compressor;
@@ -89,25 +89,19 @@ struct parallel_chunk_compressor {
 static int
 message_queue_init(struct message_queue *q)
 {
-       if (pthread_mutex_init(&q->lock, NULL)) {
-               ERROR_WITH_ERRNO("Failed to initialize mutex");
+       if (!mutex_init(&q->lock))
                goto err;
-       }
-       if (pthread_cond_init(&q->msg_avail_cond, NULL)) {
-               ERROR_WITH_ERRNO("Failed to initialize condition variable");
+       if (!condvar_init(&q->msg_avail_cond))
                goto err_destroy_lock;
-       }
-       if (pthread_cond_init(&q->space_avail_cond, NULL)) {
-               ERROR_WITH_ERRNO("Failed to initialize condition variable");
+       if (!condvar_init(&q->space_avail_cond))
                goto err_destroy_msg_avail_cond;
-       }
        INIT_LIST_HEAD(&q->list);
        return 0;
 
 err_destroy_msg_avail_cond:
-       pthread_cond_destroy(&q->msg_avail_cond);
+       condvar_destroy(&q->msg_avail_cond);
 err_destroy_lock:
-       pthread_mutex_destroy(&q->lock);
+       mutex_destroy(&q->lock);
 err:
        return WIMLIB_ERR_NOMEM;
 }
@@ -116,19 +110,19 @@ static void
 message_queue_destroy(struct message_queue *q)
 {
        if (q->list.next != NULL) {
-               pthread_mutex_destroy(&q->lock);
-               pthread_cond_destroy(&q->msg_avail_cond);
-               pthread_cond_destroy(&q->space_avail_cond);
+               mutex_destroy(&q->lock);
+               condvar_destroy(&q->msg_avail_cond);
+               condvar_destroy(&q->space_avail_cond);
        }
 }
 
 static void
 message_queue_put(struct message_queue *q, struct message *msg)
 {
-       pthread_mutex_lock(&q->lock);
+       mutex_lock(&q->lock);
        list_add_tail(&msg->list, &q->list);
-       pthread_cond_signal(&q->msg_avail_cond);
-       pthread_mutex_unlock(&q->lock);
+       condvar_signal(&q->msg_avail_cond);
+       mutex_unlock(&q->lock);
 }
 
 static struct message *
@@ -136,25 +130,25 @@ message_queue_get(struct message_queue *q)
 {
        struct message *msg;
 
-       pthread_mutex_lock(&q->lock);
+       mutex_lock(&q->lock);
        while (list_empty(&q->list) && !q->terminating)
-               pthread_cond_wait(&q->msg_avail_cond, &q->lock);
+               condvar_wait(&q->msg_avail_cond, &q->lock);
        if (!q->terminating) {
                msg = list_entry(q->list.next, struct message, list);
                list_del(&msg->list);
        } else
                msg = NULL;
-       pthread_mutex_unlock(&q->lock);
+       mutex_unlock(&q->lock);
        return msg;
 }
 
 static void
 message_queue_terminate(struct message_queue *q)
 {
-       pthread_mutex_lock(&q->lock);
+       mutex_lock(&q->lock);
        q->terminating = true;
-       pthread_cond_broadcast(&q->msg_avail_cond);
-       pthread_mutex_unlock(&q->lock);
+       condvar_broadcast(&q->msg_avail_cond);
+       mutex_unlock(&q->lock);
 }
 
 static int
@@ -248,7 +242,7 @@ parallel_chunk_compressor_destroy(struct chunk_compressor *_ctx)
                message_queue_terminate(&ctx->chunks_to_compress_queue);
 
                for (i = 0; i < ctx->num_started_threads; i++)
-                       pthread_join(ctx->thread_data[i].thread, NULL);
+                       thread_join(&ctx->thread_data[i].thread);
        }
 
        message_queue_destroy(&ctx->chunks_to_compress_queue);
@@ -475,15 +469,10 @@ new_parallel_chunk_compressor(int out_ctype, u32 out_chunk_size,
             ctx->num_started_threads < num_threads;
             ctx->num_started_threads++)
        {
-               ret = pthread_create(&ctx->thread_data[ctx->num_started_threads].thread,
-                                    NULL,
-                                    compressor_thread_proc,
-                                    &ctx->thread_data[ctx->num_started_threads]);
-               if (ret) {
-                       errno = ret;
-                       WARNING_WITH_ERRNO("Failed to create compressor thread %u of %u",
-                                          ctx->num_started_threads + 1,
-                                          num_threads);
+               if (!thread_create(&ctx->thread_data[ctx->num_started_threads].thread,
+                                  compressor_thread_proc,
+                                  &ctx->thread_data[ctx->num_started_threads]))
+               {
                        ret = WIMLIB_ERR_NOMEM;
                        if (ctx->num_started_threads >= 2)
                                break;