+ DEBUG("Compressor thread terminating");
+ return NULL;
+}
+#endif
+
+static int do_write_stream_list(struct list_head *my_resources,
+ FILE *out_fp,
+ int out_ctype,
+ wimlib_progress_func_t progress_func,
+ union wimlib_progress_info *progress,
+ int write_resource_flags)
+{
+ int ret;
+ struct lookup_table_entry *lte, *tmp;
+
+ list_for_each_entry_safe(lte, tmp, my_resources, staging_list) {
+ ret = write_wim_resource(lte,
+ out_fp,
+ out_ctype,
+ <e->output_resource_entry,
+ write_resource_flags);
+ if (ret != 0)
+ return ret;
+ list_del(<e->staging_list);
+ progress->write_streams.completed_bytes +=
+ wim_resource_size(lte);
+ progress->write_streams.completed_streams++;
+ if (progress_func) {
+ progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS,
+ progress);
+ }
+ }
+ return 0;
+}
+
+static int write_stream_list_serial(struct list_head *stream_list,
+ FILE *out_fp,
+ int out_ctype,
+ int write_flags,
+ wimlib_progress_func_t progress_func,
+ union wimlib_progress_info *progress)
+{
+ int write_resource_flags;
+
+ if (write_flags & WIMLIB_WRITE_FLAG_RECOMPRESS)
+ write_resource_flags = WIMLIB_RESOURCE_FLAG_RECOMPRESS;
+ else
+ write_resource_flags = 0;
+ progress->write_streams.num_threads = 1;
+ if (progress_func)
+ progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, progress);
+ return do_write_stream_list(stream_list, out_fp,
+ out_ctype, progress_func,
+ progress, write_resource_flags);
+}
+
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
+static int write_wim_chunks(struct message *msg, FILE *out_fp,
+ struct chunk_table *chunk_tab)
+{
+ for (unsigned i = 0; i < msg->num_chunks; i++) {
+ unsigned chunk_csize = msg->compressed_chunk_sizes[i];
+
+ DEBUG2("Write wim chunk %u of %u (csize = %u)",
+ i, msg->num_chunks, chunk_csize);
+
+ if (fwrite(msg->out_compressed_chunks[i], 1, chunk_csize, out_fp)
+ != chunk_csize)
+ {
+ ERROR_WITH_ERRNO("Failed to write WIM chunk");
+ return WIMLIB_ERR_WRITE;
+ }
+
+ *chunk_tab->cur_offset_p++ = chunk_tab->cur_offset;
+ chunk_tab->cur_offset += chunk_csize;
+ }
+ return 0;
+}
+
+/*
+ * This function is executed by the main thread when the resources are being
+ * compressed in parallel. The main thread is in change of all reading of the
+ * uncompressed data and writing of the compressed data. The compressor threads
+ * *only* do compression from/to in-memory buffers.
+ *
+ * Each unit of work given to a compressor thread is up to MAX_CHUNKS_PER_MSG
+ * chunks of compressed data to compress, represented in a `struct message'.
+ * Each message is passed from the main thread to a worker thread through the
+ * res_to_compress_queue, and it is passed back through the
+ * compressed_res_queue.
+ */
+static int main_writer_thread_proc(struct list_head *stream_list,
+ FILE *out_fp,
+ int out_ctype,
+ struct shared_queue *res_to_compress_queue,
+ struct shared_queue *compressed_res_queue,
+ size_t queue_size,
+ int write_flags,
+ wimlib_progress_func_t progress_func,
+ union wimlib_progress_info *progress)
+{
+ int ret;
+
+ struct message msgs[queue_size];
+ ZERO_ARRAY(msgs);
+
+ // Initially, all the messages are available to use.
+ LIST_HEAD(available_msgs);
+ for (size_t i = 0; i < ARRAY_LEN(msgs); i++)
+ list_add(&msgs[i].list, &available_msgs);
+
+ // outstanding_resources is the list of resources that currently have
+ // had chunks sent off for compression.
+ //
+ // The first stream in outstanding_resources is the stream that is
+ // currently being written (cur_lte).
+ //
+ // The last stream in outstanding_resources is the stream that is
+ // currently being read and chunks fed to the compressor threads
+ // (next_lte).
+ //
+ // Depending on the number of threads and the sizes of the resource,
+ // the outstanding streams list may contain streams between cur_lte and
+ // next_lte that have all their chunks compressed or being compressed,
+ // but haven't been written yet.
+ //
+ LIST_HEAD(outstanding_resources);
+ struct list_head *next_resource = stream_list->next;
+ struct lookup_table_entry *next_lte = container_of(next_resource,
+ struct lookup_table_entry,
+ staging_list);
+ next_resource = next_resource->next;
+ u64 next_chunk = 0;
+ u64 next_num_chunks = wim_resource_chunks(next_lte);
+ INIT_LIST_HEAD(&next_lte->msg_list);
+ list_add_tail(&next_lte->staging_list, &outstanding_resources);
+
+ // As in write_wim_resource(), each resource we read is checksummed.
+ SHA_CTX next_sha_ctx;
+ sha1_init(&next_sha_ctx);
+ u8 next_hash[SHA1_HASH_SIZE];
+
+ // Resources that don't need any chunks compressed are added to this
+ // list and written directly by the main thread.
+ LIST_HEAD(my_resources);
+
+ struct lookup_table_entry *cur_lte = next_lte;
+ struct chunk_table *cur_chunk_tab = NULL;
+ struct message *msg;
+
+#ifdef WITH_NTFS_3G
+ ntfs_inode *ni = NULL;
+#endif
+
+#ifdef WITH_NTFS_3G
+ ret = prepare_resource_for_read(next_lte, &ni);
+#else
+ ret = prepare_resource_for_read(next_lte);
+#endif