- // Since we just finished writing a stream,
- // write any streams that have been added to the
- // my_resources list for direct writing by the
- // main thread (e.g. resources that don't need
- // to be compressed because the desired
- // compression type is the same as the previous
- // compression type).
- struct lookup_table_entry *tmp;
- list_for_each_entry_safe(lte,
- tmp,
- &my_resources,
- staging_list)
- {
- if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
- show_stream_write_progress(&cur_size,
- &next_size,
- total_size,
- one_percent,
- &cur_percent,
- lte);
- }
-
- ret = write_wim_resource(lte,
- out_fp,
- out_ctype,
- <e->output_resource_entry,
- 0);
- list_del(<e->staging_list);
- if (ret != 0)
- goto out;
+ if (!next_msg) {
+ if (list_empty(&ctx->available_msgs)) {
+ ret = receive_compressed_chunks(ctx);
+ if (ret)
+ return ret;
+ }
+
+ wimlib_assert(!list_empty(&ctx->available_msgs));
+
+ next_msg = container_of(ctx->available_msgs.next,
+ struct message,
+ list);
+ list_del(&next_msg->list);
+ next_msg->complete = false;
+ next_msg->begin_chunk = ctx->next_chunk;
+ next_msg->num_chunks = min(MAX_CHUNKS_PER_MSG,
+ ctx->next_num_chunks - ctx->next_chunk);
+ ctx->next_chunk_in_msg = 0;
+ }
+
+ wimlib_assert(next_msg != NULL);
+ wimlib_assert(ctx->next_chunk_in_msg < next_msg->num_chunks);
+
+ next_msg->uncompressed_chunk_sizes[ctx->next_chunk_in_msg] = chunk_size;
+ memcpy(next_msg->uncompressed_chunks[ctx->next_chunk_in_msg],
+ chunk, chunk_size);
+
+ if (++ctx->next_chunk_in_msg == next_msg->num_chunks) {
+ shared_queue_put(ctx->res_to_compress_queue,
+ next_msg);
+ ctx->next_msg = NULL;
+ }
+ return 0;
+}
+
+/*
+ * Reads the entire uncompressed stream @lte and feeds its chunks to the
+ * compressor threads via main_writer_thread_cb(), while accumulating the
+ * stream's SHA1 message digest, which is checked (or recorded) at the end.
+ *
+ * Returns 0 on success, or a nonzero error code returned by the resource
+ * reader or the SHA1 finalization/verification step.
+ */
+static int
+submit_stream_for_compression(struct wim_lookup_table_entry *lte,
+			      struct main_writer_thread_ctx *ctx)
+{
+	int ret;
+
+	/* Begin checksumming the uncompressed data as it is read.  (The
+	 * per-chunk callback presumably updates ctx->sha_ctx.) */
+	sha1_init(&ctx->sha_ctx);
+	ctx->next_num_chunks = wim_resource_chunks(lte);
+	ret = read_resource_prefix(lte, wim_resource_size(lte),
+				   main_writer_thread_cb, ctx, 0);
+	if (ret)
+		return ret;
+	/* Fix: the original fell off the end of this non-void function after
+	 * this call, making the caller's use of the return value undefined.
+	 * Propagate the SHA1 finalization result (0 on success) instead. */
+	return finalize_and_check_sha1(&ctx->sha_ctx, lte);
+}
+
+/*
+ * This function is executed by the main thread when the resources are being
+ * compressed in parallel.  The main thread is in charge of all reading of the
+ * uncompressed data and writing of the compressed data. The compressor threads
+ * *only* do compression from/to in-memory buffers.
+ *
+ * Each unit of work given to a compressor thread is up to MAX_CHUNKS_PER_MSG
+ * chunks of uncompressed data to compress, represented in a `struct message'.
+ * Each message is passed from the main thread to a worker thread through the
+ * res_to_compress_queue, and it is passed back through the
+ * compressed_res_queue.
+ */
+static int
+main_writer_thread_proc(struct main_writer_thread_ctx *ctx)
+{
+ int ret;
+ struct list_head *stream_list;
+ struct wim_lookup_table_entry *lte;
+
+ ret = main_writer_thread_init_ctx(ctx);
+ if (ret)
+ goto out_destroy_ctx;
+
+ stream_list = ctx->stream_list;
+ while (!list_empty(stream_list)) {
+ lte = container_of(stream_list->next,
+ struct wim_lookup_table_entry,
+ write_streams_list);
+ list_del(<e->write_streams_list);
+ if (lte->unhashed && !lte->unique_size) {
+ struct wim_lookup_table_entry *tmp;
+ u32 orig_refcnt = lte->out_refcnt;
+
+ ret = hash_unhashed_stream(lte, ctx->lookup_table, &tmp);
+ if (ret)
+ goto out_destroy_ctx;
+ if (tmp != lte) {
+ lte = tmp;
+ if (orig_refcnt != tmp->out_refcnt) {
+ DEBUG("Discarding duplicate stream of length %"PRIu64,
+ wim_resource_size(lte));
+ goto skip_to_progress;