+ next_msg = ctx->next_msg;
+
+ sha1_update(&ctx->sha_ctx, chunk, chunk_size);
+
+ if (!next_msg) {
+ if (list_empty(&ctx->available_msgs)) {
+ ret = receive_compressed_chunks(ctx);
+ if (ret)
+ return ret;
+ }
+
+ wimlib_assert(!list_empty(&ctx->available_msgs));
+
+ next_msg = container_of(ctx->available_msgs.next,
+ struct message,
+ list);
+ list_del(&next_msg->list);
+ next_msg->complete = false;
+ next_msg->begin_chunk = ctx->next_chunk;
+ next_msg->num_chunks = min(MAX_CHUNKS_PER_MSG,
+ ctx->next_num_chunks - ctx->next_chunk);
+ ctx->next_chunk_in_msg = 0;
+ }
+
+ wimlib_assert(next_msg != NULL);
+ wimlib_assert(ctx->next_chunk_in_msg < next_msg->num_chunks);
+
+ next_msg->uncompressed_chunk_sizes[ctx->next_chunk_in_msg] = chunk_size;
+ memcpy(next_msg->uncompressed_chunks[ctx->next_chunk_in_msg],
+ chunk, chunk_size);
+
+ if (++ctx->next_chunk_in_msg == next_msg->num_chunks) {
+ shared_queue_put(ctx->res_to_compress_queue,
+ next_msg);
+ ctx->next_msg = NULL;
+ }
+ return 0;
+}
+
+ /*
+  * Read the entire stream @lte, handing its data to main_writer_thread_cb()
+  * through read_resource_prefix(), and then check the SHA1 message digest
+  * kept in @ctx->sha_ctx against @lte with finalize_and_check_sha1().
+  *
+  * @lte:  the stream to read.
+  * @ctx:  the main writer thread context; its sha_ctx is reinitialized and its
+  *        next_num_chunks is set to the number of chunks in @lte.
+  *
+  * Returns 0 on success; otherwise the nonzero error code returned by
+  * read_resource_prefix() or finalize_and_check_sha1().
+  */
+ static int
+ submit_stream_for_compression(struct wim_lookup_table_entry *lte,
+                               struct main_writer_thread_ctx *ctx)
+ {
+     int ret;
+
+     sha1_init(&ctx->sha_ctx);
+     ctx->next_num_chunks = wim_resource_chunks(lte);
+     ret = read_resource_prefix(lte, wim_resource_size(lte),
+                                main_writer_thread_cb, ctx, 0);
+     if (ret)
+         return ret;
+
+     /* Always return a value: the caller tests the result of this function,
+      * so falling off the end on the success path would be undefined
+      * behavior. */
+     return finalize_and_check_sha1(&ctx->sha_ctx, lte);
+ }
+
+/*
+ * This function is executed by the main thread when the resources are being
+ * compressed in parallel. The main thread is in change of all reading of the
+ * uncompressed data and writing of the compressed data. The compressor threads
+ * *only* do compression from/to in-memory buffers.
+ *
+ * Each unit of work given to a compressor thread is up to MAX_CHUNKS_PER_MSG
+ * chunks of compressed data to compress, represented in a `struct message'.
+ * Each message is passed from the main thread to a worker thread through the
+ * res_to_compress_queue, and it is passed back through the
+ * compressed_res_queue.
+ */
+static int
+main_writer_thread_proc(struct main_writer_thread_ctx *ctx)
+{
+ int ret;
+ struct list_head *stream_list;
+ struct wim_lookup_table_entry *lte;
+
+ ret = main_writer_thread_init_ctx(ctx);
+ if (ret)
+ goto out_destroy_ctx;
+
+ stream_list = ctx->stream_list;
+ while (!list_empty(stream_list)) {
+ lte = container_of(stream_list->next,
+ struct wim_lookup_table_entry,
+ write_streams_list);
+ list_del(<e->write_streams_list);
+ if (lte->unhashed && !lte->unique_size) {
+ struct wim_lookup_table_entry *tmp;
+ u32 orig_refcnt = lte->out_refcnt;
+
+ ret = hash_unhashed_stream(lte, ctx->lookup_table, &tmp);
+ if (ret)
+ goto out_destroy_ctx;
+ if (tmp != lte) {
+ lte = tmp;
+ if (orig_refcnt != tmp->out_refcnt) {
+ DEBUG("Discarding duplicate stream of length %"PRIu64,
+ wim_resource_size(lte));
+ goto skip_to_progress;
+ }
+ }
+ }
+
+ if (wim_resource_size(lte) < 1000 ||
+ ctx->out_ctype == WIMLIB_COMPRESSION_TYPE_NONE ||
+ (lte->resource_location == RESOURCE_IN_WIM &&
+ wimlib_get_compression_type(lte->wim) == ctx->out_ctype))
+ {
+ list_add(<e->write_streams_list,
+ &ctx->serial_streams);
+ } else {
+ ret = submit_stream_for_compression(lte, ctx);
+ if (ret)
+ goto out_destroy_ctx;
+ if (lte->unhashed) {
+ list_del(<e->unhashed_list);
+ lookup_table_insert(ctx->lookup_table, lte);
+ lte->unhashed = 0;
+ }
+ }
+ skip_to_progress:
+ do_write_streams_progress(ctx->progress,
+ ctx->progress_func,
+ wim_resource_size(lte));
+ }
+
+ while (!list_empty(&ctx->outstanding_streams)) {
+ ret = receive_compressed_chunks(ctx);
+ if (ret)
+ goto out_destroy_ctx;
+ }
+ ret = 0;
+out_destroy_ctx:
+ main_writer_thread_destroy_ctx(ctx);
+ return ret;
+}
+
+ /*
+  * Return the processor count used as the default number of compressor
+  * threads.
+  *
+  * When __WIN32__ is defined the value comes from
+  * win32_get_number_of_processors(); otherwise it comes from
+  * sysconf(_SC_NPROCESSORS_ONLN), which can return -1 on failure.  The caller
+  * is expected to validate the result.
+  */
+ static long
+ get_default_num_threads(void)
+ {
+ #ifdef __WIN32__
+     return win32_get_number_of_processors();
+ #else
+     return sysconf(_SC_NPROCESSORS_ONLN);
+ #endif
+ }
+
+ /*
+  * Write the streams on @stream_list to @out_fp, compressing on @num_threads
+  * compressor threads.  If @num_threads is 0, the value returned by
+  * get_default_num_threads() is used.
+  *
+  * Two shared queues are initialized, the compressor threads are started, and
+  * the calling thread then runs main_writer_thread_proc().  Afterwards one NULL
+  * message per thread is put on res_to_compress_queue (presumably the shutdown
+  * signal understood by compressor_thread_proc(); confirm there) and the
+  * threads are joined.
+  *
+  * This falls back to write_stream_list_serial() when the processor count
+  * cannot be determined, a queue cannot be initialized, the thread array cannot
+  * be allocated, a thread cannot be created, or main_writer_thread_proc()
+  * returns a negative value or WIMLIB_ERR_NOMEM.
+  *
+  * Returns the result of main_writer_thread_proc() when it is non-negative and
+  * not WIMLIB_ERR_NOMEM; otherwise the result of write_stream_list_serial().
+  */
+ static int
+ write_stream_list_parallel(struct list_head *stream_list,
+                            struct wim_lookup_table *lookup_table,
+                            FILE *out_fp,
+                            int out_ctype,
+                            int write_flags,
+                            unsigned num_threads,
+                            wimlib_progress_func_t progress_func,
+                            union wimlib_progress_info *progress)
+ {
+     int ret;
+     struct shared_queue res_to_compress_queue;
+     struct shared_queue compressed_res_queue;
+     pthread_t *compressor_threads = NULL;
+
+     if (num_threads == 0) {
+         long nthreads = get_default_num_threads();
+         if (nthreads < 1 || nthreads > UINT_MAX) {
+             WARNING("Could not determine number of processors! Assuming 1");
+             goto out_serial;
+         } else {
+             num_threads = nthreads;
+         }
+     }
+
+     progress->write_streams.num_threads = num_threads;
+
+     /* Both queues are sized to hold this many messages per thread. */
+     static const double MESSAGES_PER_THREAD = 2.0;
+     size_t queue_size = (size_t)(num_threads * MESSAGES_PER_THREAD);
+
+     DEBUG("Initializing shared queues (queue_size=%zu)", queue_size);
+
+     ret = shared_queue_init(&res_to_compress_queue, queue_size);
+     if (ret != 0)
+         goto out_serial;
+
+     ret = shared_queue_init(&compressed_res_queue, queue_size);
+     if (ret != 0)
+         goto out_destroy_res_to_compress_queue;
+
+     /* @params lives on this stack frame; every thread that receives its
+      * address is joined before this function returns. */
+     struct compressor_thread_params params;
+     params.res_to_compress_queue = &res_to_compress_queue;
+     params.compressed_res_queue = &compressed_res_queue;
+     params.compress = get_compress_func(out_ctype);
+
+     compressor_threads = MALLOC(num_threads * sizeof(pthread_t));
+     if (!compressor_threads) {
+         ret = WIMLIB_ERR_NOMEM;
+         goto out_destroy_compressed_res_queue;
+     }
+
+     for (unsigned i = 0; i < num_threads; i++) {
+         DEBUG("pthread_create thread %u", i);
+         ret = pthread_create(&compressor_threads[i], NULL,
+                              compressor_thread_proc, &params);
+         if (ret != 0) {
+             /* NOTE(review): pthread_create() returns an error number
+              * and does not set errno, so the errno text printed below
+              * may be stale. */
+             ret = -1;
+             ERROR_WITH_ERRNO("Failed to create compressor "
+                              "thread %u", i);
+             /* Only the threads created so far are stopped and joined. */
+             num_threads = i;
+             goto out_join;
+         }
+     }
+
+     if (progress_func)
+         progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, progress);
+
+     struct main_writer_thread_ctx ctx;
+     memset(&ctx, 0, sizeof(ctx));
+     ctx.stream_list = stream_list;
+     ctx.lookup_table = lookup_table;
+     ctx.out_fp = out_fp;
+     ctx.out_ctype = out_ctype;
+     ctx.res_to_compress_queue = &res_to_compress_queue;
+     ctx.compressed_res_queue = &compressed_res_queue;
+     ctx.num_messages = queue_size;
+     ctx.write_flags = write_flags;
+     ctx.progress_func = progress_func;
+     ctx.progress = progress;
+     ret = main_writer_thread_proc(&ctx);
+ out_join:
+     /* Queue one NULL message for each running compressor thread. */
+     for (unsigned i = 0; i < num_threads; i++)
+         shared_queue_put(&res_to_compress_queue, NULL);
+
+     for (unsigned i = 0; i < num_threads; i++) {
+         /* NOTE(review): pthread_join() also reports failure through its
+          * return value rather than errno. */
+         if (pthread_join(compressor_threads[i], NULL)) {
+             WARNING_WITH_ERRNO("Failed to join compressor "
+                                "thread %u", i);
+         }
+     }
+     FREE(compressor_threads);
+ out_destroy_compressed_res_queue:
+     shared_queue_destroy(&compressed_res_queue);
+ out_destroy_res_to_compress_queue:
+     shared_queue_destroy(&res_to_compress_queue);
+     /* A negative result or WIMLIB_ERR_NOMEM falls through to the
+      * single-threaded path; anything else is returned as is. */
+     if (ret >= 0 && ret != WIMLIB_ERR_NOMEM)
+         return ret;
+ out_serial:
+     WARNING("Falling back to single-threaded compression");
+     return write_stream_list_serial(stream_list,
+                                     lookup_table,
+                                     out_fp,
+                                     out_ctype,
+                                     write_flags,
+                                     progress_func,
+                                     progress);
+
+ }
+#endif
+
+/*
+ * Write a list of streams to a WIM (@out_fp) using the compression type
+ * @out_ctype and up to @num_threads compressor threads.
+ */
+static int
+write_stream_list(struct list_head *stream_list,
+ struct wim_lookup_table *lookup_table,
+ FILE *out_fp, int out_ctype, int write_flags,
+ unsigned num_threads, wimlib_progress_func_t progress_func)
+{
+ struct wim_lookup_table_entry *lte;
+ size_t num_streams = 0;
+ u64 total_bytes = 0;
+ u64 total_compression_bytes = 0;
+ union wimlib_progress_info progress;
+ int ret;
+
+ if (list_empty(stream_list))
+ return 0;
+
+ list_for_each_entry(lte, stream_list, write_streams_list) {
+ num_streams++;
+ total_bytes += wim_resource_size(lte);
+ if (out_ctype != WIMLIB_COMPRESSION_TYPE_NONE
+ && (wim_resource_compression_type(lte) != out_ctype ||
+ (write_flags & WIMLIB_WRITE_FLAG_RECOMPRESS)))
+ {
+ total_compression_bytes += wim_resource_size(lte);
+ }
+ }
+ progress.write_streams.total_bytes = total_bytes;
+ progress.write_streams.total_streams = num_streams;
+ progress.write_streams.completed_bytes = 0;
+ progress.write_streams.completed_streams = 0;
+ progress.write_streams.num_threads = num_threads;
+ progress.write_streams.compression_type = out_ctype;
+ progress.write_streams._private = 0;
+
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
+ if (total_compression_bytes >= 1000000 && num_threads != 1)
+ ret = write_stream_list_parallel(stream_list,
+ lookup_table,
+ out_fp,
+ out_ctype,
+ write_flags,
+ num_threads,
+ progress_func,
+ &progress);
+ else
+#endif
+ ret = write_stream_list_serial(stream_list,
+ lookup_table,
+ out_fp,
+ out_ctype,
+ write_flags,
+ progress_func,
+ &progress);
+ return ret;
+}
+
+ struct stream_size_table { /* Hash table of lookup table entries, bucketed by a hash of the stream size. */
+ struct hlist_head *array; /* Bucket heads; `capacity' zeroed elements allocated by init_stream_size_table(). */
+ size_t num_entries; /* Number of entries inserted; incremented by stream_size_table_insert(). */
+ size_t capacity; /* Number of buckets in `array'; used as the modulus when choosing a bucket. */
+ };
+
+ /*
+  * Set up @tab as an empty stream size table with @capacity buckets.
+  *
+  * The bucket array is allocated zeroed with CALLOC().  Returns 0 on success,
+  * or WIMLIB_ERR_NOMEM if the allocation fails (in which case tab->array is
+  * NULL and the other fields are left untouched).
+  */
+ static int
+ init_stream_size_table(struct stream_size_table *tab, size_t capacity)
+ {
+     tab->array = CALLOC(capacity, sizeof(*tab->array));
+     if (tab->array == NULL)
+         return WIMLIB_ERR_NOMEM;
+
+     tab->capacity = capacity;
+     tab->num_entries = 0;
+     return 0;
+ }
+
+ /*
+  * Release the bucket array owned by @tab.  The structure itself is not freed
+  * and its fields are not reset.
+  */
+ static void
+ destroy_stream_size_table(struct stream_size_table *tab)
+ {
+     FREE(tab->array);
+ }
+
+ /*
+  * Insert @lte into the stream size table @_tab and record whether its size is
+  * unique among the entries inserted so far.
+  *
+  * @lte:   the lookup table entry to insert.
+  * @_tab:  a `struct stream_size_table *' passed as void * (presumably so this
+  *         function can serve as a generic per-entry callback; confirm at the
+  *         call site).  tab->capacity must be nonzero because it is used as a
+  *         modulus.
+  *
+  * lte->unique_size is set to 1 unless an entry of the same size is already in
+  * the bucket, in which case unique_size is cleared on both @lte and that
+  * entry.  Always returns 0.
+  */
+ static int
+ stream_size_table_insert(struct wim_lookup_table_entry *lte, void *_tab)
+ {
+     struct stream_size_table *tab = _tab;
+     size_t pos;
+     struct wim_lookup_table_entry *hashed_lte;
+     struct hlist_node *tmp;
+
+     pos = hash_u64(wim_resource_size(lte)) % tab->capacity;
+     lte->unique_size = 1;
+     hlist_for_each_entry(hashed_lte, tmp, &tab->array[pos], hash_list_2) {
+         if (wim_resource_size(hashed_lte) == wim_resource_size(lte)) {
+             /* One match is enough: mark both entries and stop. */
+             lte->unique_size = 0;
+             hashed_lte->unique_size = 0;
+             break;
+         }
+     }
+
+     hlist_add_head(&lte->hash_list_2, &tab->array[pos]);
+     tab->num_entries++;
+     return 0;
+ }
+
+
+ struct lte_overwrite_prepare_args { /* Argument bundle read by lte_overwrite_prepare(). */
+ WIMStruct *wim; /* WIM being overwritten; compared against each entry's lte->wim. */
+ off_t end_offset; /* A resource in `wim' ending past this offset yields WIMLIB_ERR_RESOURCE_ORDER. */
+ struct list_head stream_list; /* Entries not located in `wim' are appended here via write_streams_list. */
+ struct stream_size_table stream_size_tab; /* Stream size table; its use is not visible in this part of the file. */
+ };
+
+static int
+lte_overwrite_prepare(struct wim_lookup_table_entry *lte, void *arg)
+{
+ struct lte_overwrite_prepare_args *args = arg;
+
+ if (lte->resource_location == RESOURCE_IN_WIM &&
+ lte->wim == args->wim)
+ {
+ /* We can't do an in place overwrite on the WIM if there are
+ * streams after the XML data. */
+ if (lte->resource_entry.offset +
+ lte->resource_entry.size > args->end_offset)
+ {
+ #ifdef ENABLE_ERROR_MESSAGES
+ ERROR("The following resource is after the XML data:");
+ print_lookup_table_entry(lte, stderr);
+ #endif
+ return WIMLIB_ERR_RESOURCE_ORDER;
+ }
+ } else {
+ wimlib_assert(!(lte->resource_entry.flags & WIM_RESHDR_FLAG_METADATA));
+ list_add_tail(<e->write_streams_list, &args->stream_list);