+/*
+ * Wind down the main writer thread.
+ *
+ * Keeps calling receive_compressed_chunks() until @ctx reports no outstanding
+ * messages, then hands the deferred ctx->serial_streams list to
+ * do_write_stream_list_serial().
+ *
+ * Returns 0 on success, or the first nonzero code produced by
+ * receive_compressed_chunks() or do_write_stream_list_serial().
+ */
+static int
+main_writer_thread_finish(void *_ctx)
+{
+	struct main_writer_thread_ctx *ctx = _ctx;
+	int status = 0;
+
+	/* Drain every message that is still in flight; stop at the first
+	 * failure. */
+	while (status == 0 && ctx->num_outstanding_messages != 0)
+		status = receive_compressed_chunks(ctx);
+	if (status != 0)
+		return status;
+
+	/* With no messages left, no stream should still be listed as
+	 * outstanding. */
+	wimlib_assert(list_empty(&ctx->outstanding_streams));
+
+	/* Finally write the streams that were set aside for the main thread. */
+	return do_write_stream_list_serial(&ctx->serial_streams,
+					   ctx->lookup_table,
+					   ctx->out_fd,
+					   ctx->out_ctype,
+					   ctx->write_resource_flags,
+					   ctx->progress_func,
+					   ctx->progress);
+}
+
+/*
+ * Submit the stream @lte for chunk-by-chunk compression.
+ *
+ * Resets the per-stream state kept in @ctx (SHA1 context, next chunk index,
+ * expected chunk count, current stream), initializes @lte's message list,
+ * appends @lte to ctx->outstanding_streams, and then reads the whole stream
+ * through main_writer_thread_cb().
+ *
+ * Returns 0 on success; otherwise the nonzero code returned by
+ * read_resource_prefix() or finalize_and_check_sha1().
+ */
+static int
+submit_stream_for_compression(struct wim_lookup_table_entry *lte,
+			      struct main_writer_thread_ctx *ctx)
+{
+	int ret;
+
+	/* Read the entire stream @lte, feeding its data chunks to the
+	 * compressor threads.  Also SHA1-sum the stream; this is required in
+	 * the case that @lte is unhashed, and a nice additional verification
+	 * when @lte is already hashed.  */
+	sha1_init(&ctx->next_sha_ctx);
+	ctx->next_chunk = 0;
+	ctx->next_num_chunks = wim_resource_chunks(lte);
+	ctx->next_lte = lte;
+	INIT_LIST_HEAD(&lte->msg_list);
+	list_add_tail(&lte->being_compressed_list, &ctx->outstanding_streams);
+	ret = read_resource_prefix(lte, wim_resource_size(lte),
+				   main_writer_thread_cb, ctx, 0);
+	if (ret == 0) {
+		/* A successful read must have visited every chunk. */
+		wimlib_assert(ctx->next_chunk == ctx->next_num_chunks);
+		ret = finalize_and_check_sha1(&ctx->next_sha_ctx, lte);
+	}
+	return ret;
+}
+
+/*
+ * Decide how the stream @lte will be written and queue it accordingly.
+ *
+ * A stream is appended to ctx->serial_streams (and marked deferred) when any
+ * of the following holds:
+ *   - it is smaller than 1000 bytes;
+ *   - the output compression type is WIMLIB_COMPRESSION_TYPE_NONE;
+ *   - it is located in a WIM whose compression type equals the output type
+ *     and WIMLIB_RESOURCE_FLAG_RECOMPRESS is not set.
+ * Every other stream is passed to submit_stream_for_compression().
+ *
+ * @_ctx is a struct main_writer_thread_ctx passed as an untyped pointer.
+ *
+ * Returns 0 when the stream was deferred, otherwise the result of
+ * submit_stream_for_compression().
+ */
+static int
+main_thread_process_next_stream(struct wim_lookup_table_entry *lte, void *_ctx)
+{
+	struct main_writer_thread_ctx *ctx = _ctx;
+	int ret;
+
+	if (wim_resource_size(lte) < 1000 ||
+	    ctx->out_ctype == WIMLIB_COMPRESSION_TYPE_NONE ||
+	    (lte->resource_location == RESOURCE_IN_WIM &&
+	     !(ctx->write_resource_flags & WIMLIB_RESOURCE_FLAG_RECOMPRESS) &&
+	     wimlib_get_compression_type(lte->wim) == ctx->out_ctype))
+	{
+		/* Stream is too small or isn't being compressed.  Process it by
+		 * the main thread when we have a chance.  We can't necessarily
+		 * process it right here, as the main thread could be in the
+		 * middle of writing a different stream.  */
+		list_add_tail(&lte->write_streams_list, &ctx->serial_streams);
+		lte->deferred = 1;
+		ret = 0;
+	} else {
+		ret = submit_stream_for_compression(lte, ctx);
+	}
+	/* Set on both paths, regardless of the value of ret. */
+	lte->no_progress = 1;
+	return ret;
+}
+
+/*
+ * Return the platform's processor count, used as the default thread count.
+ *
+ * On Windows builds this is whatever win32_get_number_of_processors()
+ * reports; elsewhere it is sysconf(_SC_NPROCESSORS_ONLN).  The value is
+ * returned unchecked, so callers must validate it before use.
+ */
+static long
+get_default_num_threads(void)
+{
+#ifdef __WIN32__
+	return win32_get_number_of_processors();
+#else
+	return sysconf(_SC_NPROCESSORS_ONLN);
+#endif
+}
+
+/* Equivalent to write_stream_list_serial(), except this takes a @num_threads
+ * parameter and will perform compression using that many threads. Falls
+ * back to write_stream_list_serial() on certain errors, such as a failure to
+ * create the number of threads requested.
+ *
+ * High level description of the algorithm for writing compressed streams in
+ * parallel: We perform compression on chunks of size WIM_CHUNK_SIZE bytes
+ * rather than on full files. The currently executing thread becomes the main
+ * thread and is entirely in charge of reading the data to compress (which may
+ * be in any location understood by the resource code--- such as in an external
+ * file being captured, or in another WIM file from which an image is being
+ * exported) and actually writing the compressed data to the output file.
+ * Additional threads are "compressor threads" and all execute the
+ * compressor_thread_proc, where they repeatedly retrieve buffers of data from
+ * the main thread, compress them, and hand them back to the main thread.
+ *
+ * Certain streams, such as streams that do not need to be compressed (e.g.
+ * input compression type same as output compression type) or streams of very
+ * small size are placed in a list (main_writer_thread_ctx.serial_streams) and
+ * handled entirely by the main thread at an appropriate time.
+ *
+ * At any given point in time, multiple streams may be having chunks compressed
+ * concurrently. The stream that the main thread is currently *reading* may be
+ * later in the list than the stream that the main thread is currently
+ * *writing*.
+ */
+static int
+write_stream_list_parallel(struct list_head *stream_list,
+ struct wim_lookup_table *lookup_table,
+ filedes_t out_fd,
+ int out_ctype,
+ int write_resource_flags,
+ wimlib_progress_func_t progress_func,
+ union wimlib_progress_info *progress,
+ unsigned num_threads)
+{
+ int ret;
+ struct shared_queue res_to_compress_queue;
+ struct shared_queue compressed_res_queue;
+ pthread_t *compressor_threads = NULL;
+
+ if (num_threads == 0) {
+ long nthreads = get_default_num_threads();
+ if (nthreads < 1 || nthreads > UINT_MAX) {
+ WARNING("Could not determine number of processors! Assuming 1");
+ goto out_serial;
+ } else if (nthreads == 1) {
+ goto out_serial_quiet;