*/
/*
- * Copyright (C) 2013 Eric Biggers
+ * Copyright (C) 2013-2023 Eric Biggers
*
* This file is free software; you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* details.
*
* You should have received a copy of the GNU Lesser General Public License
- * along with this file; if not, see http://www.gnu.org/licenses/.
+ * along with this file; if not, see https://www.gnu.org/licenses/.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
-#ifdef ENABLE_MULTITHREADED_COMPRESSION
-
#include <errno.h>
-#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include "wimlib/chunk_compressor.h"
#include "wimlib/error.h"
#include "wimlib/list.h"
+#include "wimlib/threads.h"
#include "wimlib/util.h"
struct message_queue {
struct list_head list;
- pthread_mutex_t lock;
- pthread_cond_t msg_avail_cond;
- pthread_cond_t space_avail_cond;
+ struct mutex lock;
+ struct condvar msg_avail_cond;
+ struct condvar space_avail_cond;
bool terminating;
};
struct compressor_thread_data {
- pthread_t thread;
+ struct thread thread;
struct message_queue *chunks_to_compress_queue;
struct message_queue *compressed_chunks_queue;
struct wimlib_compressor *compressor;
static int
message_queue_init(struct message_queue *q)
{
- if (pthread_mutex_init(&q->lock, NULL)) {
- ERROR_WITH_ERRNO("Failed to initialize mutex");
+ if (!mutex_init(&q->lock))
goto err;
- }
- if (pthread_cond_init(&q->msg_avail_cond, NULL)) {
- ERROR_WITH_ERRNO("Failed to initialize condition variable");
+ if (!condvar_init(&q->msg_avail_cond))
goto err_destroy_lock;
- }
- if (pthread_cond_init(&q->space_avail_cond, NULL)) {
- ERROR_WITH_ERRNO("Failed to initialize condition variable");
+ if (!condvar_init(&q->space_avail_cond))
goto err_destroy_msg_avail_cond;
- }
INIT_LIST_HEAD(&q->list);
return 0;
err_destroy_msg_avail_cond:
- pthread_cond_destroy(&q->msg_avail_cond);
+ condvar_destroy(&q->msg_avail_cond);
err_destroy_lock:
- pthread_mutex_destroy(&q->lock);
+ mutex_destroy(&q->lock);
err:
return WIMLIB_ERR_NOMEM;
}
message_queue_destroy(struct message_queue *q)
{
if (q->list.next != NULL) {
- pthread_mutex_destroy(&q->lock);
- pthread_cond_destroy(&q->msg_avail_cond);
- pthread_cond_destroy(&q->space_avail_cond);
+ mutex_destroy(&q->lock);
+ condvar_destroy(&q->msg_avail_cond);
+ condvar_destroy(&q->space_avail_cond);
}
}
static void
message_queue_put(struct message_queue *q, struct message *msg)
{
- pthread_mutex_lock(&q->lock);
+ mutex_lock(&q->lock);
list_add_tail(&msg->list, &q->list);
- pthread_cond_signal(&q->msg_avail_cond);
- pthread_mutex_unlock(&q->lock);
+ condvar_signal(&q->msg_avail_cond);
+ mutex_unlock(&q->lock);
}
static struct message *
{
struct message *msg;
- pthread_mutex_lock(&q->lock);
+ mutex_lock(&q->lock);
while (list_empty(&q->list) && !q->terminating)
- pthread_cond_wait(&q->msg_avail_cond, &q->lock);
+ condvar_wait(&q->msg_avail_cond, &q->lock);
if (!q->terminating) {
msg = list_entry(q->list.next, struct message, list);
list_del(&msg->list);
} else
msg = NULL;
- pthread_mutex_unlock(&q->lock);
+ mutex_unlock(&q->lock);
return msg;
}
static void
message_queue_terminate(struct message_queue *q)
{
- pthread_mutex_lock(&q->lock);
+ mutex_lock(&q->lock);
q->terminating = true;
- pthread_cond_broadcast(&q->msg_avail_cond);
- pthread_mutex_unlock(&q->lock);
+ condvar_broadcast(&q->msg_avail_cond);
+ mutex_unlock(&q->lock);
}
static int
message_queue_terminate(&ctx->chunks_to_compress_queue);
for (i = 0; i < ctx->num_started_threads; i++)
- pthread_join(ctx->thread_data[i].thread, NULL);
+ thread_join(&ctx->thread_data[i].thread);
}
message_queue_destroy(&ctx->chunks_to_compress_queue);
ctx->num_started_threads < num_threads;
ctx->num_started_threads++)
{
- ret = pthread_create(&ctx->thread_data[ctx->num_started_threads].thread,
- NULL,
- compressor_thread_proc,
- &ctx->thread_data[ctx->num_started_threads]);
- if (ret) {
- errno = ret;
- WARNING_WITH_ERRNO("Failed to create compressor thread %u of %u",
- ctx->num_started_threads + 1,
- num_threads);
+ if (!thread_create(&ctx->thread_data[ctx->num_started_threads].thread,
+ compressor_thread_proc,
+ &ctx->thread_data[ctx->num_started_threads]))
+ {
ret = WIMLIB_ERR_NOMEM;
if (ctx->num_started_threads >= 2)
break;
parallel_chunk_compressor_destroy(&ctx->base);
return ret;
}
-
-#endif /* ENABLE_MULTITHREADED_COMPRESSION */