From 5c5f913d7676ea8a1cb9c843031c848751f82a3a Mon Sep 17 00:00:00 2001 From: Eric Biggers Date: Sun, 31 Mar 2013 01:36:48 -0500 Subject: [PATCH] Rewriting multithreaded compression code (IN PROGRESS) --- src/sha1.h | 2 +- src/write.c | 787 +++++++++++++++++++++++++--------------------------- 2 files changed, 381 insertions(+), 408 deletions(-) diff --git a/src/sha1.h b/src/sha1.h index d278aa92..7a3b3129 100644 --- a/src/sha1.h +++ b/src/sha1.h @@ -4,7 +4,7 @@ #include "config.h" #include #include -#include "string.h" +#include #include "util.h" #define SHA1_HASH_SIZE 20 diff --git a/src/write.c b/src/write.c index ba3b3158..0bb6bb3a 100644 --- a/src/write.c +++ b/src/write.c @@ -251,6 +251,26 @@ finish_wim_resource_chunk_tab(struct chunk_table *chunk_tab, return 0; } +static int +finalize_and_check_sha1(SHA_CTX *sha_ctx, struct wim_lookup_table_entry *lte) +{ + u8 md[SHA1_HASH_SIZE]; + sha1_final(md, sha_ctx); + if (lte->unhashed) { + copy_hash(lte->hash, md); + } else if (!hashes_equal(md, lte->hash)) { + ERROR("WIM resource has incorrect hash!"); + if (lte_filename_valid(lte)) { + ERROR("We were reading it from \"%"TS"\"; maybe " + "it changed while we were reading it.", + lte->file_on_disk); + } + return WIMLIB_ERR_INVALID_RESOURCE_HASH; + } + return 0; +} + + struct write_resource_ctx { compress_func_t compress; struct chunk_table *chunk_tab; @@ -378,20 +398,9 @@ try_write_again: /* Verify SHA1 message digest of the resource, or set the hash for the * first time. */ if (write_ctx.doing_sha) { - u8 md[SHA1_HASH_SIZE]; - sha1_final(md, &write_ctx.sha_ctx); - if (lte->unhashed) { - copy_hash(lte->hash, md); - } else if (!hashes_equal(md, lte->hash)) { - ERROR("WIM resource has incorrect hash!"); - if (lte_filename_valid(lte)) { - ERROR("We were reading it from \"%"TS"\"; maybe " - "it changed while we were reading it.", - lte->file_on_disk); - } - ret = WIMLIB_ERR_INVALID_RESOURCE_HASH; + ret = finalize_and_check_sha1(&write_ctx.sha_ctx, lte); + if (ret) goto out_free_chunk_tab; - } } out_res_entry->flags = lte->resource_entry.flags; @@ -607,11 +616,6 @@ do_write_streams_progress(union wimlib_progress_info *progress, } } -enum { - STREAMS_MERGED = 0, - STREAMS_NOT_MERGED = 1, -}; - static int do_write_stream_list(struct list_head *stream_list, struct wim_lookup_table *lookup_table, @@ -731,430 +735,396 @@ write_wim_chunks(struct message *msg, FILE *out_fp, return 0; } -/* - * This function is executed by the main thread when the resources are being - * compressed in parallel. The main thread is in change of all reading of the - * uncompressed data and writing of the compressed data. The compressor threads - * *only* do compression from/to in-memory buffers. - * - * Each unit of work given to a compressor thread is up to MAX_CHUNKS_PER_MSG - * chunks of compressed data to compress, represented in a `struct message'. - * Each message is passed from the main thread to a worker thread through the - * res_to_compress_queue, and it is passed back through the - * compressed_res_queue. - */ +struct main_writer_thread_ctx { + struct list_head *stream_list; + struct wim_lookup_table *lookup_table; + FILE *out_fp; + int out_ctype; + struct shared_queue *res_to_compress_queue; + struct shared_queue *compressed_res_queue; + size_t num_messages; + int write_flags; + wimlib_progress_func_t progress_func; + union wimlib_progress_info *progress; + + struct list_head available_msgs; + struct list_head outstanding_streams; + struct list_head serial_streams; + u64 next_chunk; + u64 next_num_chunks; + struct message *msgs; + struct message *next_msg; + size_t next_chunk_in_msg; + struct wim_lookup_table_entry *cur_lte; + struct chunk_table *cur_chunk_tab; + struct wim_lookup_table_entry *next_lte; + SHA_CTX sha_ctx; + u8 next_hash[20]; +}; + static int -main_writer_thread_proc(struct list_head *stream_list, - FILE *out_fp, - int out_ctype, - struct shared_queue *res_to_compress_queue, - struct shared_queue *compressed_res_queue, - size_t num_messages, - int write_flags, - wimlib_progress_func_t progress_func, - union wimlib_progress_info *progress) +init_message(struct message *msg) { - int ret; - struct chunk_table *cur_chunk_tab = NULL; - struct message *msgs = CALLOC(num_messages, sizeof(struct message)); - struct wim_lookup_table_entry *next_lte = NULL; - - // Initially, all the messages are available to use. - LIST_HEAD(available_msgs); - - if (!msgs) { - ret = WIMLIB_ERR_NOMEM; - goto out; + for (size_t i = 0; i < MAX_CHUNKS_PER_MSG; i++) { + msg->compressed_chunks[i] = MALLOC(WIM_CHUNK_SIZE); + msg->uncompressed_chunks[i] = MALLOC(WIM_CHUNK_SIZE); + if (msg->compressed_chunks[i] == NULL || + msg->uncompressed_chunks[i] == NULL) + return WIMLIB_ERR_NOMEM; } + return 0; +} - for (size_t i = 0; i < num_messages; i++) - list_add(&msgs[i].list, &available_msgs); - - // outstanding_resources is the list of resources that currently have - // had chunks sent off for compression. - // - // The first stream in outstanding_resources is the stream that is - // currently being written (cur_lte). - // - // The last stream in outstanding_resources is the stream that is - // currently being read and chunks fed to the compressor threads - // (next_lte). - // - // Depending on the number of threads and the sizes of the resource, - // the outstanding streams list may contain streams between cur_lte and - // next_lte that have all their chunks compressed or being compressed, - // but haven't been written yet. - // - LIST_HEAD(outstanding_resources); - struct list_head *next_resource = stream_list->next; - u64 next_chunk = 0; - u64 next_num_chunks = 0; - - // As in write_wim_resource(), each resource we read is checksummed. - SHA_CTX next_sha_ctx; - u8 next_hash[SHA1_HASH_SIZE]; - - // Resources that don't need any chunks compressed are added to this - // list and written directly by the main thread. - LIST_HEAD(my_resources); - - struct wim_lookup_table_entry *cur_lte = NULL; - struct message *msg; +static void +destroy_message(struct message *msg) +{ + for (size_t i = 0; i < MAX_CHUNKS_PER_MSG; i++) { + FREE(msg->compressed_chunks[i]); + FREE(msg->uncompressed_chunks[i]); + } +} -#ifdef WITH_NTFS_3G - ntfs_inode *ni = NULL; -#endif +static void +free_messages(struct message *msgs, size_t num_messages) +{ + if (msgs) { + for (size_t i = 0; i < num_messages; i++) + destroy_message(&msgs[i]); + FREE(msgs); + } +} - DEBUG("Initializing buffers for uncompressed " - "and compressed data (%zu bytes needed)", - num_messages * MAX_CHUNKS_PER_MSG * WIM_CHUNK_SIZE * 2); +static struct message * +allocate_messages(size_t num_messages) +{ + struct message *msgs; - // Pre-allocate all the buffers that will be needed to do the chunk - // compression. + msgs = CALLOC(num_messages, sizeof(struct message)); + if (!msgs) + return NULL; for (size_t i = 0; i < num_messages; i++) { - for (size_t j = 0; j < MAX_CHUNKS_PER_MSG; j++) { - msgs[i].compressed_chunks[j] = MALLOC(WIM_CHUNK_SIZE); - - // The extra 8 bytes is because longest_match() in - // lz77.c may read a little bit off the end of the - // uncompressed data. It doesn't need to be - // initialized--- we really just need to avoid accessing - // an unmapped page. - msgs[i].uncompressed_chunks[j] = MALLOC(WIM_CHUNK_SIZE + 8); - if (msgs[i].compressed_chunks[j] == NULL || - msgs[i].uncompressed_chunks[j] == NULL) - { - ret = WIMLIB_ERR_NOMEM; - goto out; - } + if (init_message(&msgs[i])) { + free_messages(msgs, num_messages); + return NULL; } } + return msgs; +} - // This loop is executed until all resources have been written, except - // possibly a few that have been added to the @my_resources list for - // writing later. - while (1) { - // Send chunks to the compressor threads until either (a) there - // are no more messages available since they were all sent off, - // or (b) there are no more resources that need to be - // compressed. - while (!list_empty(&available_msgs)) { - if (next_chunk == next_num_chunks) { - // If next_chunk == next_num_chunks, there are - // no more chunks to write in the current - // stream. So, check the SHA1 message digest of - // the stream that was just finished (unless - // next_lte == NULL, which is the case the very - // first time this loop is entered, and also - // near the very end of the compression when - // there are no more streams.) Then, advance to - // the next stream (if there is one). - if (next_lte != NULL) { - #ifdef WITH_NTFS_3G - end_wim_resource_read(next_lte, ni); - ni = NULL; - #else - end_wim_resource_read(next_lte); - #endif - DEBUG2("Finalize SHA1 md (next_num_chunks=%zu)", - next_num_chunks); - sha1_final(next_hash, &next_sha_ctx); - if (!hashes_equal(next_lte->hash, next_hash)) { - ERROR("WIM resource has incorrect hash!"); - if (next_lte->resource_location == - RESOURCE_IN_FILE_ON_DISK) - { - ERROR("We were reading it from `%"TS"'; " - "maybe it changed while we were " - "reading it.", - next_lte->file_on_disk); - } - ret = WIMLIB_ERR_INVALID_RESOURCE_HASH; - goto out; - } - } +static void +main_writer_thread_destroy_ctx(struct main_writer_thread_ctx *ctx) +{ + free_messages(ctx->msgs, ctx->num_messages); + FREE(ctx->cur_chunk_tab); +} - // Advance to the next resource. - // - // If the next resource needs no compression, just write - // it with this thread (not now though--- we could be in - // the middle of writing another resource.) Keep doing - // this until we either get to the end of the resources - // list, or we get to a resource that needs compression. - while (1) { - if (next_resource == stream_list) { - // No more resources to send for - // compression - next_lte = NULL; - break; - } - next_lte = container_of(next_resource, - struct wim_lookup_table_entry, - write_streams_list); - next_resource = next_resource->next; - if ((!(write_flags & WIMLIB_WRITE_FLAG_RECOMPRESS) - && wim_resource_compression_type(next_lte) == out_ctype) - || wim_resource_size(next_lte) == 0) - { - list_add_tail(&next_lte->write_streams_list, - &my_resources); - } else { - list_add_tail(&next_lte->write_streams_list, - &outstanding_resources); - next_chunk = 0; - next_num_chunks = wim_resource_chunks(next_lte); - sha1_init(&next_sha_ctx); - INIT_LIST_HEAD(&next_lte->msg_list); - #ifdef WITH_NTFS_3G - ret = prepare_resource_for_read(next_lte, &ni); - #else - ret = prepare_resource_for_read(next_lte); - #endif - - if (ret != 0) - goto out; - if (cur_lte == NULL) { - // Set cur_lte for the - // first time - cur_lte = next_lte; - } - break; - } - } - } - if (next_lte == NULL) { - // No more resources to send for compression - break; - } +static int +main_writer_thread_init_ctx(struct main_writer_thread_ctx *ctx) +{ + /* Pre-allocate all the buffers that will be needed to do the chunk + * compression. */ + ctx->msgs = allocate_messages(ctx->num_messages); + if (!ctx->msgs) + return WIMLIB_ERR_NOMEM; - // Get a message from the available messages - // list - msg = container_of(available_msgs.next, - struct message, - list); - - // ... and delete it from the available messages - // list - list_del(&msg->list); - - // Initialize the message with the chunks to - // compress. - msg->num_chunks = min(next_num_chunks - next_chunk, - MAX_CHUNKS_PER_MSG); - msg->lte = next_lte; - msg->complete = false; - msg->begin_chunk = next_chunk; - - unsigned size = WIM_CHUNK_SIZE; - for (unsigned i = 0; i < msg->num_chunks; i++) { - - // Read chunk @next_chunk of the stream into the - // message so that a compressor thread can - // compress it. - - if (next_chunk == next_num_chunks - 1) { - size = MODULO_NONZERO(wim_resource_size(next_lte), - WIM_CHUNK_SIZE); - } + /* Initially, all the messages are available to use. */ + INIT_LIST_HEAD(&ctx->available_msgs); + for (size_t i = 0; i < ctx->num_messages; i++) + list_add_tail(&ctx->msgs[i].list, &ctx->available_msgs); + + /* outstanding_streams is the list of streams that currently have had + * chunks sent off for compression. + * + * The first stream in outstanding_streams is the stream that is + * currently being written (cur_lte). + * + * The last stream in outstanding_streams is the stream that is + * currently being read and chunks fed to the compressor threads. */ + INIT_LIST_HEAD(&ctx->outstanding_streams); + + /* Resources that don't need any chunks compressed are added to this + * list and written directly by the main thread. */ + INIT_LIST_HEAD(&ctx->serial_streams); + + ctx->cur_lte = NULL; + return 0; +} - DEBUG2("Read resource (size=%u, offset=%zu)", - size, next_chunk * WIM_CHUNK_SIZE); +static int +receive_compressed_chunks(struct main_writer_thread_ctx *ctx) +{ + struct message *msg; + struct wim_lookup_table_entry *cur_lte; + int ret; - msg->uncompressed_chunk_sizes[i] = size; + wimlib_assert(!list_empty(&ctx->outstanding_streams)); + + /* Get the next message from the queue and process it. + * The message will contain 1 or more data chunks that have been + * compressed. */ + msg = shared_queue_get(ctx->compressed_res_queue); + msg->complete = true; + cur_lte = ctx->cur_lte; + + /* Is this the next chunk in the current resource? If it's not + * (i.e., an earlier chunk in a same or different resource + * hasn't been compressed yet), do nothing, and keep this + * message around until all earlier chunks are received. + * + * Otherwise, write all the chunks we can. */ + while (cur_lte != NULL && + !list_empty(&cur_lte->msg_list) && + (msg = container_of(cur_lte->msg_list.next, + struct message, + list))->complete) + { + if (msg->begin_chunk == 0) { - ret = read_wim_resource(next_lte, - msg->uncompressed_chunks[i], - size, - next_chunk * WIM_CHUNK_SIZE, - 0); - if (ret != 0) - goto out; - sha1_update(&next_sha_ctx, - msg->uncompressed_chunks[i], size); - next_chunk++; + /* This is the first set of chunks. Leave space + * for the chunk table in the output file. */ + off_t cur_offset = ftello(ctx->out_fp); + if (cur_offset == -1) { + ret = WIMLIB_ERR_WRITE; + goto out; } - - // Send the compression request - list_add_tail(&msg->list, &next_lte->msg_list); - shared_queue_put(res_to_compress_queue, msg); - DEBUG2("Compression request sent"); + ret = begin_wim_resource_chunk_tab(cur_lte, + ctx->out_fp, + cur_offset, + &ctx->cur_chunk_tab); + if (ret) + goto out; } - // If there are no outstanding resources, there are no more - // resources that need to be written. - if (list_empty(&outstanding_resources)) { - ret = 0; + /* Write the compressed chunks from the message. */ + ret = write_wim_chunks(msg, ctx->out_fp, ctx->cur_chunk_tab); + if (ret) goto out; - } - // Get the next message from the queue and process it. - // The message will contain 1 or more data chunks that have been - // compressed. - msg = shared_queue_get(compressed_res_queue); - msg->complete = true; - - // Is this the next chunk in the current resource? If it's not - // (i.e., an earlier chunk in a same or different resource - // hasn't been compressed yet), do nothing, and keep this - // message around until all earlier chunks are received. - // - // Otherwise, write all the chunks we can. - while (cur_lte != NULL && - !list_empty(&cur_lte->msg_list) && - (msg = container_of(cur_lte->msg_list.next, - struct message, - list))->complete) - { - DEBUG2("Complete msg (begin_chunk=%"PRIu64")", msg->begin_chunk); - if (msg->begin_chunk == 0) { - DEBUG2("Begin chunk tab"); - - // This is the first set of chunks. Leave space - // for the chunk table in the output file. - off_t cur_offset = ftello(out_fp); - if (cur_offset == -1) { - ret = WIMLIB_ERR_WRITE; - goto out; - } - ret = begin_wim_resource_chunk_tab(cur_lte, - out_fp, - cur_offset, - &cur_chunk_tab); - if (ret != 0) - goto out; - } + list_del(&msg->list); - // Write the compressed chunks from the message. - ret = write_wim_chunks(msg, out_fp, cur_chunk_tab); - if (ret != 0) - goto out; + /* This message is available to use for different chunks + * now. */ + list_add(&msg->list, &ctx->available_msgs); - list_del(&msg->list); - - // This message is available to use for different chunks - // now. - list_add(&msg->list, &available_msgs); + /* Was this the last chunk of the stream? If so, finish + * it. */ + if (list_empty(&cur_lte->msg_list) && + msg->begin_chunk + msg->num_chunks == ctx->cur_chunk_tab->num_chunks) + { + DEBUG2("Finish wim chunk tab"); + u64 res_csize; + ret = finish_wim_resource_chunk_tab(ctx->cur_chunk_tab, + ctx->out_fp, + &res_csize); + if (ret) + goto out; - // Was this the last chunk of the stream? If so, finish - // it. - if (list_empty(&cur_lte->msg_list) && - msg->begin_chunk + msg->num_chunks == cur_chunk_tab->num_chunks) - { - DEBUG2("Finish wim chunk tab"); - u64 res_csize; - ret = finish_wim_resource_chunk_tab(cur_chunk_tab, - out_fp, - &res_csize); - if (ret != 0) +#if 0 + if (res_csize >= wim_resource_size(cur_lte)) { + /* Oops! We compressed the resource to + * larger than the original size. Write + * the resource uncompressed instead. */ + ret = write_uncompressed_resource_and_truncate( + cur_lte, + ctx->out_fp, + ctx->cur_chunk_tab->file_offset, + &cur_lte->output_resource_entry); + if (ret) goto out; + } else +#endif + { + cur_lte->output_resource_entry.size = + res_csize; - if (res_csize >= wim_resource_size(cur_lte)) { - /* Oops! We compressed the resource to - * larger than the original size. Write - * the resource uncompressed instead. */ - ret = write_uncompressed_resource_and_truncate( - cur_lte, - out_fp, - cur_chunk_tab->file_offset, - &cur_lte->output_resource_entry); - if (ret != 0) - goto out; - } else { - cur_lte->output_resource_entry.size = - res_csize; - - cur_lte->output_resource_entry.original_size = - cur_lte->resource_entry.original_size; - - cur_lte->output_resource_entry.offset = - cur_chunk_tab->file_offset; - - cur_lte->output_resource_entry.flags = - cur_lte->resource_entry.flags | - WIM_RESHDR_FLAG_COMPRESSED; - } + cur_lte->output_resource_entry.original_size = + cur_lte->resource_entry.original_size; - do_write_streams_progress(progress, progress_func, - wim_resource_size(cur_lte)); - - FREE(cur_chunk_tab); - cur_chunk_tab = NULL; - - struct list_head *next = cur_lte->write_streams_list.next; - list_del(&cur_lte->write_streams_list); - - if (next == &outstanding_resources) - cur_lte = NULL; - else - cur_lte = container_of(cur_lte->write_streams_list.next, - struct wim_lookup_table_entry, - write_streams_list); - - // Since we just finished writing a stream, - // write any streams that have been added to the - // my_resources list for direct writing by the - // main thread (e.g. resources that don't need - // to be compressed because the desired - // compression type is the same as the previous - // compression type). - ret = do_write_stream_list(&my_resources, - out_fp, - out_ctype, - progress_func, - progress, - 0); - if (ret != 0) - goto out; + cur_lte->output_resource_entry.offset = + ctx->cur_chunk_tab->file_offset; + + cur_lte->output_resource_entry.flags = + cur_lte->resource_entry.flags | + WIM_RESHDR_FLAG_COMPRESSED; } + do_write_streams_progress(ctx->progress, ctx->progress_func, + wim_resource_size(cur_lte)); + FREE(ctx->cur_chunk_tab); + ctx->cur_chunk_tab = NULL; + + struct list_head *next = cur_lte->write_streams_list.next; + list_del(&cur_lte->write_streams_list); + + if (next == &ctx->outstanding_streams) + cur_lte = NULL; + else + cur_lte = container_of(cur_lte->write_streams_list.next, + struct wim_lookup_table_entry, + write_streams_list); + + /* Since we just finished writing a stream, write any + * streams that have been added to the serial_streams + * list for direct writing by the main thread (e.g. + * resources that don't need to be compressed because + * the desired compression type is the same as the + * previous compression type). */ + ret = do_write_stream_list(&ctx->serial_streams, + ctx->lookup_table, + ctx->out_fp, + ctx->out_ctype, + ctx->progress_func, + ctx->progress, + 0); + if (ret) + goto out; } } - out: - if (ret == WIMLIB_ERR_NOMEM) { - ERROR("Could not allocate enough memory for " - "multi-threaded compression"); + ctx->cur_lte = cur_lte; + return ret; +} + +static int +main_writer_thread_cb(const void *chunk, size_t chunk_size, void *_ctx) +{ + struct main_writer_thread_ctx *ctx = _ctx; + int ret; + struct message *next_msg; + + next_msg = ctx->next_msg; + + sha1_update(&ctx->sha_ctx, chunk, chunk_size); + + if (!next_msg) { + if (list_empty(&ctx->available_msgs)) { + ret = receive_compressed_chunks(ctx); + if (ret) + return ret; + } + + wimlib_assert(!list_empty(&ctx->available_msgs)); + + next_msg = container_of(ctx->available_msgs.next, + struct message, + list); + list_del(&next_msg->list); + next_msg->complete = false; + next_msg->begin_chunk = ctx->next_chunk; + next_msg->num_chunks = min(MAX_CHUNKS_PER_MSG, + ctx->next_num_chunks - ctx->next_chunk); + ctx->next_chunk_in_msg = 0; } - if (next_lte) { -#ifdef WITH_NTFS_3G - end_wim_resource_read(next_lte, ni); -#else - end_wim_resource_read(next_lte); -#endif + wimlib_assert(next_msg != NULL); + wimlib_assert(ctx->next_chunk_in_msg < next_msg->num_chunks); + + next_msg->uncompressed_chunk_sizes[ctx->next_chunk_in_msg] = chunk_size; + memcpy(next_msg->uncompressed_chunks[ctx->next_chunk_in_msg], + chunk, chunk_size); + + if (++ctx->next_chunk_in_msg == next_msg->num_chunks) { + shared_queue_put(ctx->res_to_compress_queue, + next_msg); + ctx->next_msg = NULL; } + return 0; +} - if (ret == 0) { - ret = do_write_stream_list(&my_resources, out_fp, - out_ctype, progress_func, - progress, 0); - } else { - if (msgs) { - size_t num_available_msgs = 0; - struct list_head *cur; +static int +submit_stream_for_compression(struct wim_lookup_table_entry *lte, + struct main_writer_thread_ctx *ctx) +{ + int ret; - list_for_each(cur, &available_msgs) { - num_available_msgs++; - } + sha1_init(&ctx->sha_ctx); + ctx->next_num_chunks = wim_resource_chunks(lte); + ret = read_resource_prefix(lte, wim_resource_size(lte), + main_writer_thread_cb, ctx, 0); + if (ret) + return ret; + ret = finalize_and_check_sha1(&ctx->sha_ctx, lte); + if (ret) + return ret; +} + +/* + * This function is executed by the main thread when the resources are being + * compressed in parallel. The main thread is in change of all reading of the + * uncompressed data and writing of the compressed data. The compressor threads + * *only* do compression from/to in-memory buffers. + * + * Each unit of work given to a compressor thread is up to MAX_CHUNKS_PER_MSG + * chunks of compressed data to compress, represented in a `struct message'. + * Each message is passed from the main thread to a worker thread through the + * res_to_compress_queue, and it is passed back through the + * compressed_res_queue. + */ +static int +main_writer_thread_proc(struct main_writer_thread_ctx *ctx) +{ + int ret; + struct list_head *stream_list; + struct wim_lookup_table_entry *lte; - while (num_available_msgs < num_messages) { - shared_queue_get(compressed_res_queue); - num_available_msgs++; + ret = main_writer_thread_init_ctx(ctx); + if (ret) + goto out_destroy_ctx; + + stream_list = ctx->stream_list; + while (!list_empty(stream_list)) { + lte = container_of(stream_list->next, + struct wim_lookup_table_entry, + write_streams_list); + list_del(<e->write_streams_list); + if (lte->unhashed && !lte->unique_size) { + struct wim_lookup_table_entry *tmp; + u32 orig_refcnt = lte->out_refcnt; + + ret = hash_unhashed_stream(lte, ctx->lookup_table, &tmp); + if (ret) + goto out_destroy_ctx; + if (tmp != lte) { + lte = tmp; + if (orig_refcnt != tmp->out_refcnt) { + DEBUG("Discarding duplicate stream of length %"PRIu64, + wim_resource_size(lte)); + goto skip_to_progress; + } } } - } - if (msgs) { - for (size_t i = 0; i < num_messages; i++) { - for (size_t j = 0; j < MAX_CHUNKS_PER_MSG; j++) { - FREE(msgs[i].compressed_chunks[j]); - FREE(msgs[i].uncompressed_chunks[j]); + if (wim_resource_size(lte) < 1000 || + ctx->out_ctype == WIMLIB_COMPRESSION_TYPE_NONE || + (lte->resource_location == RESOURCE_IN_WIM && + wimlib_get_compression_type(lte->wim) == ctx->out_ctype)) + { + list_add(<e->write_streams_list, + &ctx->serial_streams); + } else { + ret = submit_stream_for_compression(lte, ctx); + if (ret) + goto out_destroy_ctx; + if (lte->unhashed) { + list_del(<e->unhashed_list); + lookup_table_insert(ctx->lookup_table, lte); + lte->unhashed = 0; } } - FREE(msgs); + skip_to_progress: + do_write_streams_progress(ctx->progress, + ctx->progress_func, + wim_resource_size(lte)); } - FREE(cur_chunk_tab); + while (!list_empty(&ctx->outstanding_streams)) { + ret = receive_compressed_chunks(ctx); + if (ret) + goto out_destroy_ctx; + } + ret = 0; +out_destroy_ctx: + main_writer_thread_destroy_ctx(ctx); return ret; } @@ -1194,7 +1164,6 @@ write_stream_list_parallel(struct list_head *stream_list, } progress->write_streams.num_threads = num_threads; - wimlib_assert(stream_list->next != stream_list); static const double MESSAGES_PER_THREAD = 2.0; size_t queue_size = (size_t)(num_threads * MESSAGES_PER_THREAD); @@ -1236,15 +1205,19 @@ write_stream_list_parallel(struct list_head *stream_list, if (progress_func) progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, progress); - ret = main_writer_thread_proc(stream_list, - out_fp, - out_ctype, - &res_to_compress_queue, - &compressed_res_queue, - queue_size, - write_flags, - progress_func, - progress); + struct main_writer_thread_ctx ctx; + memset(&ctx, 0, sizeof(ctx)); + ctx.stream_list = stream_list; + ctx.lookup_table = lookup_table; + ctx.out_fp = out_fp; + ctx.out_ctype = out_ctype; + ctx.res_to_compress_queue = &res_to_compress_queue; + ctx.compressed_res_queue = &compressed_res_queue; + ctx.num_messages = queue_size; + ctx.write_flags = write_flags; + ctx.progress_func = progress_func; + ctx.progress = progress; + ret = main_writer_thread_proc(&ctx); out_join: for (unsigned i = 0; i < num_threads; i++) shared_queue_put(&res_to_compress_queue, NULL); -- 2.43.0