/*
* Copyright (C) 2013 Eric Biggers
*
- * This file is part of wimlib, a library for working with WIM files.
+ * This file is free software; you can redistribute it and/or modify it under
+ * the terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 3 of the License, or (at your option) any
+ * later version.
*
- * wimlib is free software; you can redistribute it and/or modify it under the
- * terms of the GNU General Public License as published by the Free Software
- * Foundation; either version 3 of the License, or (at your option) any later
- * version.
+ * This file is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
*
- * wimlib is distributed in the hope that it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * wimlib; if not, see http://www.gnu.org/licenses/.
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this file; if not, see http://www.gnu.org/licenses/.
*/
#ifdef HAVE_CONFIG_H
#ifdef ENABLE_MULTITHREADED_COMPRESSION
+#include <errno.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+
#include "wimlib/assert.h"
#include "wimlib/chunk_compressor.h"
#include "wimlib/error.h"
#include "wimlib/list.h"
#include "wimlib/util.h"
-#ifdef __WIN32__
-# include "wimlib/win32.h" /* win32_get_number_of_processors() */
-#endif
-
-#include <errno.h>
-#include <limits.h>
-#include <pthread.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-#ifdef HAVE_SYS_SYSCTL_H
-# include <sys/sysctl.h>
-#endif
struct message_queue {
struct list_head list;
struct wimlib_compressor *compressor;
};
-#define MAX_CHUNKS_PER_MSG 2
+#define MAX_CHUNKS_PER_MSG 16
struct message {
u8 *uncompressed_chunks[MAX_CHUNKS_PER_MSG];
u8 *compressed_chunks[MAX_CHUNKS_PER_MSG];
- unsigned uncompressed_chunk_sizes[MAX_CHUNKS_PER_MSG];
- unsigned compressed_chunk_sizes[MAX_CHUNKS_PER_MSG];
+ u32 uncompressed_chunk_sizes[MAX_CHUNKS_PER_MSG];
+ u32 compressed_chunk_sizes[MAX_CHUNKS_PER_MSG];
size_t num_filled_chunks;
size_t num_alloc_chunks;
struct list_head list;
struct message_queue chunks_to_compress_queue;
struct message_queue compressed_chunks_queue;
struct compressor_thread_data *thread_data;
- unsigned num_threads;
+ unsigned num_thread_data;
unsigned num_started_threads;
struct message *msgs;
size_t next_chunk_idx;
};
-static unsigned
-get_default_num_threads(void)
-{
- long n;
-#ifdef __WIN32__
- n = win32_get_number_of_processors();
-#else
- n = sysconf(_SC_NPROCESSORS_ONLN);
-#endif
- if (n < 1 || n >= UINT_MAX) {
- WARNING("Failed to determine number of processors; assuming 1.");
- return 1;
- }
- return n;
-}
-
-static u64
-get_avail_memory(void)
-{
-#ifdef __WIN32__
- u64 phys_bytes = win32_get_avail_memory();
- if (phys_bytes == 0)
- goto default_size;
- return phys_bytes;
-#elif defined(_SC_PAGESIZE) && defined(_SC_PHYS_PAGES)
- long page_size = sysconf(_SC_PAGESIZE);
- long num_pages = sysconf(_SC_PHYS_PAGES);
- if (page_size <= 0 || num_pages <= 0)
- goto default_size;
- return ((u64)page_size * (u64)num_pages);
-#else
- int mib[2] = {CTL_HW, HW_MEMSIZE};
- u64 memsize;
- size_t len = sizeof(memsize);
- if (sysctl(mib, ARRAY_LEN(mib), &memsize, &len, NULL, 0) < 0 || len != 8)
- goto default_size;
- return memsize;
-#endif
-default_size:
- WARNING("Failed to determine available memory; assuming 1 GiB");
- return 1U << 30;
-}
static int
message_queue_init(struct message_queue *q)
return;
if (ctx->num_started_threads != 0) {
- DEBUG("Terminating %u compressor threads", ctx->num_started_threads);
message_queue_terminate(&ctx->chunks_to_compress_queue);
for (i = 0; i < ctx->num_started_threads; i++)
message_queue_destroy(&ctx->compressed_chunks_queue);
if (ctx->thread_data != NULL)
- for (i = 0; i < ctx->num_threads; i++)
+ for (i = 0; i < ctx->num_thread_data; i++)
wimlib_free_compressor(ctx->thread_data[i].compressor);
FREE(ctx->thread_data);
ctx->next_submit_msg = NULL;
}
-static bool
-parallel_chunk_compressor_submit_chunk(struct chunk_compressor *_ctx,
- const void *chunk, size_t size)
+static void *
+parallel_chunk_compressor_get_chunk_buffer(struct chunk_compressor *_ctx)
{
struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
struct message *msg;
- wimlib_assert(size > 0);
- wimlib_assert(size <= ctx->base.out_chunk_size);
-
if (ctx->next_submit_msg) {
msg = ctx->next_submit_msg;
} else {
if (list_empty(&ctx->available_msgs))
- return false;
+ return NULL;
msg = list_entry(ctx->available_msgs.next, struct message, list);
list_del(&msg->list);
msg->num_filled_chunks = 0;
}
- memcpy(msg->uncompressed_chunks[msg->num_filled_chunks], chunk, size);
- msg->uncompressed_chunk_sizes[msg->num_filled_chunks] = size;
+ return msg->uncompressed_chunks[msg->num_filled_chunks];
+}
+
+static void
+parallel_chunk_compressor_signal_chunk_filled(struct chunk_compressor *_ctx, u32 usize)
+{
+ struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
+ struct message *msg;
+
+ wimlib_assert(usize > 0);
+ wimlib_assert(usize <= ctx->base.out_chunk_size);
+ wimlib_assert(ctx->next_submit_msg);
+
+ msg = ctx->next_submit_msg;
+ msg->uncompressed_chunk_sizes[msg->num_filled_chunks] = usize;
if (++msg->num_filled_chunks == msg->num_alloc_chunks)
submit_compression_msg(ctx);
- return true;
}
static bool
-parallel_chunk_compressor_get_chunk(struct chunk_compressor *_ctx,
- const void **cdata_ret, unsigned *csize_ret,
- unsigned *usize_ret)
+parallel_chunk_compressor_get_compression_result(struct chunk_compressor *_ctx,
+ const void **cdata_ret, u32 *csize_ret,
+ u32 *usize_ret)
{
struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
struct message *msg;
-
if (ctx->next_submit_msg)
submit_compression_msg(ctx);
unsigned desired_num_threads;
wimlib_assert(out_chunk_size > 0);
- wimlib_assert(out_ctype != WIMLIB_COMPRESSION_TYPE_NONE);
if (num_threads == 0)
- num_threads = get_default_num_threads();
+ num_threads = get_available_cpus();
- if (num_threads == 1) {
- DEBUG("Only 1 thread; Not bothering with "
- "parallel chunk compressor.");
+ if (num_threads == 1)
return -1;
- }
if (max_memory == 0)
- max_memory = get_avail_memory();
+ max_memory = get_available_memory();
desired_num_threads = num_threads;
if (out_chunk_size < ((u32)1 << 23)) {
- chunks_per_msg = MAX_CHUNKS_PER_MSG;
+ /* Relatively small chunks. Use 2 messages per thread, each
+ * with at least 2 chunks. Use more chunks per message if there
+ * are lots of threads and/or the chunks are very small. */
+ chunks_per_msg = 2;
+ chunks_per_msg += num_threads * (65536 / out_chunk_size) / 16;
+ chunks_per_msg = max(chunks_per_msg, 2);
+ chunks_per_msg = min(chunks_per_msg, MAX_CHUNKS_PER_MSG);
msgs_per_thread = 2;
} else {
/* Big chunks: Just have one buffer per thread --- more would
+ 1000000
+ num_threads * wimlib_get_compressor_needed_memory(out_ctype,
out_chunk_size,
- NULL);
+ 0);
if (approx_mem_required <= max_memory)
break;
desired_num_threads, num_threads);
}
- if (num_threads == 1) {
- DEBUG("Only 1 thread; Not bothering with "
- "parallel chunk compressor.");
+ if (num_threads == 1)
return -2;
- }
ret = WIMLIB_ERR_NOMEM;
ctx = CALLOC(1, sizeof(*ctx));
ctx->base.out_ctype = out_ctype;
ctx->base.out_chunk_size = out_chunk_size;
- ctx->base.num_threads = num_threads;
ctx->base.destroy = parallel_chunk_compressor_destroy;
- ctx->base.submit_chunk = parallel_chunk_compressor_submit_chunk;
- ctx->base.get_chunk = parallel_chunk_compressor_get_chunk;
+ ctx->base.get_chunk_buffer = parallel_chunk_compressor_get_chunk_buffer;
+ ctx->base.signal_chunk_filled = parallel_chunk_compressor_signal_chunk_filled;
+ ctx->base.get_compression_result = parallel_chunk_compressor_get_compression_result;
- ctx->num_threads = num_threads;
+ ctx->num_thread_data = num_threads;
ret = message_queue_init(&ctx->chunks_to_compress_queue);
if (ret)
dat->chunks_to_compress_queue = &ctx->chunks_to_compress_queue;
dat->compressed_chunks_queue = &ctx->compressed_chunks_queue;
ret = wimlib_create_compressor(out_ctype, out_chunk_size,
- NULL, &dat->compressor);
+ WIMLIB_COMPRESSOR_FLAG_DESTRUCTIVE,
+ &dat->compressor);
if (ret)
goto err;
}
ctx->num_started_threads < num_threads;
ctx->num_started_threads++)
{
- DEBUG("pthread_create thread %u of %u",
- ctx->num_started_threads + 1, num_threads);
ret = pthread_create(&ctx->thread_data[ctx->num_started_threads].thread,
NULL,
compressor_thread_proc,
&ctx->thread_data[ctx->num_started_threads]);
if (ret) {
errno = ret;
- ret = WIMLIB_ERR_NOMEM;
WARNING_WITH_ERRNO("Failed to create compressor thread %u of %u",
ctx->num_started_threads + 1,
num_threads);
+ ret = WIMLIB_ERR_NOMEM;
+ if (ctx->num_started_threads >= 2)
+ break;
goto err;
}
}
+ ctx->base.num_threads = ctx->num_started_threads;
+
ret = WIMLIB_ERR_NOMEM;
- ctx->num_messages = num_threads * msgs_per_thread;
+ ctx->num_messages = ctx->num_started_threads * msgs_per_thread;
ctx->msgs = allocate_messages(ctx->num_messages,
chunks_per_msg, out_chunk_size);
if (ctx->msgs == NULL)