+ ret = 0;
+out_fclose:
+#ifdef WITH_NTFS_3G
+ end_wim_resource_read(lte, ni);
+#else
+ end_wim_resource_read(lte);
+#endif
+out:
+ FREE(chunk_tab);
+ return ret;
+}
+
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
+struct shared_queue {
+ unsigned size;
+ unsigned front;
+ unsigned back;
+ unsigned filled_slots;
+ void **array;
+ pthread_mutex_t lock;
+ pthread_cond_t msg_avail_cond;
+ pthread_cond_t space_avail_cond;
+};
+
+static int shared_queue_init(struct shared_queue *q, unsigned size)
+{
+ q->array = CALLOC(sizeof(q->array[0]), size);
+ if (!q->array)
+ return WIMLIB_ERR_NOMEM;
+ q->filled_slots = 0;
+ q->front = 0;
+ q->back = size - 1;
+ q->size = size;
+ pthread_mutex_init(&q->lock, NULL);
+ pthread_cond_init(&q->msg_avail_cond, NULL);
+ pthread_cond_init(&q->space_avail_cond, NULL);
+ return 0;
+}
+
+static void shared_queue_destroy(struct shared_queue *q)
+{
+ FREE(q->array);
+ pthread_mutex_destroy(&q->lock);
+ pthread_cond_destroy(&q->msg_avail_cond);
+ pthread_cond_destroy(&q->space_avail_cond);
+}
+
+static void shared_queue_put(struct shared_queue *q, void *obj)
+{
+ pthread_mutex_lock(&q->lock);
+ while (q->filled_slots == q->size)
+ pthread_cond_wait(&q->space_avail_cond, &q->lock);
+
+ q->back = (q->back + 1) % q->size;
+ q->array[q->back] = obj;
+ q->filled_slots++;
+
+ pthread_cond_broadcast(&q->msg_avail_cond);
+ pthread_mutex_unlock(&q->lock);
+}
+
+static void *shared_queue_get(struct shared_queue *q)
+{
+ void *obj;
+
+ pthread_mutex_lock(&q->lock);
+ while (q->filled_slots == 0)
+ pthread_cond_wait(&q->msg_avail_cond, &q->lock);
+
+ obj = q->array[q->front];
+ q->array[q->front] = NULL;
+ q->front = (q->front + 1) % q->size;
+ q->filled_slots--;
+
+ pthread_cond_broadcast(&q->space_avail_cond);
+ pthread_mutex_unlock(&q->lock);
+ return obj;
+}
+
+struct compressor_thread_params {
+ struct shared_queue *res_to_compress_queue;
+ struct shared_queue *compressed_res_queue;
+ compress_func_t compress;
+};
+
+#define MAX_CHUNKS_PER_MSG 2
+
+struct message {
+ struct lookup_table_entry *lte;
+ u8 *uncompressed_chunks[MAX_CHUNKS_PER_MSG];
+ u8 *out_compressed_chunks[MAX_CHUNKS_PER_MSG];
+ u8 *compressed_chunks[MAX_CHUNKS_PER_MSG];
+ unsigned uncompressed_chunk_sizes[MAX_CHUNKS_PER_MSG];
+ unsigned compressed_chunk_sizes[MAX_CHUNKS_PER_MSG];
+ unsigned num_chunks;
+ struct list_head list;
+ bool complete;
+ u64 begin_chunk;
+};
+
+static void compress_chunks(struct message *msg, compress_func_t compress)
+{
+ for (unsigned i = 0; i < msg->num_chunks; i++) {
+ DEBUG2("compress chunk %u of %u", i, msg->num_chunks);
+ int ret = compress(msg->uncompressed_chunks[i],
+ msg->uncompressed_chunk_sizes[i],
+ msg->compressed_chunks[i],
+ &msg->compressed_chunk_sizes[i]);
+ if (ret == 0) {
+ msg->out_compressed_chunks[i] = msg->compressed_chunks[i];
+ } else {
+ msg->out_compressed_chunks[i] = msg->uncompressed_chunks[i];
+ msg->compressed_chunk_sizes[i] = msg->uncompressed_chunk_sizes[i];
+ }
+ }
+}
+
+static void *compressor_thread_proc(void *arg)
+{
+ struct compressor_thread_params *params = arg;
+ struct shared_queue *res_to_compress_queue = params->res_to_compress_queue;
+ struct shared_queue *compressed_res_queue = params->compressed_res_queue;
+ compress_func_t compress = params->compress;
+ struct message *msg;
+
+ DEBUG("Compressor thread ready");
+ while ((msg = shared_queue_get(res_to_compress_queue)) != NULL) {
+ compress_chunks(msg, compress);
+ shared_queue_put(compressed_res_queue, msg);
+ }
+ DEBUG("Compressor thread terminating");
+ return NULL;
+}
+#endif
+
+static int do_write_stream_list(struct list_head *my_resources,
+ FILE *out_fp,
+ int out_ctype,
+ wimlib_progress_func_t progress_func,
+ union wimlib_progress_info *progress,
+ int write_resource_flags)
+{
+ int ret;
+ struct lookup_table_entry *lte, *tmp;
+
+ list_for_each_entry_safe(lte, tmp, my_resources, staging_list) {
+ ret = write_wim_resource(lte,
+ out_fp,
+ out_ctype,
+ <e->output_resource_entry,
+ write_resource_flags);
+ if (ret != 0)
+ return ret;
+ list_del(<e->staging_list);
+ progress->write_streams.completed_bytes +=
+ wim_resource_size(lte);
+ progress->write_streams.completed_streams++;
+ if (progress_func) {
+ progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS,
+ progress);
+ }
+ }
+ return 0;
+}
+
+static int write_stream_list_serial(struct list_head *stream_list,
+ FILE *out_fp,
+ int out_ctype,
+ int write_flags,
+ wimlib_progress_func_t progress_func,
+ union wimlib_progress_info *progress)
+{
+ int write_resource_flags;
+
+ if (write_flags & WIMLIB_WRITE_FLAG_RECOMPRESS)
+ write_resource_flags = WIMLIB_RESOURCE_FLAG_RECOMPRESS;
+ else
+ write_resource_flags = 0;
+ progress->write_streams.num_threads = 1;
+ if (progress_func)
+ progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, progress);
+ return do_write_stream_list(stream_list, out_fp,
+ out_ctype, progress_func,
+ progress, write_resource_flags);
+}
+
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
+static int write_wim_chunks(struct message *msg, FILE *out_fp,
+ struct chunk_table *chunk_tab)
+{
+ for (unsigned i = 0; i < msg->num_chunks; i++) {
+ unsigned chunk_csize = msg->compressed_chunk_sizes[i];
+
+ DEBUG2("Write wim chunk %u of %u (csize = %u)",
+ i, msg->num_chunks, chunk_csize);
+
+ if (fwrite(msg->out_compressed_chunks[i], 1, chunk_csize, out_fp)
+ != chunk_csize)
+ {
+ ERROR_WITH_ERRNO("Failed to write WIM chunk");
+ return WIMLIB_ERR_WRITE;
+ }
+
+ *chunk_tab->cur_offset_p++ = chunk_tab->cur_offset;
+ chunk_tab->cur_offset += chunk_csize;
+ }
+ return 0;
+}
+
+/*
+ * This function is executed by the main thread when the resources are being
+ * compressed in parallel. The main thread is in change of all reading of the
+ * uncompressed data and writing of the compressed data. The compressor threads
+ * *only* do compression from/to in-memory buffers.
+ *
+ * Each unit of work given to a compressor thread is up to MAX_CHUNKS_PER_MSG
+ * chunks of compressed data to compress, represented in a `struct message'.
+ * Each message is passed from the main thread to a worker thread through the
+ * res_to_compress_queue, and it is passed back through the
+ * compressed_res_queue.
+ */
+static int main_writer_thread_proc(struct list_head *stream_list,
+ FILE *out_fp,
+ int out_ctype,
+ struct shared_queue *res_to_compress_queue,
+ struct shared_queue *compressed_res_queue,
+ size_t queue_size,
+ int write_flags,
+ wimlib_progress_func_t progress_func,
+ union wimlib_progress_info *progress)
+{
+ int ret;
+
+ struct message msgs[queue_size];
+ ZERO_ARRAY(msgs);
+
+ // Initially, all the messages are available to use.
+ LIST_HEAD(available_msgs);
+ for (size_t i = 0; i < ARRAY_LEN(msgs); i++)
+ list_add(&msgs[i].list, &available_msgs);
+
+ // outstanding_resources is the list of resources that currently have
+ // had chunks sent off for compression.
+ //
+ // The first stream in outstanding_resources is the stream that is
+ // currently being written (cur_lte).
+ //
+ // The last stream in outstanding_resources is the stream that is
+ // currently being read and chunks fed to the compressor threads
+ // (next_lte).
+ //
+ // Depending on the number of threads and the sizes of the resource,
+ // the outstanding streams list may contain streams between cur_lte and
+ // next_lte that have all their chunks compressed or being compressed,
+ // but haven't been written yet.
+ //
+ LIST_HEAD(outstanding_resources);
+ struct list_head *next_resource = stream_list->next;
+ struct lookup_table_entry *next_lte = container_of(next_resource,
+ struct lookup_table_entry,
+ staging_list);
+ next_resource = next_resource->next;
+ u64 next_chunk = 0;
+ u64 next_num_chunks = wim_resource_chunks(next_lte);
+ INIT_LIST_HEAD(&next_lte->msg_list);
+ list_add_tail(&next_lte->staging_list, &outstanding_resources);
+
+ // As in write_wim_resource(), each resource we read is checksummed.
+ SHA_CTX next_sha_ctx;
+ sha1_init(&next_sha_ctx);
+ u8 next_hash[SHA1_HASH_SIZE];
+
+ // Resources that don't need any chunks compressed are added to this
+ // list and written directly by the main thread.
+ LIST_HEAD(my_resources);
+
+ struct lookup_table_entry *cur_lte = next_lte;
+ struct chunk_table *cur_chunk_tab = NULL;
+ struct message *msg;
+
+#ifdef WITH_NTFS_3G
+ ntfs_inode *ni = NULL;
+#endif
+
+#ifdef WITH_NTFS_3G
+ ret = prepare_resource_for_read(next_lte, &ni);
+#else
+ ret = prepare_resource_for_read(next_lte);
+#endif