+ // outstanding_resources is the list of resources that currently have
+ // had chunks sent off for compression.
+ //
+ // The first stream in outstanding_resources is the stream that is
+ // currently being written (cur_lte).
+ //
+ // The last stream in outstanding_resources is the stream that is
+ // currently being read and chunks fed to the compressor threads
+ // (next_lte).
+ //
+ // Depending on the number of threads and the sizes of the resource,
+ // the outstanding streams list may contain streams between cur_lte and
+ // next_lte that have all their chunks compressed or being compressed,
+ // but haven't been written yet.
+ //
+ LIST_HEAD(outstanding_resources);
+ struct list_head *next_resource = stream_list->next;
+ u64 next_chunk = 0;
+ u64 next_num_chunks = 0;
+
+ // As in write_wim_resource(), each resource we read is checksummed.
+ SHA_CTX next_sha_ctx;
+ u8 next_hash[SHA1_HASH_SIZE];
+
+ // Resources that don't need any chunks compressed are added to this
+ // list and written directly by the main thread.
+ LIST_HEAD(my_resources);
+
+ struct lookup_table_entry *cur_lte = NULL;
+ struct message *msg;
+
+#ifdef WITH_NTFS_3G
+ ntfs_inode *ni = NULL;
+#endif
+
+ DEBUG("Initializing buffers for uncompressed "
+ "and compressed data (%zu bytes needed)",
+ num_messages * MAX_CHUNKS_PER_MSG * WIM_CHUNK_SIZE * 2);
+
+ // Pre-allocate all the buffers that will be needed to do the chunk
+ // compression.
+ for (size_t i = 0; i < num_messages; i++) {
+ for (size_t j = 0; j < MAX_CHUNKS_PER_MSG; j++) {
+ msgs[i].compressed_chunks[j] = MALLOC(WIM_CHUNK_SIZE);
+
+ // The extra 8 bytes is because longest_match() in lz.c
+ // may read a little bit off the end of the uncompressed
+ // data. It doesn't need to be initialized--- we really
+ // just need to avoid accessing an unmapped page.
+ msgs[i].uncompressed_chunks[j] = MALLOC(WIM_CHUNK_SIZE + 8);
+ if (msgs[i].compressed_chunks[j] == NULL ||
+ msgs[i].uncompressed_chunks[j] == NULL)
+ {
+ ret = WIMLIB_ERR_NOMEM;
+ goto out;
+ }
+ }
+ }
+
+ // This loop is executed until all resources have been written, except
+ // possibly a few that have been added to the @my_resources list for
+ // writing later.
+ while (1) {
+ // Send chunks to the compressor threads until either (a) there
+ // are no more messages available since they were all sent off,
+ // or (b) there are no more resources that need to be
+ // compressed.
+ while (!list_empty(&available_msgs)) {
+ if (next_chunk == next_num_chunks) {
+ // If next_chunk == next_num_chunks, there are
+ // no more chunks to write in the current
+ // stream. So, check the SHA1 message digest of
+ // the stream that was just finished (unless
+ // next_lte == NULL, which is the case the very
+ // first time this loop is entered, and also
+ // near the very end of the compression when
+ // there are no more streams.) Then, advance to
+ // the next stream (if there is one).
+ if (next_lte != NULL) {
+ #ifdef WITH_NTFS_3G
+ end_wim_resource_read(next_lte, ni);
+ ni = NULL;
+ #else
+ end_wim_resource_read(next_lte);
+ #endif
+ DEBUG2("Finalize SHA1 md (next_num_chunks=%zu)",
+ next_num_chunks);
+ sha1_final(next_hash, &next_sha_ctx);
+ if (!hashes_equal(next_lte->hash, next_hash)) {
+ ERROR("WIM resource has incorrect hash!");
+ if (next_lte->resource_location ==
+ RESOURCE_IN_FILE_ON_DISK)
+ {
+ ERROR("We were reading it from `%s'; "
+ "maybe it changed while we were "
+ "reading it.",
+ next_lte->file_on_disk);
+ }
+ ret = WIMLIB_ERR_INVALID_RESOURCE_HASH;
+ goto out;
+ }
+ }
+
+ // Advance to the next resource.
+ //
+ // If the next resource needs no compression, just write
+ // it with this thread (not now though--- we could be in
+ // the middle of writing another resource.) Keep doing
+ // this until we either get to the end of the resources
+ // list, or we get to a resource that needs compression.
+ while (1) {
+ if (next_resource == stream_list) {
+ // No more resources to send for
+ // compression
+ next_lte = NULL;
+ break;
+ }
+ next_lte = container_of(next_resource,
+ struct lookup_table_entry,
+ staging_list);
+ next_resource = next_resource->next;
+ if ((!(write_flags & WIMLIB_WRITE_FLAG_RECOMPRESS)
+ && wim_resource_compression_type(next_lte) == out_ctype)
+ || wim_resource_size(next_lte) == 0)
+ {
+ list_add_tail(&next_lte->staging_list,
+ &my_resources);
+ } else {
+ list_add_tail(&next_lte->staging_list,
+ &outstanding_resources);
+ next_chunk = 0;
+ next_num_chunks = wim_resource_chunks(next_lte);
+ sha1_init(&next_sha_ctx);
+ INIT_LIST_HEAD(&next_lte->msg_list);
+ #ifdef WITH_NTFS_3G
+ ret = prepare_resource_for_read(next_lte, &ni);
+ #else
+ ret = prepare_resource_for_read(next_lte);
+ #endif
+
+ if (ret != 0)
+ goto out;
+ if (cur_lte == NULL) {
+ // Set cur_lte for the
+ // first time
+ cur_lte = next_lte;
+ }
+ break;
+ }
+ }
+ }
+
+ if (next_lte == NULL) {
+ // No more resources to send for compression
+ break;
+ }
+
+ // Get a message from the available messages
+ // list
+ msg = container_of(available_msgs.next,
+ struct message,
+ list);
+
+ // ... and delete it from the available messages
+ // list
+ list_del(&msg->list);
+
+ // Initialize the message with the chunks to
+ // compress.
+ msg->num_chunks = min(next_num_chunks - next_chunk,
+ MAX_CHUNKS_PER_MSG);
+ msg->lte = next_lte;
+ msg->complete = false;
+ msg->begin_chunk = next_chunk;
+
+ unsigned size = WIM_CHUNK_SIZE;
+ for (unsigned i = 0; i < msg->num_chunks; i++) {
+
+ // Read chunk @next_chunk of the stream into the
+ // message so that a compressor thread can
+ // compress it.
+
+ if (next_chunk == next_num_chunks - 1) {
+ size = MODULO_NONZERO(wim_resource_size(next_lte),
+ WIM_CHUNK_SIZE);
+ }
+
+ DEBUG2("Read resource (size=%u, offset=%zu)",
+ size, next_chunk * WIM_CHUNK_SIZE);
+
+ msg->uncompressed_chunk_sizes[i] = size;
+
+ ret = read_wim_resource(next_lte,
+ msg->uncompressed_chunks[i],
+ size,
+ next_chunk * WIM_CHUNK_SIZE,
+ 0);
+ if (ret != 0)
+ goto out;
+ sha1_update(&next_sha_ctx,
+ msg->uncompressed_chunks[i], size);
+ next_chunk++;
+ }
+
+ // Send the compression request
+ list_add_tail(&msg->list, &next_lte->msg_list);
+ shared_queue_put(res_to_compress_queue, msg);
+ DEBUG2("Compression request sent");
+ }
+
+ // If there are no outstanding resources, there are no more
+ // resources that need to be written.
+ if (list_empty(&outstanding_resources)) {
+ ret = 0;
+ goto out;
+ }
+
+ // Get the next message from the queue and process it.
+ // The message will contain 1 or more data chunks that have been
+ // compressed.
+ msg = shared_queue_get(compressed_res_queue);
+ msg->complete = true;
+
+ // Is this the next chunk in the current resource? If it's not
+ // (i.e., an earlier chunk in a same or different resource
+ // hasn't been compressed yet), do nothing, and keep this
+ // message around until all earlier chunks are received.
+ //
+ // Otherwise, write all the chunks we can.
+ while (cur_lte != NULL &&
+ !list_empty(&cur_lte->msg_list) &&
+ (msg = container_of(cur_lte->msg_list.next,
+ struct message,
+ list))->complete)
+ {
+ DEBUG2("Complete msg (begin_chunk=%"PRIu64")", msg->begin_chunk);
+ if (msg->begin_chunk == 0) {
+ DEBUG2("Begin chunk tab");
+
+ // This is the first set of chunks. Leave space
+ // for the chunk table in the output file.
+ off_t cur_offset = ftello(out_fp);
+ if (cur_offset == -1) {
+ ret = WIMLIB_ERR_WRITE;
+ goto out;
+ }
+ ret = begin_wim_resource_chunk_tab(cur_lte,
+ out_fp,
+ cur_offset,
+ &cur_chunk_tab);
+ if (ret != 0)
+ goto out;
+ }
+
+ // Write the compressed chunks from the message.
+ ret = write_wim_chunks(msg, out_fp, cur_chunk_tab);
+ if (ret != 0)
+ goto out;
+
+ list_del(&msg->list);
+
+ // This message is available to use for different chunks
+ // now.
+ list_add(&msg->list, &available_msgs);
+
+ // Was this the last chunk of the stream? If so, finish
+ // it.
+ if (list_empty(&cur_lte->msg_list) &&
+ msg->begin_chunk + msg->num_chunks == cur_chunk_tab->num_chunks)
+ {
+ DEBUG2("Finish wim chunk tab");
+ u64 res_csize;
+ ret = finish_wim_resource_chunk_tab(cur_chunk_tab,
+ out_fp,
+ &res_csize);
+ if (ret != 0)
+ goto out;
+
+ if (res_csize >= wim_resource_size(cur_lte)) {
+ /* Oops! We compressed the resource to
+ * larger than the original size. Write
+ * the resource uncompressed instead. */
+ ret = write_uncompressed_resource_and_truncate(
+ cur_lte,
+ out_fp,
+ cur_chunk_tab->file_offset,
+ &cur_lte->output_resource_entry);
+ if (ret != 0)
+ goto out;
+ } else {
+ cur_lte->output_resource_entry.size =
+ res_csize;
+
+ cur_lte->output_resource_entry.original_size =
+ cur_lte->resource_entry.original_size;
+
+ cur_lte->output_resource_entry.offset =
+ cur_chunk_tab->file_offset;
+
+ cur_lte->output_resource_entry.flags =
+ cur_lte->resource_entry.flags |
+ WIM_RESHDR_FLAG_COMPRESSED;
+ }
+
+ progress->write_streams.completed_bytes +=
+ wim_resource_size(cur_lte);
+ progress->write_streams.completed_streams++;
+
+ if (progress_func) {
+ progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS,
+ progress);
+ }
+
+ FREE(cur_chunk_tab);
+ cur_chunk_tab = NULL;
+
+ struct list_head *next = cur_lte->staging_list.next;
+ list_del(&cur_lte->staging_list);
+
+ if (next == &outstanding_resources)
+ cur_lte = NULL;
+ else
+ cur_lte = container_of(cur_lte->staging_list.next,
+ struct lookup_table_entry,
+ staging_list);
+
+ // Since we just finished writing a stream,
+ // write any streams that have been added to the
+ // my_resources list for direct writing by the
+ // main thread (e.g. resources that don't need
+ // to be compressed because the desired
+ // compression type is the same as the previous
+ // compression type).
+ ret = do_write_stream_list(&my_resources,
+ out_fp,
+ out_ctype,
+ progress_func,
+ progress,
+ 0);
+ if (ret != 0)
+ goto out;
+ }
+ }
+ }
+
+out:
+ if (ret == WIMLIB_ERR_NOMEM) {
+ ERROR("Could not allocate enough memory for "
+ "multi-threaded compression");
+ }
+
+ if (next_lte) {
+#ifdef WITH_NTFS_3G
+ end_wim_resource_read(next_lte, ni);
+#else
+ end_wim_resource_read(next_lte);
+#endif
+ }
+
+ if (ret == 0) {
+ ret = do_write_stream_list(&my_resources, out_fp,
+ out_ctype, progress_func,
+ progress, 0);
+ } else {
+ if (msgs) {
+ size_t num_available_msgs = 0;
+ struct list_head *cur;
+
+ list_for_each(cur, &available_msgs) {
+ num_available_msgs++;
+ }
+
+ while (num_available_msgs < num_messages) {
+ shared_queue_get(compressed_res_queue);
+ num_available_msgs++;
+ }
+ }
+ }
+
+ if (msgs) {
+ for (size_t i = 0; i < num_messages; i++) {
+ for (size_t j = 0; j < MAX_CHUNKS_PER_MSG; j++) {
+ FREE(msgs[i].compressed_chunks[j]);
+ FREE(msgs[i].uncompressed_chunks[j]);
+ }
+ }
+ FREE(msgs);
+ }
+
+ FREE(cur_chunk_tab);
+ return ret;
+}
+
+
+/*
+ * Writes the streams in @stream_list to the output WIM file @out_fp,
+ * compressing the stream data with compression type @out_ctype using
+ * @num_threads compressor threads (0 means one per online processor).
+ *
+ * On any setup failure (cannot determine processor count, queue init
+ * failure, out of memory) this falls back to single-threaded compression.
+ */
+static int write_stream_list_parallel(struct list_head *stream_list,
+				      FILE *out_fp,
+				      int out_ctype,
+				      int write_flags,
+				      unsigned num_threads,
+				      wimlib_progress_func_t progress_func,
+				      union wimlib_progress_info *progress)
+{
+	int ret;
+	struct shared_queue res_to_compress_queue;
+	struct shared_queue compressed_res_queue;
+	pthread_t *compressor_threads = NULL;
+
+	// num_threads == 0 means "autodetect": one thread per online CPU.
+	if (num_threads == 0) {
+		long nthreads = sysconf(_SC_NPROCESSORS_ONLN);
+		if (nthreads < 1) {
+			WARNING("Could not determine number of processors! Assuming 1");
+			goto out_serial;
+		} else {
+			num_threads = nthreads;
+		}
+	}
+
+	progress->write_streams.num_threads = num_threads;
+	wimlib_assert(stream_list->next != stream_list);
+
+	// Allow roughly two in-flight messages per compressor thread so the
+	// threads are kept busy without unbounded buffering.
+	static const double MESSAGES_PER_THREAD = 2.0;
+	size_t queue_size = (size_t)(num_threads * MESSAGES_PER_THREAD);
+
+	DEBUG("Initializing shared queues (queue_size=%zu)", queue_size);
+
+	ret = shared_queue_init(&res_to_compress_queue, queue_size);
+	if (ret != 0)
+		goto out_serial;
+
+	ret = shared_queue_init(&compressed_res_queue, queue_size);
+	if (ret != 0)
+		goto out_destroy_res_to_compress_queue;
+
+	struct compressor_thread_params params;
+	params.res_to_compress_queue = &res_to_compress_queue;
+	params.compressed_res_queue = &compressed_res_queue;
+	params.compress = get_compress_func(out_ctype);
+
+	compressor_threads = MALLOC(num_threads * sizeof(pthread_t));
+	if (!compressor_threads) {
+		ret = WIMLIB_ERR_NOMEM;
+		goto out_destroy_compressed_res_queue;
+	}
+
+	for (unsigned i = 0; i < num_threads; i++) {
+		DEBUG("pthread_create thread %u", i);
+		ret = pthread_create(&compressor_threads[i], NULL,
+				     compressor_thread_proc, &params);
+		if (ret != 0) {
+			// pthread_create() returns an error number and does
+			// NOT set errno; assign it so ERROR_WITH_ERRNO()
+			// reports the correct message.
+			errno = ret;
+			ret = -1;
+			ERROR_WITH_ERRNO("Failed to create compressor "
+					 "thread %u", i);
+			// Only the first @i threads exist; only join those.
+			num_threads = i;
+			goto out_join;
+		}
+	}
+
+	if (progress_func)
+		progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, progress);
+
+	ret = main_writer_thread_proc(stream_list,
+				      out_fp,
+				      out_ctype,
+				      &res_to_compress_queue,
+				      &compressed_res_queue,
+				      queue_size,
+				      write_flags,
+				      progress_func,
+				      progress);
+out_join:
+	// A NULL message tells each compressor thread to terminate.
+	for (unsigned i = 0; i < num_threads; i++)
+		shared_queue_put(&res_to_compress_queue, NULL);
+
+	for (unsigned i = 0; i < num_threads; i++) {
+		// pthread_join() also returns an error number rather than
+		// setting errno, so pass its return value to strerror().
+		int join_errno = pthread_join(compressor_threads[i], NULL);
+		if (join_errno) {
+			WARNING("Failed to join compressor thread %u: %s",
+				i, strerror(join_errno));
+		}
+	}
+	FREE(compressor_threads);
+out_destroy_compressed_res_queue:
+	shared_queue_destroy(&compressed_res_queue);
+out_destroy_res_to_compress_queue:
+	shared_queue_destroy(&res_to_compress_queue);
+	if (ret >= 0 && ret != WIMLIB_ERR_NOMEM)
+		return ret;
+out_serial:
+	WARNING("Falling back to single-threaded compression");
+	return write_stream_list_serial(stream_list,
+					out_fp,
+					out_ctype,
+					write_flags,
+					progress_func,
+					progress);
+
+}
+#endif
+
+/*
+ * Writes a list of streams to a WIM (@out_fp) using compression type
+ * @out_ctype and up to @num_threads compressor threads, choosing between the
+ * serial and parallel implementations based on how much data actually needs
+ * to be compressed.
+ */
+static int write_stream_list(struct list_head *stream_list, FILE *out_fp,
+			     int out_ctype, int write_flags,
+			     unsigned num_threads,
+			     wimlib_progress_func_t progress_func)
+{
+	struct lookup_table_entry *entry;
+	size_t stream_count = 0;
+	u64 bytes_total = 0;
+	u64 bytes_to_compress = 0;
+	union wimlib_progress_info progress;
+
+	// Tally the total amount of stream data, and how much of it will
+	// actually need to be (re)compressed.
+	list_for_each_entry(entry, stream_list, staging_list) {
+		u64 stream_size = wim_resource_size(entry);
+
+		stream_count++;
+		bytes_total += stream_size;
+		if (out_ctype != WIMLIB_COMPRESSION_TYPE_NONE
+		    && ((write_flags & WIMLIB_WRITE_FLAG_RECOMPRESS) ||
+			wim_resource_compression_type(entry) != out_ctype))
+		{
+			bytes_to_compress += stream_size;
+		}
+	}
+	progress.write_streams.total_bytes = bytes_total;
+	progress.write_streams.total_streams = stream_count;
+	progress.write_streams.completed_bytes = 0;
+	progress.write_streams.completed_streams = 0;
+	progress.write_streams.num_threads = num_threads;
+	progress.write_streams.compression_type = out_ctype;
+
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
+	// Parallel compression is only worthwhile for a nontrivial amount of
+	// compression work, and only when more than one thread is allowed.
+	if (bytes_to_compress >= 1000000 && num_threads != 1)
+		return write_stream_list_parallel(stream_list,
+						  out_fp,
+						  out_ctype,
+						  write_flags,
+						  num_threads,
+						  progress_func,
+						  &progress);
+	else
+#endif
+		return write_stream_list_serial(stream_list,
+						out_fp,
+						out_ctype,
+						write_flags,
+						progress_func,
+						&progress);
+}
+
+/* Arguments passed through for_lookup_table_entry() to the
+ * lte_overwrite_prepare() callback when preparing an in-place overwrite of a
+ * WIM file. */
+struct lte_overwrite_prepare_args {
+	WIMStruct *wim;			// The WIM being overwritten in place
+	off_t end_offset;		// End of the XML data in @wim's file; no
+					// resource of @wim may lie past this offset
+	struct list_head *stream_list;	// Receives streams that must be written out
+};
+
+/*
+ * for_lookup_table_entry() callback that prepares one lookup table entry for
+ * an in-place overwrite of a WIM file.
+ *
+ * Fails with WIMLIB_ERR_RESOURCE_ORDER if the entry's resource lives in the
+ * WIM being overwritten but is located after the XML data, since the
+ * overwrite would clobber it.  Otherwise, initializes the entry's output
+ * fields, and queues any non-metadata stream that does not already reside in
+ * the target WIM onto args->stream_list so it will be written.
+ *
+ * (The "&lte->" expressions below had been corrupted to "<e->" by HTML
+ * entity mangling; restored here.)
+ */
+static int lte_overwrite_prepare(struct lookup_table_entry *lte, void *arg)
+{
+	struct lte_overwrite_prepare_args *args = arg;
+
+	if (lte->resource_location == RESOURCE_IN_WIM &&
+	    lte->wim == args->wim &&
+	    lte->resource_entry.offset + lte->resource_entry.size > args->end_offset)
+	{
+		ERROR("The following resource is after the XML data:");
+		print_lookup_table_entry(lte);
+		return WIMLIB_ERR_RESOURCE_ORDER;
+	}
+
+	// Start the output entry as a copy of the existing resource entry.
+	lte->out_refcnt = lte->refcnt;
+	memcpy(&lte->output_resource_entry, &lte->resource_entry,
+	       sizeof(struct resource_entry));
+	if (!(lte->resource_entry.flags & WIM_RESHDR_FLAG_METADATA)) {
+		wimlib_assert(lte->resource_location != RESOURCE_NONEXISTENT);
+		// Streams already stored in the target WIM need not be
+		// rewritten; everything else must be appended.
+		if (lte->resource_location != RESOURCE_IN_WIM || lte->wim != args->wim)
+			list_add(&lte->staging_list, args->stream_list);
+	}
+	return 0;
+}
+
+/* Scans the entire lookup table of @wim, verifying that no resource lies past
+ * @end_offset and collecting onto @stream_list the streams that must be
+ * written during an in-place overwrite. */
+static int wim_find_new_streams(WIMStruct *wim, off_t end_offset,
+				struct list_head *stream_list)
+{
+	struct lte_overwrite_prepare_args args;
+
+	args.wim = wim;
+	args.end_offset = end_offset;
+	args.stream_list = stream_list;
+	return for_lookup_table_entry(wim->lookup_table,
+				      lte_overwrite_prepare, &args);
+}
+
+/*
+ * Iterates through all of @inode's streams (1 + inode->num_ads of them),
+ * adding each stream not yet seen (out_refcnt == 0) to @stream_list, and
+ * adding @inode's link count to each stream's output reference count.
+ *
+ * Streams with no lookup table entry are skipped.  Always returns 0.
+ *
+ * (The "&lte->staging_list" argument below had been corrupted to
+ * "<e->staging_list" by HTML entity mangling; restored here.)
+ */
+static int inode_find_streams_to_write(struct inode *inode,
+				       struct lookup_table *table,
+				       struct list_head *stream_list)
+{
+	struct lookup_table_entry *lte;
+	for (unsigned i = 0; i <= inode->num_ads; i++) {
+		lte = inode_stream_lte(inode, i, table);
+		if (lte) {
+			if (lte->out_refcnt == 0)
+				list_add_tail(&lte->staging_list, stream_list);
+			lte->out_refcnt += inode->link_count;
+		}
+	}
+	return 0;
+}
+
+/* Per-image callback: gathers the streams referenced by every inode in the
+ * currently selected image of @w.  The destination stream list is smuggled
+ * in through w->private.  Always returns 0. */
+static int image_find_streams_to_write(WIMStruct *w)
+{
+	struct list_head *stream_list = (struct list_head*)w->private;
+	struct hlist_head *inode_list =
+		&wim_get_current_image_metadata(w)->inode_list;
+	struct inode *inode;
+	struct hlist_node *cur;
+
+	hlist_for_each_entry(inode, cur, inode_list, hlist) {
+		inode_find_streams_to_write(inode, w->lookup_table,
+					    stream_list);
+	}
+	return 0;
+}
+
+/* Writes the streams of image @image (or all images) in @w to the output WIM
+ * file, using out_refcnt tracking to avoid writing any stream twice. */
+static int write_wim_streams(WIMStruct *w, int image, int write_flags,
+			     unsigned num_threads,
+			     wimlib_progress_func_t progress_func)
+{
+	LIST_HEAD(stream_list);
+
+	// Reset the output reference counts, then collect every stream that
+	// the selected image(s) reference.
+	for_lookup_table_entry(w->lookup_table, lte_zero_out_refcnt, NULL);
+	w->private = &stream_list;
+	for_image(w, image, image_find_streams_to_write);
+	return write_stream_list(&stream_list, w->out_fp,
+				 wimlib_get_compression_type(w), write_flags,
+				 num_threads, progress_func);
+}
+
+/*
+ * Finish writing a WIM file: write the lookup table, xml data, and integrity
+ * table (optional), then overwrite the WIM header.
+ *
+ * write_flags is a bitwise OR of the following:
+ *
+ * (public) WIMLIB_WRITE_FLAG_CHECK_INTEGRITY:
+ * Include an integrity table.
+ *
+ * (public) WIMLIB_WRITE_FLAG_SHOW_PROGRESS:
+ * Show progress information when (if) writing the integrity table.
+ *
+ * (private) WIMLIB_WRITE_FLAG_NO_LOOKUP_TABLE:
+ * Don't write the lookup table.
+ *
+ * (private) WIMLIB_WRITE_FLAG_REUSE_INTEGRITY_TABLE:
+ * When (if) writing the integrity table, re-use entries from the
+ * existing integrity table, if possible.
+ *
+ * (private) WIMLIB_WRITE_FLAG_CHECKPOINT_AFTER_XML:
+ * After writing the XML data but before writing the integrity
+ * table, write a temporary WIM header and flush the stream so that
+ * the WIM is less likely to become corrupted upon abrupt program
+ * termination.
+ *
+ * (private) WIMLIB_WRITE_FLAG_FSYNC:
+ * fsync() the output file before closing it.
+ *
+ */
+int finish_write(WIMStruct *w, int image, int write_flags,
+ wimlib_progress_func_t progress_func)
+{
+ int ret;
+ struct wim_header hdr;
+ FILE *out = w->out_fp;