+ goto out_destroy_res_to_compress_queue;
+
+ struct compressor_thread_params params;
+ params.res_to_compress_queue = &res_to_compress_queue;
+ params.compressed_res_queue = &compressed_res_queue;
+ params.compress = get_compress_func(out_ctype);
+
+ compressor_threads = MALLOC(num_threads * sizeof(pthread_t));
+ if (!compressor_threads) {
+ ret = WIMLIB_ERR_NOMEM;
+ goto out_destroy_compressed_res_queue;
+ }
+
+ for (unsigned i = 0; i < num_threads; i++) {
+ DEBUG("pthread_create thread %u", i);
+ ret = pthread_create(&compressor_threads[i], NULL,
+ compressor_thread_proc, ¶ms);
+ if (ret != 0) {
+ ret = -1;
+ ERROR_WITH_ERRNO("Failed to create compressor "
+ "thread %u", i);
+ num_threads = i;
+ goto out_join;
+ }
+ }
+
+ if (progress_func)
+ progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, progress);
+
+ struct main_writer_thread_ctx ctx;
+ memset(&ctx, 0, sizeof(ctx));
+ ctx.stream_list = stream_list;
+ ctx.lookup_table = lookup_table;
+ ctx.out_fp = out_fp;
+ ctx.out_ctype = out_ctype;
+ ctx.res_to_compress_queue = &res_to_compress_queue;
+ ctx.compressed_res_queue = &compressed_res_queue;
+ ctx.num_messages = queue_size;
+ ctx.write_flags = write_flags;
+ ctx.progress_func = progress_func;
+ ctx.progress = progress;
+ ret = main_writer_thread_proc(&ctx);
+out_join:
+ for (unsigned i = 0; i < num_threads; i++)
+ shared_queue_put(&res_to_compress_queue, NULL);
+
+ for (unsigned i = 0; i < num_threads; i++) {
+ if (pthread_join(compressor_threads[i], NULL)) {
+ WARNING_WITH_ERRNO("Failed to join compressor "
+ "thread %u", i);
+ }
+ }
+ FREE(compressor_threads);
+out_destroy_compressed_res_queue:
+ shared_queue_destroy(&compressed_res_queue);
+out_destroy_res_to_compress_queue:
+ shared_queue_destroy(&res_to_compress_queue);
+ if (ret >= 0 && ret != WIMLIB_ERR_NOMEM)
+ return ret;
+out_serial:
+ WARNING("Falling back to single-threaded compression");
+ return write_stream_list_serial(stream_list,
+ lookup_table,
+ out_fp,
+ out_ctype,
+ write_flags,
+ progress_func,
+ progress);
+
+}
+#endif
+
+/*
+ * Write a list of streams to a WIM (@out_fp) using the compression type
+ * @out_ctype and up to @num_threads compressor threads.
+ */
+static int
+write_stream_list(struct list_head *stream_list,
+ struct wim_lookup_table *lookup_table,
+ FILE *out_fp, int out_ctype, int write_flags,
+ unsigned num_threads, wimlib_progress_func_t progress_func)
+{
+ struct wim_lookup_table_entry *lte;
+ size_t num_streams = 0;
+ u64 total_bytes = 0;
+ u64 total_compression_bytes = 0;
+ union wimlib_progress_info progress;
+ int ret;
+
+ if (list_empty(stream_list))
+ return 0;
+
+ /* Calculate the total size of the streams to be written. Note: this
+ * will be the uncompressed size, as we may not know the compressed size
+ * yet, and also this will assume that every unhashed stream will be
+ * written (which will not necessarily be the case). */
+ list_for_each_entry(lte, stream_list, write_streams_list) {
+ num_streams++;
+ total_bytes += wim_resource_size(lte);
+ if (out_ctype != WIMLIB_COMPRESSION_TYPE_NONE
+ && (wim_resource_compression_type(lte) != out_ctype ||
+ (write_flags & WIMLIB_WRITE_FLAG_RECOMPRESS)))
+ {
+ total_compression_bytes += wim_resource_size(lte);
+ }
+ }
+ progress.write_streams.total_bytes = total_bytes;
+ progress.write_streams.total_streams = num_streams;
+ progress.write_streams.completed_bytes = 0;
+ progress.write_streams.completed_streams = 0;
+ progress.write_streams.num_threads = num_threads;
+ progress.write_streams.compression_type = out_ctype;
+ progress.write_streams._private = 0;
+
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
+ if (total_compression_bytes >= 1000000 && num_threads != 1)
+ ret = write_stream_list_parallel(stream_list,
+ lookup_table,
+ out_fp,
+ out_ctype,
+ write_flags,
+ num_threads,
+ progress_func,
+ &progress);
+ else
+#endif
+ ret = write_stream_list_serial(stream_list,
+ lookup_table,
+ out_fp,
+ out_ctype,
+ write_flags,
+ progress_func,
+ &progress);
+ return ret;
+}
+
+struct stream_size_table {
+ struct hlist_head *array;
+ size_t num_entries;
+ size_t capacity;
+};
+
+static int
+init_stream_size_table(struct stream_size_table *tab, size_t capacity)
+{
+ tab->array = CALLOC(capacity, sizeof(tab->array[0]));
+ if (!tab->array)
+ return WIMLIB_ERR_NOMEM;
+ tab->num_entries = 0;
+ tab->capacity = capacity;
+ return 0;
+}
+
+static void
+destroy_stream_size_table(struct stream_size_table *tab)
+{
+ FREE(tab->array);
+}
+
+static int
+stream_size_table_insert(struct wim_lookup_table_entry *lte, void *_tab)
+{
+ struct stream_size_table *tab = _tab;
+ size_t pos;
+ struct wim_lookup_table_entry *hashed_lte;
+ struct hlist_node *tmp;
+
+ pos = hash_u64(wim_resource_size(lte)) % tab->capacity;
+ lte->unique_size = 1;
+ hlist_for_each_entry(hashed_lte, tmp, &tab->array[pos], hash_list_2) {
+ if (wim_resource_size(hashed_lte) == wim_resource_size(lte)) {
+ lte->unique_size = 0;
+ hashed_lte->unique_size = 0;
+ break;
+ }
+ }
+
+ hlist_add_head(<e->hash_list_2, &tab->array[pos]);
+ tab->num_entries++;
+ return 0;
+}
+
+
+struct lte_overwrite_prepare_args {
+ WIMStruct *wim;
+ off_t end_offset;
+ struct list_head stream_list;
+ struct stream_size_table stream_size_tab;
+};
+
+static int
+lte_overwrite_prepare(struct wim_lookup_table_entry *lte, void *arg)
+{
+ struct lte_overwrite_prepare_args *args = arg;
+
+ if (lte->resource_location == RESOURCE_IN_WIM &&
+ lte->wim == args->wim)
+ {
+ /* We can't do an in place overwrite on the WIM if there are
+ * streams after the XML data. */
+ if (lte->resource_entry.offset +
+ lte->resource_entry.size > args->end_offset)
+ {
+ #ifdef ENABLE_ERROR_MESSAGES
+ ERROR("The following resource is after the XML data:");
+ print_lookup_table_entry(lte, stderr);
+ #endif
+ return WIMLIB_ERR_RESOURCE_ORDER;
+ }
+ } else {
+ wimlib_assert(!(lte->resource_entry.flags & WIM_RESHDR_FLAG_METADATA));
+ list_add_tail(<e->write_streams_list, &args->stream_list);
+ }
+ lte->out_refcnt = lte->refcnt;
+ stream_size_table_insert(lte, &args->stream_size_tab);
+ return 0;
+}
+
+static int
+lte_set_output_res_entry(struct wim_lookup_table_entry *lte, void *_wim)
+{
+ if (lte->resource_location == RESOURCE_IN_WIM && lte->wim == _wim) {
+ copy_resource_entry(<e->output_resource_entry,
+ <e->resource_entry);
+ }
+ return 0;
+}
+
+/* Given a WIM that we are going to overwrite in place with zero or more
+ * additional streams added, construct a list the list of new unique streams
+ * ('struct wim_lookup_table_entry's) that must be written, plus any unhashed
+ * streams that need to be added but may be identical to other hashed or
+ * unhashed streams. These unhashed streams are checksummed while the streams
+ * are being written. To aid this process, the member @unique_size is set to 1
+ * on streams that have a unique size and therefore must be written.
+ *
+ * The out_refcnt member of each 'struct wim_lookup_table_entry' is set to
+ * indicate the number of times the stream is referenced in only the streams
+ * that are being written; this may still be adjusted later when unhashed
+ * streams are being resolved.
+ */
+static int
+prepare_streams_for_overwrite(WIMStruct *wim, off_t end_offset,
+ struct list_head *stream_list)
+{
+ int ret;
+ struct lte_overwrite_prepare_args args;
+
+ args.wim = wim;
+ args.end_offset = end_offset;
+ ret = init_stream_size_table(&args.stream_size_tab,
+ wim->lookup_table->capacity);
+ if (ret)
+ return ret;
+
+ INIT_LIST_HEAD(&args.stream_list);
+ for (int i = 0; i < wim->hdr.image_count; i++) {
+ struct wim_image_metadata *imd;
+ struct wim_lookup_table_entry *lte;
+
+ imd = wim->image_metadata[i];
+ image_for_each_unhashed_stream(lte, imd) {
+ ret = lte_overwrite_prepare(lte, &args);
+ if (ret)
+ goto out_destroy_stream_size_table;
+ }
+ }
+ ret = for_lookup_table_entry(wim->lookup_table,
+ lte_overwrite_prepare, &args);
+ if (ret)
+ goto out_destroy_stream_size_table;
+
+ for (int i = 0; i < wim->hdr.image_count; i++)
+ lte_set_output_res_entry(wim->image_metadata[i]->metadata_lte,
+ wim);
+ for_lookup_table_entry(wim->lookup_table, lte_set_output_res_entry, wim);
+ list_transfer(&args.stream_list, stream_list);
+out_destroy_stream_size_table:
+ destroy_stream_size_table(&args.stream_size_tab);
+ return ret;
+}
+
+
+struct find_streams_ctx {
+ struct list_head stream_list;
+ struct stream_size_table stream_size_tab;
+};
+
+static void
+inode_find_streams_to_write(struct wim_inode *inode,
+ struct wim_lookup_table *table,
+ struct list_head *stream_list,
+ struct stream_size_table *tab)
+{
+ struct wim_lookup_table_entry *lte;
+ for (unsigned i = 0; i <= inode->i_num_ads; i++) {
+ lte = inode_stream_lte(inode, i, table);
+ if (lte) {
+ if (lte->out_refcnt == 0) {
+ if (lte->unhashed)
+ stream_size_table_insert(lte, tab);
+ list_add_tail(<e->write_streams_list, stream_list);
+ }
+ lte->out_refcnt += inode->i_nlink;
+ }
+ }
+}
+
+static int
+image_find_streams_to_write(WIMStruct *w)
+{
+ struct find_streams_ctx *ctx;
+ struct wim_image_metadata *imd;
+ struct wim_inode *inode;
+ struct wim_lookup_table_entry *lte;
+
+ ctx = w->private;
+ imd = wim_get_current_image_metadata(w);
+
+ image_for_each_unhashed_stream(lte, imd)
+ lte->out_refcnt = 0;
+
+ /* Go through this image's inodes to find any streams that have not been
+ * found yet. */
+ image_for_each_inode(inode, imd) {
+ inode_find_streams_to_write(inode, w->lookup_table,
+ &ctx->stream_list,
+ &ctx->stream_size_tab);
+ }
+ return 0;
+}
+
+/* Given a WIM that from which one or all of the images is being written, build
+ * the list of unique streams ('struct wim_lookup_table_entry's) that must be
+ * written, plus any unhashed streams that need to be written but may be
+ * identical to other hashed or unhashed streams being written. These unhashed
+ * streams are checksummed while the streams are being written. To aid this
+ * process, the member @unique_size is set to 1 on streams that have a unique
+ * size and therefore must be written.
+ *
+ * The out_refcnt member of each 'struct wim_lookup_table_entry' is set to
+ * indicate the number of times the stream is referenced in only the streams
+ * that are being written; this may still be adjusted later when unhashed
+ * streams are being resolved.
+ */
+static int
+prepare_stream_list(WIMStruct *wim, int image, struct list_head *stream_list)
+{
+ int ret;
+ struct find_streams_ctx ctx;
+
+ for_lookup_table_entry(wim->lookup_table, lte_zero_out_refcnt, NULL);
+ ret = init_stream_size_table(&ctx.stream_size_tab,
+ wim->lookup_table->capacity);
+ if (ret)