}
#ifdef ENABLE_MULTITHREADED_COMPRESSION
+
+/* Blocking shared queue (solves the producer-consumer problem) */
struct shared_queue {
unsigned size;
unsigned front;
static int shared_queue_init(struct shared_queue *q, unsigned size)
{
+ wimlib_assert(size != 0);
q->array = CALLOC(sizeof(q->array[0]), size);
if (!q->array)
return WIMLIB_ERR_NOMEM;
}
}
+/* Compressor thread routine. This is a lot simpler than the main thread
+ * routine: just repeatedly get a group of chunks from the
+ * res_to_compress_queue, compress them, and put them in the
+ * compressed_res_queue. A NULL pointer indicates that the thread should stop.
+ * */
static void *compressor_thread_proc(void *arg)
{
struct compressor_thread_params *params = arg;
int out_ctype,
struct shared_queue *res_to_compress_queue,
struct shared_queue *compressed_res_queue,
- size_t queue_size,
+ size_t num_messages,
int write_flags,
wimlib_progress_func_t progress_func,
union wimlib_progress_info *progress)
{
int ret;
-
- struct message msgs[queue_size];
- ZERO_ARRAY(msgs);
+ struct chunk_table *cur_chunk_tab = NULL;
+ struct message *msgs = CALLOC(num_messages, sizeof(struct message));
+ struct lookup_table_entry *next_lte = NULL;
// Initially, all the messages are available to use.
LIST_HEAD(available_msgs);
- for (size_t i = 0; i < ARRAY_LEN(msgs); i++)
+
+ if (!msgs) {
+ ret = WIMLIB_ERR_NOMEM;
+ goto out;
+ }
+
+ for (size_t i = 0; i < num_messages; i++)
list_add(&msgs[i].list, &available_msgs);
// outstanding_resources is the list of resources that currently have
//
LIST_HEAD(outstanding_resources);
struct list_head *next_resource = stream_list->next;
- struct lookup_table_entry *next_lte = NULL;
u64 next_chunk = 0;
u64 next_num_chunks = 0;
LIST_HEAD(my_resources);
struct lookup_table_entry *cur_lte = NULL;
- struct chunk_table *cur_chunk_tab = NULL;
struct message *msg;
#ifdef WITH_NTFS_3G
DEBUG("Initializing buffers for uncompressed "
"and compressed data (%zu bytes needed)",
- queue_size * MAX_CHUNKS_PER_MSG * WIM_CHUNK_SIZE * 2);
+ num_messages * MAX_CHUNKS_PER_MSG * WIM_CHUNK_SIZE * 2);
// Pre-allocate all the buffers that will be needed to do the chunk
// compression.
- for (size_t i = 0; i < ARRAY_LEN(msgs); i++) {
+ for (size_t i = 0; i < num_messages; i++) {
for (size_t j = 0; j < MAX_CHUNKS_PER_MSG; j++) {
msgs[i].compressed_chunks[j] = MALLOC(WIM_CHUNK_SIZE);
if (msgs[i].compressed_chunks[j] == NULL ||
msgs[i].uncompressed_chunks[j] == NULL)
{
- ERROR("Could not allocate enough memory for "
- "multi-threaded compression");
ret = WIMLIB_ERR_NOMEM;
goto out;
}
// list, or we get to a resource that needs compression.
while (1) {
if (next_resource == stream_list) {
+ // No more resources to send for
+ // compression
next_lte = NULL;
break;
}
if (ret != 0)
goto out;
- if (cur_lte == NULL)
+ if (cur_lte == NULL) {
+ // Set cur_lte for the
+ // first time
cur_lte = next_lte;
+ }
break;
}
}
}
- if (next_lte == NULL)
+ if (next_lte == NULL) {
+ // No more resources to send for compression
break;
+ }
// Get a message from the available messages
// list
// message so that a compressor thread can
// compress it.
- if (next_chunk == next_num_chunks - 1 &&
- wim_resource_size(next_lte) % WIM_CHUNK_SIZE != 0)
- {
- size = wim_resource_size(next_lte) % WIM_CHUNK_SIZE;
+ if (next_chunk == next_num_chunks - 1) {
+ size = MODULO_NONZERO(wim_resource_size(next_lte),
+ WIM_CHUNK_SIZE);
}
-
DEBUG2("Read resource (size=%u, offset=%zu)",
size, next_chunk * WIM_CHUNK_SIZE);
// message around until all earlier chunks are received.
//
// Otherwise, write all the chunks we can.
- while (cur_lte != NULL && !list_empty(&cur_lte->msg_list)
- && (msg = container_of(cur_lte->msg_list.next,
- struct message,
- list))->complete)
+ while (cur_lte != NULL &&
+ (msg = container_of(cur_lte->msg_list.next,
+ struct message,
+ list))->complete)
{
DEBUG2("Complete msg (begin_chunk=%"PRIu64")", msg->begin_chunk);
if (msg->begin_chunk == 0) {
// now.
list_add(&msg->list, &available_msgs);
- // Was this the last chunk of the stream? If so,
- // finish it.
+ // Was this the last chunk of the stream? If so, finish
+ // it.
if (list_empty(&cur_lte->msg_list) &&
msg->begin_chunk + msg->num_chunks == cur_chunk_tab->num_chunks)
{
struct list_head *next = cur_lte->staging_list.next;
list_del(&cur_lte->staging_list);
- if (next == &outstanding_resources) {
- if (next_lte == NULL) {
- DEBUG("No more outstanding resources");
- ret = 0;
- goto out;
- } else {
- DEBUG("No more outstanding resources---"
- "but still more to compress!");
- cur_lte = NULL;
- }
- } else {
+ if (next == &outstanding_resources)
+ cur_lte = NULL;
+ else
cur_lte = container_of(cur_lte->staging_list.next,
struct lookup_table_entry,
staging_list);
- }
// Since we just finished writing a stream,
// write any streams that have been added to the
}
out:
+ if (ret == WIMLIB_ERR_NOMEM) {
+ ERROR("Could not allocate enough memory for "
+ "multi-threaded compression");
+ }
+
if (next_lte) {
#ifdef WITH_NTFS_3G
end_wim_resource_read(next_lte, ni);
out_ctype, progress_func,
progress, 0);
} else {
- size_t num_available_msgs = 0;
- struct list_head *cur;
+ if (msgs) {
+ size_t num_available_msgs = 0;
+ struct list_head *cur;
- list_for_each(cur, &available_msgs) {
- num_available_msgs++;
- }
+ list_for_each(cur, &available_msgs) {
+ num_available_msgs++;
+ }
- while (num_available_msgs < ARRAY_LEN(msgs)) {
- shared_queue_get(compressed_res_queue);
- num_available_msgs++;
+ while (num_available_msgs < num_messages) {
+ shared_queue_get(compressed_res_queue);
+ num_available_msgs++;
+ }
}
}
- for (size_t i = 0; i < ARRAY_LEN(msgs); i++) {
- for (size_t j = 0; j < MAX_CHUNKS_PER_MSG; j++) {
- FREE(msgs[i].compressed_chunks[j]);
- FREE(msgs[i].uncompressed_chunks[j]);
+ if (msgs) {
+ for (size_t i = 0; i < num_messages; i++) {
+ for (size_t j = 0; j < MAX_CHUNKS_PER_MSG; j++) {
+ FREE(msgs[i].compressed_chunks[j]);
+ FREE(msgs[i].uncompressed_chunks[j]);
+ }
}
+ FREE(msgs);
}
- if (cur_chunk_tab != NULL)
- FREE(cur_chunk_tab);
+ FREE(cur_chunk_tab);
return ret;
}
params.compress = get_compress_func(out_ctype);
compressor_threads = MALLOC(num_threads * sizeof(pthread_t));
+ if (!compressor_threads) {
+ ret = WIMLIB_ERR_NOMEM;
+ goto out_destroy_compressed_res_queue;
+ }
for (unsigned i = 0; i < num_threads; i++) {
DEBUG("pthread_create thread %u", i);
}
}
FREE(compressor_threads);
+out_destroy_compressed_res_queue:
shared_queue_destroy(&compressed_res_queue);
out_destroy_res_to_compress_queue:
shared_queue_destroy(&res_to_compress_queue);
u64 total_bytes = 0;
u64 total_compression_bytes = 0;
union wimlib_progress_info progress;
- int ret;
list_for_each_entry(lte, stream_list, staging_list) {
num_streams++;
progress.write_streams.num_threads = num_threads;
progress.write_streams.compression_type = out_ctype;
- if (num_streams == 0) {
- ret = 0;
- goto out;
- }
-
#ifdef ENABLE_MULTITHREADED_COMPRESSION
- if (total_compression_bytes >= 1000000 && num_threads != 1) {
- ret = write_stream_list_parallel(stream_list,
- out_fp,
- out_ctype,
- write_flags,
- num_threads,
- progress_func,
- &progress);
- }
+ if (total_compression_bytes >= 1000000 && num_threads != 1)
+ return write_stream_list_parallel(stream_list,
+ out_fp,
+ out_ctype,
+ write_flags,
+ num_threads,
+ progress_func,
+ &progress);
else
#endif
- {
- ret = write_stream_list_serial(stream_list,
- out_fp,
- out_ctype,
- write_flags,
- progress_func,
- &progress);
- }
-out:
- return ret;
+ return write_stream_list_serial(stream_list,
+ out_fp,
+ out_ctype,
+ write_flags,
+ progress_func,
+ &progress);
}
static int check_resource_offset(struct lookup_table_entry *lte, void *arg)
{
- if (lte->out_refcnt > lte->refcnt) {
- WARNING("Detected invalid stream reference count. "
- "Forcing re-build of entire WIM.");
- return WIMLIB_ERR_RESOURCE_ORDER;
- } else if (lte->out_refcnt < lte->refcnt) {
+ wimlib_assert(lte->out_refcnt <= lte->refcnt);
+ if (lte->out_refcnt < lte->refcnt) {
off_t end_offset = *(u64*)arg;
- if (lte->resource_entry.offset + lte->resource_entry.size > end_offset) {
+ if (lte->resource_entry.offset +
+ lte->resource_entry.size > end_offset)
+ {
ERROR("The following resource is after the XML data:");
print_lookup_table_entry(lte);
return WIMLIB_ERR_RESOURCE_ORDER;
static int find_new_streams(struct lookup_table_entry *lte, void *arg)
{
- if (lte->out_refcnt == lte->refcnt)
+ if (lte->out_refcnt == lte->refcnt) {
+ /* Newly added stream that is only referenced in the modified
+ * images. Append it to the list of streams to write. */
list_add(<e->staging_list, (struct list_head*)arg);
- else
+ } else {
+ /* Not a newly added stream. But set out_refcnt to the full
+ * refcnt so that it's written correctly. */
lte->out_refcnt = lte->refcnt;
+ }
return 0;
}