struct message_queue chunks_to_compress_queue;
struct message_queue compressed_chunks_queue;
struct compressor_thread_data *thread_data;
- unsigned num_threads;
+ unsigned num_thread_data;
unsigned num_started_threads;
struct message *msgs;
default_size:
WARNING("Failed to determine available memory; assuming 1 GiB");
- return 1U << 30;
+ return 1ULL << 30;
}
static int
message_queue_destroy(&ctx->compressed_chunks_queue);
if (ctx->thread_data != NULL)
- for (i = 0; i < ctx->num_threads; i++)
+ for (i = 0; i < ctx->num_thread_data; i++)
wimlib_free_compressor(ctx->thread_data[i].compressor);
FREE(ctx->thread_data);
static bool
parallel_chunk_compressor_submit_chunk(struct chunk_compressor *_ctx,
- const void *chunk, size_t size)
+ const void *chunk, u32 size)
{
struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
struct message *msg;
ctx->base.out_ctype = out_ctype;
ctx->base.out_chunk_size = out_chunk_size;
- ctx->base.num_threads = num_threads;
ctx->base.destroy = parallel_chunk_compressor_destroy;
ctx->base.submit_chunk = parallel_chunk_compressor_submit_chunk;
ctx->base.get_chunk = parallel_chunk_compressor_get_chunk;
- ctx->num_threads = num_threads;
+ ctx->num_thread_data = num_threads;
ret = message_queue_init(&ctx->chunks_to_compress_queue);
if (ret)
&ctx->thread_data[ctx->num_started_threads]);
if (ret) {
errno = ret;
- ret = WIMLIB_ERR_NOMEM;
WARNING_WITH_ERRNO("Failed to create compressor thread %u of %u",
ctx->num_started_threads + 1,
num_threads);
+ ret = WIMLIB_ERR_NOMEM;
+ if (ctx->num_started_threads >= 2)
+ break;
goto err;
}
}
+ ctx->base.num_threads = ctx->num_started_threads;
+
ret = WIMLIB_ERR_NOMEM;
- ctx->num_messages = num_threads * msgs_per_thread;
+ ctx->num_messages = ctx->num_started_threads * msgs_per_thread;
ctx->msgs = allocate_messages(ctx->num_messages,
chunks_per_msg, out_chunk_size);
if (ctx->msgs == NULL)