+static void
+do_write_streams_progress(union wimlib_progress_info *progress,
+ wimlib_progress_func_t progress_func,
+ uint64_t size_added,
+ bool stream_discarded)
+{
+ if (stream_discarded) {
+ progress->write_streams.total_bytes -= size_added;
+ if (progress->write_streams._private != ~(uint64_t)0 &&
+ progress->write_streams._private > progress->write_streams.total_bytes)
+ {
+ progress->write_streams._private = progress->write_streams.total_bytes;
+ }
+ } else {
+ progress->write_streams.completed_bytes += size_added;
+ }
+ progress->write_streams.completed_streams++;
+ if (progress_func &&
+ progress->write_streams.completed_bytes >= progress->write_streams._private)
+ {
+ progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS,
+ progress);
+ if (progress->write_streams._private == progress->write_streams.total_bytes) {
+ progress->write_streams._private = ~(uint64_t)0;
+ } else {
+ progress->write_streams._private =
+ min(progress->write_streams.total_bytes,
+ progress->write_streams.completed_bytes +
+ progress->write_streams.total_bytes / 100);
+ }
+ }
+}
+
/* Context for serial_write_stream(): the parameters that stay fixed while a
 * list of streams is being written serially. */
struct serial_write_stream_ctx {
	int out_fd;			/* Output file descriptor */
	int out_ctype;			/* Output compression type */
	int write_resource_flags;	/* WIMLIB_RESOURCE_FLAG_* flags */
};
+
+static int
+serial_write_stream(struct wim_lookup_table_entry *lte, void *_ctx)
+{
+ struct serial_write_stream_ctx *ctx = _ctx;
+ return write_wim_resource(lte, ctx->out_fd,
+ ctx->out_ctype, <e->output_resource_entry,
+ ctx->write_resource_flags);
+}
+
+/* Write a list of streams, taking into account that some streams may be
+ * duplicates that are checksummed and discarded on the fly, and also delegating
+ * the actual writing of a stream to a function @write_stream_cb, which is
+ * passed the context @write_stream_ctx. */
+static int
+do_write_stream_list(struct list_head *stream_list,
+ struct wim_lookup_table *lookup_table,
+ int (*write_stream_cb)(struct wim_lookup_table_entry *, void *),
+ void *write_stream_ctx,
+ wimlib_progress_func_t progress_func,
+ union wimlib_progress_info *progress)
+{
+ int ret = 0;
+ struct wim_lookup_table_entry *lte;
+ bool stream_discarded;
+
+ /* For each stream in @stream_list ... */
+ while (!list_empty(stream_list)) {
+ stream_discarded = false;
+ lte = container_of(stream_list->next,
+ struct wim_lookup_table_entry,
+ write_streams_list);
+ list_del(<e->write_streams_list);
+ if (lte->unhashed && !lte->unique_size) {
+ /* Unhashed stream that shares a size with some other
+ * stream in the WIM we are writing. The stream must be
+ * checksummed to know if we need to write it or not. */
+ struct wim_lookup_table_entry *tmp;
+ u32 orig_refcnt = lte->out_refcnt;
+
+ ret = hash_unhashed_stream(lte, lookup_table, &tmp);
+ if (ret)
+ break;
+ if (tmp != lte) {
+ lte = tmp;
+ /* We found a duplicate stream. */
+ if (orig_refcnt != tmp->out_refcnt) {
+ /* We have already written, or are going
+ * to write, the duplicate stream. So
+ * just skip to the next stream. */
+ DEBUG("Discarding duplicate stream of length %"PRIu64,
+ wim_resource_size(lte));
+ lte->no_progress = 0;
+ stream_discarded = true;
+ goto skip_to_progress;
+ }
+ }
+ }
+
+ /* Here, @lte is either a hashed stream or an unhashed stream
+ * with a unique size. In either case we know that the stream
+ * has to be written. In either case the SHA1 message digest
+ * will be calculated over the stream while writing it; however,
+ * in the former case this is done merely to check the data,
+ * while in the latter case this is done because we do not have
+ * the SHA1 message digest yet. */
+ wimlib_assert(lte->out_refcnt != 0);
+ lte->deferred = 0;
+ lte->no_progress = 0;
+ ret = (*write_stream_cb)(lte, write_stream_ctx);
+ if (ret)
+ break;
+ /* In parallel mode, some streams are deferred for later,
+ * serialized processing; ignore them here. */
+ if (lte->deferred)
+ continue;
+ if (lte->unhashed) {
+ list_del(<e->unhashed_list);
+ lookup_table_insert(lookup_table, lte);
+ lte->unhashed = 0;
+ }
+ skip_to_progress:
+ if (!lte->no_progress) {
+ do_write_streams_progress(progress,
+ progress_func,
+ wim_resource_size(lte),
+ stream_discarded);
+ }
+ }
+ return ret;
+}
+
+static int
+do_write_stream_list_serial(struct list_head *stream_list,
+ struct wim_lookup_table *lookup_table,
+ int out_fd,
+ int out_ctype,
+ int write_resource_flags,
+ wimlib_progress_func_t progress_func,
+ union wimlib_progress_info *progress)
+{
+ struct serial_write_stream_ctx ctx = {
+ .out_fd = out_fd,
+ .out_ctype = out_ctype,
+ .write_resource_flags = write_resource_flags,
+ };
+ return do_write_stream_list(stream_list,
+ lookup_table,
+ serial_write_stream,
+ &ctx,
+ progress_func,
+ progress);
+}
+
+static inline int
+write_flags_to_resource_flags(int write_flags)
+{
+ int resource_flags = 0;
+
+ if (write_flags & WIMLIB_WRITE_FLAG_RECOMPRESS)
+ resource_flags |= WIMLIB_RESOURCE_FLAG_RECOMPRESS;
+ return resource_flags;
+}
+
+static int
+write_stream_list_serial(struct list_head *stream_list,
+ struct wim_lookup_table *lookup_table,
+ int out_fd,
+ int out_ctype,
+ int write_resource_flags,
+ wimlib_progress_func_t progress_func,
+ union wimlib_progress_info *progress)
+{
+ DEBUG("Writing stream list (serial version)");
+ progress->write_streams.num_threads = 1;
+ if (progress_func)
+ progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, progress);
+ return do_write_stream_list_serial(stream_list,
+ lookup_table,
+ out_fd,
+ out_ctype,
+ write_resource_flags,
+ progress_func,
+ progress);
+}
+
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
+static int
+write_wim_chunks(struct message *msg, int out_fd,
+ struct chunk_table *chunk_tab)
+{
+ for (unsigned i = 0; i < msg->num_chunks; i++) {
+ *chunk_tab->cur_offset_p++ = chunk_tab->cur_offset;
+ chunk_tab->cur_offset += msg->out_chunks[i].iov_len;
+ }
+ if (full_writev(out_fd, msg->out_chunks,
+ msg->num_chunks) != msg->total_out_bytes)
+ {
+ ERROR_WITH_ERRNO("Failed to write WIM chunks");
+ return WIMLIB_ERR_WRITE;
+ }
+ return 0;
+}
+
+struct main_writer_thread_ctx {
+ struct list_head *stream_list;
+ struct wim_lookup_table *lookup_table;
+ int out_fd;
+ int out_ctype;
+ int write_resource_flags;
+ struct shared_queue *res_to_compress_queue;
+ struct shared_queue *compressed_res_queue;
+ size_t num_messages;
+ wimlib_progress_func_t progress_func;
+ union wimlib_progress_info *progress;
+
+ struct list_head available_msgs;
+ struct list_head outstanding_streams;
+ struct list_head serial_streams;
+ size_t num_outstanding_messages;
+
+ SHA_CTX next_sha_ctx;
+ u64 next_chunk;
+ u64 next_num_chunks;
+ struct wim_lookup_table_entry *next_lte;
+
+ struct message *msgs;
+ struct message *next_msg;
+ struct chunk_table *cur_chunk_tab;
+};
+
+static int
+init_message(struct message *msg)
+{
+ for (size_t i = 0; i < MAX_CHUNKS_PER_MSG; i++) {
+ msg->compressed_chunks[i] = MALLOC(WIM_CHUNK_SIZE);
+ msg->uncompressed_chunks[i] = MALLOC(WIM_CHUNK_SIZE);
+ if (msg->compressed_chunks[i] == NULL ||
+ msg->uncompressed_chunks[i] == NULL)
+ return WIMLIB_ERR_NOMEM;
+ }
+ return 0;
+}
+
+static void
+destroy_message(struct message *msg)
+{
+ for (size_t i = 0; i < MAX_CHUNKS_PER_MSG; i++) {
+ FREE(msg->compressed_chunks[i]);
+ FREE(msg->uncompressed_chunks[i]);
+ }
+}
+
+static void
+free_messages(struct message *msgs, size_t num_messages)
+{
+ if (msgs) {
+ for (size_t i = 0; i < num_messages; i++)
+ destroy_message(&msgs[i]);
+ FREE(msgs);
+ }
+}
+
+static struct message *
+allocate_messages(size_t num_messages)
+{
+ struct message *msgs;
+
+ msgs = CALLOC(num_messages, sizeof(struct message));
+ if (!msgs)
+ return NULL;
+ for (size_t i = 0; i < num_messages; i++) {
+ if (init_message(&msgs[i])) {
+ free_messages(msgs, num_messages);
+ return NULL;
+ }
+ }
+ return msgs;
+}
+
+static void
+main_writer_thread_destroy_ctx(struct main_writer_thread_ctx *ctx)
+{
+ while (ctx->num_outstanding_messages--)
+ shared_queue_get(ctx->compressed_res_queue);
+ free_messages(ctx->msgs, ctx->num_messages);
+ FREE(ctx->cur_chunk_tab);
+}
+
+static int
+main_writer_thread_init_ctx(struct main_writer_thread_ctx *ctx)
+{
+ /* Pre-allocate all the buffers that will be needed to do the chunk
+ * compression. */
+ ctx->msgs = allocate_messages(ctx->num_messages);
+ if (!ctx->msgs)
+ return WIMLIB_ERR_NOMEM;
+
+ /* Initially, all the messages are available to use. */
+ INIT_LIST_HEAD(&ctx->available_msgs);
+ for (size_t i = 0; i < ctx->num_messages; i++)
+ list_add_tail(&ctx->msgs[i].list, &ctx->available_msgs);
+
+ /* outstanding_streams is the list of streams that currently have had
+ * chunks sent off for compression.
+ *
+ * The first stream in outstanding_streams is the stream that is
+ * currently being written.
+ *
+ * The last stream in outstanding_streams is the stream that is
+ * currently being read and having chunks fed to the compressor threads.
+ * */
+ INIT_LIST_HEAD(&ctx->outstanding_streams);
+ ctx->num_outstanding_messages = 0;
+
+ ctx->next_msg = NULL;
+
+ /* Resources that don't need any chunks compressed are added to this
+ * list and written directly by the main thread. */
+ INIT_LIST_HEAD(&ctx->serial_streams);
+
+ ctx->cur_chunk_tab = NULL;
+
+ return 0;
+}
+
+static int
+receive_compressed_chunks(struct main_writer_thread_ctx *ctx)
+{
+ struct message *msg;
+ struct wim_lookup_table_entry *cur_lte;
+ int ret;
+
+ wimlib_assert(!list_empty(&ctx->outstanding_streams));
+ wimlib_assert(ctx->num_outstanding_messages != 0);
+
+ cur_lte = container_of(ctx->outstanding_streams.next,
+ struct wim_lookup_table_entry,
+ being_compressed_list);
+
+ /* Get the next message from the queue and process it.
+ * The message will contain 1 or more data chunks that have been
+ * compressed. */
+ msg = shared_queue_get(ctx->compressed_res_queue);
+ msg->complete = true;
+ --ctx->num_outstanding_messages;
+
+ /* Is this the next chunk in the current resource? If it's not
+ * (i.e., an earlier chunk in a same or different resource
+ * hasn't been compressed yet), do nothing, and keep this
+ * message around until all earlier chunks are received.
+ *
+ * Otherwise, write all the chunks we can. */
+ while (cur_lte != NULL &&
+ !list_empty(&cur_lte->msg_list)
+ && (msg = container_of(cur_lte->msg_list.next,
+ struct message,
+ list))->complete)
+ {
+ list_move(&msg->list, &ctx->available_msgs);
+ if (msg->begin_chunk == 0) {
+ /* This is the first set of chunks. Leave space
+ * for the chunk table in the output file. */
+ off_t cur_offset = filedes_offset(ctx->out_fd);
+ if (cur_offset == -1)
+ return WIMLIB_ERR_WRITE;
+ ret = begin_wim_resource_chunk_tab(cur_lte,
+ ctx->out_fd,
+ cur_offset,
+ &ctx->cur_chunk_tab);
+ if (ret)
+ return ret;
+ }
+
+ /* Write the compressed chunks from the message. */
+ ret = write_wim_chunks(msg, ctx->out_fd, ctx->cur_chunk_tab);
+ if (ret)
+ return ret;
+
+ /* Was this the last chunk of the stream? If so, finish
+ * it. */
+ if (list_empty(&cur_lte->msg_list) &&
+ msg->begin_chunk + msg->num_chunks == ctx->cur_chunk_tab->num_chunks)
+ {
+ u64 res_csize;
+ off_t offset;
+
+ ret = finish_wim_resource_chunk_tab(ctx->cur_chunk_tab,
+ ctx->out_fd,
+ &res_csize);
+ if (ret)
+ return ret;
+
+ list_del(&cur_lte->being_compressed_list);
+
+ /* Grab the offset of this stream in the output file
+ * from the chunk table before we free it. */
+ offset = ctx->cur_chunk_tab->file_offset;
+
+ FREE(ctx->cur_chunk_tab);
+ ctx->cur_chunk_tab = NULL;
+
+ if (res_csize >= wim_resource_size(cur_lte)) {
+ /* Oops! We compressed the resource to
+ * larger than the original size. Write
+ * the resource uncompressed instead. */
+ DEBUG("Compressed %"PRIu64" => %"PRIu64" bytes; "
+ "writing uncompressed instead",
+ wim_resource_size(cur_lte), res_csize);
+ ret = seek_and_truncate(ctx->out_fd, offset);
+ if (ret)
+ return ret;
+ ret = write_wim_resource(cur_lte,
+ ctx->out_fd,
+ WIMLIB_COMPRESSION_TYPE_NONE,
+ &cur_lte->output_resource_entry,
+ ctx->write_resource_flags);
+ if (ret)
+ return ret;
+ } else {
+ cur_lte->output_resource_entry.size =
+ res_csize;
+
+ cur_lte->output_resource_entry.original_size =
+ cur_lte->resource_entry.original_size;
+
+ cur_lte->output_resource_entry.offset =
+ offset;
+
+ cur_lte->output_resource_entry.flags =
+ cur_lte->resource_entry.flags |
+ WIM_RESHDR_FLAG_COMPRESSED;
+ }
+
+ do_write_streams_progress(ctx->progress,
+ ctx->progress_func,
+ wim_resource_size(cur_lte),
+ false);
+
+ /* Since we just finished writing a stream, write any
+ * streams that have been added to the serial_streams
+ * list for direct writing by the main thread (e.g.
+ * resources that don't need to be compressed because
+ * the desired compression type is the same as the
+ * previous compression type). */
+ if (!list_empty(&ctx->serial_streams)) {
+ ret = do_write_stream_list_serial(&ctx->serial_streams,
+ ctx->lookup_table,
+ ctx->out_fd,
+ ctx->out_ctype,
+ ctx->write_resource_flags,
+ ctx->progress_func,
+ ctx->progress);
+ if (ret)
+ return ret;
+ }
+
+ /* Advance to the next stream to write. */
+ if (list_empty(&ctx->outstanding_streams)) {
+ cur_lte = NULL;
+ } else {
+ cur_lte = container_of(ctx->outstanding_streams.next,
+ struct wim_lookup_table_entry,
+ being_compressed_list);
+ }
+ }
+ }
+ return 0;
+}
+
+/* Called when the main thread has read a new chunk of data. */
+static int
+main_writer_thread_cb(const void *chunk, size_t chunk_size, void *_ctx)
+{
+ struct main_writer_thread_ctx *ctx = _ctx;
+ int ret;
+ struct message *next_msg;
+ u64 next_chunk_in_msg;
+
+ /* Update SHA1 message digest for the stream currently being read by the
+ * main thread. */
+ sha1_update(&ctx->next_sha_ctx, chunk, chunk_size);
+
+ /* We send chunks of data to the compressor chunks in batches which we
+ * refer to as "messages". @next_msg is the message that is currently
+ * being prepared to send off. If it is NULL, that indicates that we
+ * need to start a new message. */
+ next_msg = ctx->next_msg;
+ if (!next_msg) {
+ /* We need to start a new message. First check to see if there
+ * is a message available in the list of available messages. If
+ * so, we can just take one. If not, all the messages (there is
+ * a fixed number of them, proportional to the number of
+ * threads) have been sent off to the compressor threads, so we
+ * receive messages from the compressor threads containing
+ * compressed chunks of data.
+ *
+ * We may need to receive multiple messages before one is
+ * actually available to use because messages received that are
+ * *not* for the very next set of chunks to compress must be
+ * buffered until it's time to write those chunks. */
+ while (list_empty(&ctx->available_msgs)) {
+ ret = receive_compressed_chunks(ctx);
+ if (ret)
+ return ret;
+ }
+
+ next_msg = container_of(ctx->available_msgs.next,
+ struct message, list);
+ list_del(&next_msg->list);
+ next_msg->complete = false;
+ next_msg->begin_chunk = ctx->next_chunk;
+ next_msg->num_chunks = min(MAX_CHUNKS_PER_MSG,
+ ctx->next_num_chunks - ctx->next_chunk);
+ ctx->next_msg = next_msg;
+ }
+
+ /* Fill in the next chunk to compress */
+ next_chunk_in_msg = ctx->next_chunk - next_msg->begin_chunk;
+
+ next_msg->uncompressed_chunk_sizes[next_chunk_in_msg] = chunk_size;
+ memcpy(next_msg->uncompressed_chunks[next_chunk_in_msg],
+ chunk, chunk_size);
+ ctx->next_chunk++;
+ if (++next_chunk_in_msg == next_msg->num_chunks) {
+ /* Send off an array of chunks to compress */
+ list_add_tail(&next_msg->list, &ctx->next_lte->msg_list);
+ shared_queue_put(ctx->res_to_compress_queue, next_msg);
+ ++ctx->num_outstanding_messages;
+ ctx->next_msg = NULL;
+ }
+ return 0;
+}
+
+static int
+main_writer_thread_finish(void *_ctx)
+{
+ struct main_writer_thread_ctx *ctx = _ctx;
+ int ret;
+ while (ctx->num_outstanding_messages != 0) {
+ ret = receive_compressed_chunks(ctx);
+ if (ret)
+ return ret;
+ }
+ wimlib_assert(list_empty(&ctx->outstanding_streams));
+ return do_write_stream_list_serial(&ctx->serial_streams,
+ ctx->lookup_table,
+ ctx->out_fd,
+ ctx->out_ctype,
+ ctx->write_resource_flags,
+ ctx->progress_func,
+ ctx->progress);
+}
+
+static int
+submit_stream_for_compression(struct wim_lookup_table_entry *lte,
+ struct main_writer_thread_ctx *ctx)
+{
+ int ret;
+
+ /* Read the entire stream @lte, feeding its data chunks to the
+ * compressor threads. Also SHA1-sum the stream; this is required in
+ * the case that @lte is unhashed, and a nice additional verification
+ * when @lte is already hashed. */
+ sha1_init(&ctx->next_sha_ctx);
+ ctx->next_chunk = 0;
+ ctx->next_num_chunks = wim_resource_chunks(lte);
+ ctx->next_lte = lte;
+ INIT_LIST_HEAD(<e->msg_list);
+ list_add_tail(<e->being_compressed_list, &ctx->outstanding_streams);
+ ret = read_resource_prefix(lte, wim_resource_size(lte),
+ main_writer_thread_cb, ctx, 0);
+ if (ret == 0) {
+ wimlib_assert(ctx->next_chunk == ctx->next_num_chunks);
+ ret = finalize_and_check_sha1(&ctx->next_sha_ctx, lte);
+ }
+ return ret;
+}
+
+static int
+main_thread_process_next_stream(struct wim_lookup_table_entry *lte, void *_ctx)
+{
+ struct main_writer_thread_ctx *ctx = _ctx;
+ int ret;
+
+ if (wim_resource_size(lte) < 1000 ||
+ ctx->out_ctype == WIMLIB_COMPRESSION_TYPE_NONE ||
+ (lte->resource_location == RESOURCE_IN_WIM &&
+ !(ctx->write_resource_flags & WIMLIB_RESOURCE_FLAG_RECOMPRESS) &&
+ wimlib_get_compression_type(lte->wim) == ctx->out_ctype))
+ {
+ /* Stream is too small or isn't being compressed. Process it by
+ * the main thread when we have a chance. We can't necessarily
+ * process it right here, as the main thread could be in the
+ * middle of writing a different stream. */
+ list_add_tail(<e->write_streams_list, &ctx->serial_streams);
+ lte->deferred = 1;
+ ret = 0;
+ } else {
+ ret = submit_stream_for_compression(lte, ctx);
+ }
+ lte->no_progress = 1;
+ return ret;
+}
+
/* Return the default number of compressor threads: the number of online
 * processors, or a negative value/-1 if it cannot be determined. */
