# include "config.h"
#endif
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
+
#include "wimlib/assert.h"
-#include "wimlib/compress_chunks.h"
+#include "wimlib/chunk_compressor.h"
#include "wimlib/error.h"
#include "wimlib/list.h"
#include "wimlib/util.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
+#ifdef HAVE_SYS_SYSCTL_H
+# include <sys/sysctl.h>
+#endif
struct message_queue {
struct list_head list;
pthread_t thread;
struct message_queue *chunks_to_compress_queue;
struct message_queue *compressed_chunks_queue;
- int out_ctype;
- u32 out_chunk_size;
- struct wimlib_lzx_context *comp_ctx;
+ struct wimlib_compressor *compressor;
};
#define MAX_CHUNKS_PER_MSG 2
struct message {
u8 *uncompressed_chunks[MAX_CHUNKS_PER_MSG];
u8 *compressed_chunks[MAX_CHUNKS_PER_MSG];
- unsigned uncompressed_chunk_sizes[MAX_CHUNKS_PER_MSG];
- unsigned compressed_chunk_sizes[MAX_CHUNKS_PER_MSG];
+ u32 uncompressed_chunk_sizes[MAX_CHUNKS_PER_MSG];
+ u32 compressed_chunk_sizes[MAX_CHUNKS_PER_MSG];
size_t num_filled_chunks;
size_t num_alloc_chunks;
struct list_head list;
if (phys_bytes == 0)
goto default_size;
return phys_bytes;
-#else
+#elif defined(_SC_PAGESIZE) && defined(_SC_PHYS_PAGES)
long page_size = sysconf(_SC_PAGESIZE);
long num_pages = sysconf(_SC_PHYS_PAGES);
if (page_size <= 0 || num_pages <= 0)
goto default_size;
return ((u64)page_size * (u64)num_pages);
+#else
+ int mib[2] = {CTL_HW, HW_MEMSIZE};
+ u64 memsize;
+ size_t len = sizeof(memsize);
+ if (sysctl(mib, ARRAY_LEN(mib), &memsize, &len, NULL, 0) < 0 || len != 8)
+ goto default_size;
+ return memsize;
#endif
default_size:
}
static void
-compress_chunks(struct message *msg, int out_ctype,
- struct wimlib_lzx_context *comp_ctx)
+compress_chunks(struct message *msg, struct wimlib_compressor *compressor)
{
for (size_t i = 0; i < msg->num_filled_chunks; i++) {
+ wimlib_assert(msg->uncompressed_chunk_sizes[i] != 0);
msg->compressed_chunk_sizes[i] =
- compress_chunk(msg->uncompressed_chunks[i],
- msg->uncompressed_chunk_sizes[i],
- msg->compressed_chunks[i],
- out_ctype,
- comp_ctx);
+ wimlib_compress(msg->uncompressed_chunks[i],
+ msg->uncompressed_chunk_sizes[i],
+ msg->compressed_chunks[i],
+ msg->uncompressed_chunk_sizes[i] - 1,
+ compressor);
}
}
struct message *msg;
while ((msg = message_queue_get(params->chunks_to_compress_queue)) != NULL) {
- compress_chunks(msg, params->out_ctype, params->comp_ctx);
+ compress_chunks(msg, params->compressor);
message_queue_put(params->compressed_chunks_queue, msg);
}
return NULL;
message_queue_destroy(&ctx->chunks_to_compress_queue);
message_queue_destroy(&ctx->compressed_chunks_queue);
- if (ctx->base.out_ctype == WIMLIB_COMPRESSION_TYPE_LZX &&
- ctx->thread_data != NULL)
+ if (ctx->thread_data != NULL)
for (i = 0; i < ctx->num_threads; i++)
- wimlib_lzx_free_context(ctx->thread_data[i].comp_ctx);
+ wimlib_free_compressor(ctx->thread_data[i].compressor);
FREE(ctx->thread_data);
static bool
parallel_chunk_compressor_get_chunk(struct chunk_compressor *_ctx,
- const void **cdata_ret, unsigned *csize_ret,
- unsigned *usize_ret)
+ const void **cdata_ret, u32 *csize_ret,
+ u32 *usize_ret)
{
struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
struct message *msg;
unsigned desired_num_threads;
wimlib_assert(out_chunk_size > 0);
- wimlib_assert(out_ctype != WIMLIB_COMPRESSION_TYPE_NONE);
if (num_threads == 0)
num_threads = get_default_num_threads();
desired_num_threads = num_threads;
- chunks_per_msg = MAX_CHUNKS_PER_MSG;
- msgs_per_thread = 2;
+ if (out_chunk_size < ((u32)1 << 23)) {
+ chunks_per_msg = MAX_CHUNKS_PER_MSG;
+ msgs_per_thread = 2;
+ } else {
+ /* Big chunks: Just have one buffer per thread --- more would
+ * just waste memory. */
+ chunks_per_msg = 1;
+ msgs_per_thread = 1;
+ }
for (;;) {
approx_mem_required =
(u64)chunks_per_msg *
(u64)msgs_per_thread *
(u64)num_threads *
(u64)out_chunk_size
+ + out_chunk_size
+ 1000000
- + (out_chunk_size * num_threads * 4);
+ + num_threads * wimlib_get_compressor_needed_memory(out_ctype,
+ out_chunk_size,
+ NULL);
if (approx_mem_required <= max_memory)
break;
dat->chunks_to_compress_queue = &ctx->chunks_to_compress_queue;
dat->compressed_chunks_queue = &ctx->compressed_chunks_queue;
- dat->out_ctype = out_ctype;
- dat->out_chunk_size = out_chunk_size;
- if (out_ctype == WIMLIB_COMPRESSION_TYPE_LZX) {
- ret = wimlib_lzx_alloc_context(out_chunk_size, NULL,
- &dat->comp_ctx);
- if (ret)
- goto err;
- }
+ ret = wimlib_create_compressor(out_ctype, out_chunk_size,
+ NULL, &dat->compressor);
+ if (ret)
+ goto err;
}
for (ctx->num_started_threads = 0;
if (ret) {
errno = ret;
ret = WIMLIB_ERR_NOMEM;
- WARNING_WITH_ERRNO("Failed to create compressor thread %u of %u");
+ WARNING_WITH_ERRNO("Failed to create compressor thread %u of %u",
+ ctx->num_started_threads + 1,
+ num_threads);
goto err;
}
}
parallel_chunk_compressor_destroy(&ctx->base);
return ret;
}
+
+#endif /* ENABLE_MULTITHREADED_COMPRESSION */