static long
get_default_num_threads(void)
{
#ifdef __WIN32__
	return win32_get_number_of_processors();
#else
	return sysconf(_SC_NPROCESSORS_ONLN);
#endif
}
+
/* Equivalent to write_stream_list_serial(), except this takes a @num_threads
 * parameter and will perform compression using that many threads.  Falls
 * back to write_stream_list_serial() on certain errors, such as a failure to
 * create the number of threads requested.
 *
 * High level description of the algorithm for writing compressed streams in
 * parallel:  We perform compression on chunks of size WIM_CHUNK_SIZE bytes
 * rather than on full files.  The currently executing thread becomes the main
 * thread and is entirely in charge of reading the data to compress (which may
 * be in any location understood by the resource code--- such as in an external
 * file being captured, or in another WIM file from which an image is being
 * exported) and actually writing the compressed data to the output file.
 * Additional threads are "compressor threads" and all execute the
 * compressor_thread_proc, where they repeatedly retrieve buffers of data from
 * the main thread, compress them, and hand them back to the main thread.
 *
 * Certain streams, such as streams that do not need to be compressed (e.g.
 * input compression type same as output compression type) or streams of very
 * small size are placed in a list (main_writer_thread_ctx.serial_streams) and
 * handled entirely by the main thread at an appropriate time.
 *
 * At any given point in time, multiple streams may be having chunks compressed
 * concurrently.  The stream that the main thread is currently *reading* may be
 * later in the list than the stream that the main thread is currently
 * *writing*.
 */
+static int
+write_stream_list_parallel(struct list_head *stream_list,
+ struct wim_lookup_table *lookup_table,
+ int out_fd,
+ int out_ctype,
+ int write_resource_flags,
+ wimlib_progress_func_t progress_func,
+ union wimlib_progress_info *progress,
+ unsigned num_threads)