#include "lzx.h"
#include "xpress.h"
#include <unistd.h>
+
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
#include <semaphore.h>
#include <pthread.h>
#include <errno.h>
+#endif
#ifdef WITH_NTFS_3G
#include <time.h>
/*
* Writes a WIM file to the original file that it was read from, overwriting it.
*/
-WIMLIBAPI int wimlib_overwrite(WIMStruct *w, int write_flags)
+WIMLIBAPI int wimlib_overwrite(WIMStruct *w, int write_flags,
+ unsigned num_threads)
{
const char *wimfile_name;
size_t wim_name_len;
randomize_char_array_with_alnum(tmpfile + wim_name_len, 9);
tmpfile[wim_name_len + 9] = '\0';
- ret = wimlib_write(w, tmpfile, WIM_ALL_IMAGES, write_flags);
+ ret = wimlib_write(w, tmpfile, WIM_ALL_IMAGES, write_flags,
+ num_threads);
if (ret != 0) {
ERROR("Failed to write the WIM file `%s'", tmpfile);
if (unlink(tmpfile) != 0)
return WIMLIB_ERR_RENAME;
}
- if (write_flags & WIMLIB_WRITE_FLAG_VERBOSE)
+ if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS)
printf("Successfully renamed `%s' to `%s'\n", tmpfile, wimfile_name);
return 0;
}
static int prepare_resource_for_read(struct lookup_table_entry *lte
-
+
#ifdef WITH_NTFS_3G
, ntfs_inode **ni_ret
#endif
}
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
struct shared_queue {
sem_t filled_slots;
sem_t empty_slots;
}
DEBUG("Compressor thread terminating");
}
+#endif
+
+/*
+ * Prints a progress line (refreshed in place with '\r') for stream writing,
+ * then accounts for the stream that is about to be written.
+ *
+ * @cur_size:     In/out.  Uncompressed bytes accounted for so far; increased
+ *                by wim_resource_size(@cur_lte) on every call.
+ * @next_size:    In/out.  Value of *@cur_size at which the next line is due.
+ * @total_size:   Total uncompressed size of all streams being written.
+ * @one_percent:  Reporting step; the callers pass @total_size / 100.
+ * @cur_percent:  Out.  Percentage shown by the most recent progress line.
+ * @cur_lte:      The stream about to be written.
+ *
+ * The percentage is computed from *@cur_size instead of being counted up by
+ * one per report.  A plain counter falls behind whenever a single stream
+ * covers more than 1% of the total, and runs past 100 when @one_percent is 0
+ * (i.e. @total_size < 100).
+ */
+static void show_stream_write_progress(u64 *cur_size, u64 *next_size,
+				       u64 total_size, u64 one_percent,
+				       unsigned *cur_percent,
+				       const struct lookup_table_entry *cur_lte)
+{
+	if (*cur_size >= *next_size) {
+		unsigned percent;
+
+		if (*cur_size >= total_size) {
+			/* Also covers total_size == 0 (no division by zero). */
+			percent = 100;
+		} else if (total_size <= (u64)~(u64)0 / 100) {
+			/* *cur_size < total_size, so the product cannot
+			 * overflow here. */
+			percent = (unsigned)(*cur_size * 100 / total_size);
+		} else {
+			/* Huge totals: divide first to avoid overflow. */
+			percent = (unsigned)(*cur_size / (total_size / 100));
+		}
+
+		printf("\r%"PRIu64" MiB of %"PRIu64" MiB "
+		       "(uncompressed) written (%u%% done)",
+		       *cur_size >> 20,
+		       total_size >> 20, percent);
+		fflush(stdout);
+
+		*cur_percent = percent;
+		/* Report again once another 1% (at least one byte) has been
+		 * accounted for. */
+		*next_size = *cur_size + (one_percent != 0 ? one_percent : 1);
+	}
+	*cur_size += wim_resource_size(cur_lte);
+}
+
+/*
+ * Prints the final progress line: @total_size (shown in MiB) written out of
+ * @total_size, 100% done, followed by a newline, and flushes stdout.
+ */
+static void finish_stream_write_progress(u64 total_size)
+{
+	const u64 total_mib = total_size >> 20;
+
+	printf("\r%"PRIu64" MiB of %"PRIu64" MiB "
+	       "(uncompressed) written (100%% done)\n",
+	       total_mib, total_mib);
+	fflush(stdout);
+}
/*
 * Writes every stream on @stream_list (linked through the staging_list
 * member) to @out_fp, one at a time, using write_wim_resource() with
 * compression type @out_ctype.
 *
 * When WIMLIB_WRITE_FLAG_SHOW_PROGRESS is set in @write_flags, a progress line
 * is shown before each stream is written and a final 100% line is printed
 * after the last one; @total_size is the total used for those reports.
 *
 * Returns 0 on success, or the first nonzero value returned by
 * write_wim_resource(), in which case the remaining streams are not written.
 */
static int write_stream_list_serial(struct list_head *stream_list,
FILE *out_fp, int out_ctype,
- int write_flags)
+ int write_flags, u64 total_size)
{
struct lookup_table_entry *lte;
int ret;
+ u64 one_percent = total_size / 100;
+ u64 cur_size = 0;
+ u64 next_size = 0;
+ unsigned cur_percent = 0;
+
list_for_each_entry(lte, stream_list, staging_list) {
+ if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+ show_stream_write_progress(&cur_size, &next_size,
+ total_size, one_percent,
+ &cur_percent, lte);
+ }
ret = write_wim_resource(lte, out_fp, out_ctype,
&lte->output_resource_entry, 0);
if (ret != 0)
return ret;
}
+ if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS)
+ finish_stream_write_progress(total_size);
return 0;
}
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
static int write_wim_chunks(struct message *msg, FILE *out_fp,
struct chunk_table *chunk_tab)
{
return 0;
}
+/*
+ * This function is executed by the main thread when the resources are being
+ * compressed in parallel. The main thread is in charge of all reading of the
+ * uncompressed data and writing of the compressed data. The compressor threads
+ * *only* do compression from/to in-memory buffers.
+ *
+ * Each unit of work given to a compressor thread is up to MAX_CHUNKS_PER_MSG
+ * chunks of uncompressed data to compress, represented in a `struct message'.
+ * Each message is passed from the main thread to a worker thread through the
+ * res_to_compress_queue, and it is passed back through the
+ * compressed_res_queue.
+ */
static int main_writer_thread_proc(struct list_head *stream_list,
FILE *out_fp,
int out_ctype,
struct shared_queue *res_to_compress_queue,
struct shared_queue *compressed_res_queue,
- size_t queue_size)
+ size_t queue_size,
+ int write_flags,
+ u64 total_size)
{
int ret;
struct message msgs[queue_size];
ZERO_ARRAY(msgs);
+ // Initially, all the messages are available to use.
LIST_HEAD(available_msgs);
-
for (size_t i = 0; i < ARRAY_LEN(msgs); i++)
list_add(&msgs[i].list, &available_msgs);
-
+ // outstanding_resources is the list of resources that currently have
+ // had chunks sent off for compression.
+ //
+ // The first stream in outstanding_resources is the stream that is
+ // currently being written (cur_lte).
+ //
+ // The last stream in outstanding_resources is the stream that is
+ // currently being read and chunks fed to the compressor threads
+ // (next_lte).
+ //
+ // Depending on the number of threads and the sizes of the resources,
+ // the outstanding streams list may contain streams between cur_lte and
+ // next_lte that have all their chunks compressed or being compressed,
+ // but haven't been written yet.
+ //
+ LIST_HEAD(outstanding_resources);
struct list_head *next_resource = stream_list->next;
struct lookup_table_entry *next_lte = container_of(next_resource,
struct lookup_table_entry,
u64 next_chunk = 0;
u64 next_num_chunks = wim_resource_chunks(next_lte);
INIT_LIST_HEAD(&next_lte->msg_list);
+ list_add_tail(&next_lte->staging_list, &outstanding_resources);
+
+ // As in write_wim_resource(), each resource we read is checksummed.
SHA_CTX next_sha_ctx;
sha1_init(&next_sha_ctx);
-
u8 next_hash[SHA1_HASH_SIZE];
- // Resources owning chunks that have been sent off for compression
- LIST_HEAD(outstanding_resources);
-
- // Resources that are going to be written by the main thread
+ // Resources that don't need any chunks compressed are added to this
+ // list and written directly by the main thread.
LIST_HEAD(my_resources);
- list_add_tail(&next_lte->staging_list, &outstanding_resources);
-
struct lookup_table_entry *cur_lte = next_lte;
struct chunk_table *cur_chunk_tab = NULL;
struct lookup_table_entry *lte;
struct message *msg;
+ u64 one_percent = total_size / 100;
+ u64 cur_size = 0;
+ u64 next_size = 0;
+ unsigned cur_percent = 0;
+
#ifdef WITH_NTFS_3G
ntfs_inode *ni = NULL;
#endif
"and compressed data (%zu bytes needed)",
queue_size * MAX_CHUNKS_PER_MSG * WIM_CHUNK_SIZE * 2);
+ // Pre-allocate all the buffers that will be needed to do the chunk
+ // compression.
for (size_t i = 0; i < ARRAY_LEN(msgs); i++) {
for (size_t j = 0; j < MAX_CHUNKS_PER_MSG; j++) {
msgs[i].compressed_chunks[j] = MALLOC(WIM_CHUNK_SIZE);
// compressed.
while (!list_empty(&available_msgs) && next_lte != NULL) {
- wimlib_assert(next_chunk < next_num_chunks);
-
// Get a message from the available messages
// list
msg = container_of(available_msgs.next,
unsigned size = WIM_CHUNK_SIZE;
for (unsigned i = 0; i < msg->num_chunks; i++) {
+
+ // Read chunk @next_chunk of the stream into the
+ // message so that a compressor thread can
+ // compress it.
+
if (next_chunk == next_num_chunks - 1 &&
wim_resource_size(next_lte) % WIM_CHUNK_SIZE != 0)
{
size = wim_resource_size(next_lte) % WIM_CHUNK_SIZE;
}
+
DEBUG2("Read resource (size=%u, offset=%zu)",
size, next_chunk * WIM_CHUNK_SIZE);
// Advance to the next resource.
//
- // If the next resource needs no compression,
- // just write it with this thread (not now
- // though--- we could be in the middle of
- // writing another resource.) Keep doing this
- // until we either get to the end of the
- // resources list, or we get to a resource that
- // needs compression.
+ // If the next resource needs no compression, just write
+ // it with this thread (not now though--- we could be in
+ // the middle of writing another resource.) Keep doing
+ // this until we either get to the end of the resources
+ // list, or we get to a resource that needs compression.
while (1) {
if (next_resource == stream_list) {
}
// If there are no outstanding resources, there are no more
- // resources to compress.
+ // resources that need to be written.
if (list_empty(&outstanding_resources)) {
DEBUG("No outstanding resources! Done");
ret = 0;
DEBUG2("Complete msg (begin_chunk=%"PRIu64")", msg->begin_chunk);
if (msg->begin_chunk == 0) {
DEBUG2("Begin chunk tab");
- // This is the first set of chunks.
- // Leave space for the chunk table in
- // the output file.
+
+
+
+ if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+ show_stream_write_progress(&cur_size,
+ &next_size,
+ total_size,
+ one_percent,
+ &cur_percent,
+ cur_lte);
+ }
+
+ // This is the first set of chunks. Leave space
+ // for the chunk table in the output file.
off_t cur_offset = ftello(out_fp);
if (cur_offset == -1) {
ret = WIMLIB_ERR_WRITE;
goto out;
}
- FREE(cur_chunk_tab);
- cur_chunk_tab = NULL;
ret = begin_wim_resource_chunk_tab(cur_lte,
out_fp,
cur_offset,
if (ret != 0)
goto out;
}
+
+ // Write the compressed chunks from the message.
ret = write_wim_chunks(msg, out_fp, cur_chunk_tab);
if (ret != 0)
goto out;
list_del(&msg->list);
+
+ // This message is available to use for different chunks
+ // now.
list_add(&msg->list, &available_msgs);
+ // Was this the last chunk of the stream? If so,
+ // finish it.
if (list_empty(&cur_lte->msg_list) &&
msg->begin_chunk + msg->num_chunks == cur_chunk_tab->num_chunks)
{
out_fp,
&res_csize);
if (ret != 0)
- return ret;
+ goto out;
+
cur_lte->output_resource_entry.size =
res_csize;
cur_lte->resource_entry.flags |
WIM_RESHDR_FLAG_COMPRESSED;
+ FREE(cur_chunk_tab);
+ cur_chunk_tab = NULL;
+
struct list_head *next = cur_lte->staging_list.next;
list_del(&cur_lte->staging_list);
staging_list);
}
+ // Since we just finished writing a stream,
+ // write any streams that have been added to the
+ // my_resources list for direct writing by the
+ // main thread (e.g. resources that don't need
+ // to be compressed because the desired
+ // compression type is the same as the previous
+ // compression type).
struct lookup_table_entry *tmp;
list_for_each_entry_safe(lte,
tmp,
&my_resources,
staging_list)
{
+ if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+ show_stream_write_progress(&cur_size,
+ &next_size,
+ total_size,
+ one_percent,
+ &cur_percent,
+ lte);
+ }
+
ret = write_wim_resource(lte,
out_fp,
out_ctype,
0);
if (ret != 0)
break;
+ if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+ show_stream_write_progress(&cur_size,
+ &next_size,
+ total_size,
+ one_percent,
+ &cur_percent,
+ lte);
+ }
}
+ if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS)
+ finish_stream_write_progress(total_size);
} else {
size_t num_available_msgs = 0;
struct list_head *cur;
}
while (num_available_msgs < ARRAY_LEN(msgs)) {
- msg = shared_queue_get(compressed_res_queue);
+ shared_queue_get(compressed_res_queue);
num_available_msgs++;
}
}
static int write_stream_list_parallel(struct list_head *stream_list,
FILE *out_fp, int out_ctype,
- int write_flags)
+ int write_flags, u64 total_size,
+ unsigned num_threads)
{
int ret;
- long nthreads;
struct shared_queue res_to_compress_queue;
struct shared_queue compressed_res_queue;
+ pthread_t *compressor_threads = NULL;
- nthreads = sysconf(_SC_NPROCESSORS_ONLN);
- if (nthreads < 1) {
- WARNING("Could not determine number of processors! Assuming 1");
- goto out_serial;
+ if (num_threads == 0) {
+ long nthreads = sysconf(_SC_NPROCESSORS_ONLN);
+ if (nthreads < 1) {
+ WARNING("Could not determine number of processors! Assuming 1");
+ goto out_serial;
+ } else {
+ num_threads = nthreads;
+ }
}
wimlib_assert(stream_list->next != stream_list);
- {
- pthread_t compressor_threads[nthreads];
- static const double MESSAGES_PER_THREAD = 2.0;
- size_t queue_size = (size_t)(nthreads * MESSAGES_PER_THREAD);
+ static const double MESSAGES_PER_THREAD = 2.0;
+ size_t queue_size = (size_t)(num_threads * MESSAGES_PER_THREAD);
- DEBUG("Initializing shared queues (queue_size=%zu)", queue_size);
+ DEBUG("Initializing shared queues (queue_size=%zu)", queue_size);
- ret = shared_queue_init(&res_to_compress_queue, queue_size);
- if (ret != 0)
- goto out_serial;
+ ret = shared_queue_init(&res_to_compress_queue, queue_size);
+ if (ret != 0)
+ goto out_serial;
- ret = shared_queue_init(&compressed_res_queue, queue_size);
- if (ret != 0)
- goto out_destroy_res_to_compress_queue;
-
-
- struct compressor_thread_params params;
- params.res_to_compress_queue = &res_to_compress_queue;
- params.compressed_res_queue = &compressed_res_queue;
- params.compress = get_compress_func(out_ctype);
-
- for (long i = 0; i < nthreads; i++) {
- DEBUG("pthread_create thread %ld", i);
- ret = pthread_create(&compressor_threads[i], NULL,
- compressor_thread_proc, &params);
- if (ret != 0) {
- ERROR_WITH_ERRNO("Failed to create compressor "
- "thread %ld", i);
- nthreads = i;
- goto out_join;
- }
+ ret = shared_queue_init(&compressed_res_queue, queue_size);
+ if (ret != 0)
+ goto out_destroy_res_to_compress_queue;
+
+ struct compressor_thread_params params;
+ params.res_to_compress_queue = &res_to_compress_queue;
+ params.compressed_res_queue = &compressed_res_queue;
+ params.compress = get_compress_func(out_ctype);
+
+ compressor_threads = MALLOC(num_threads * sizeof(pthread_t));
+
+ for (unsigned i = 0; i < num_threads; i++) {
+ DEBUG("pthread_create thread %u", i);
+ ret = pthread_create(&compressor_threads[i], NULL,
+ compressor_thread_proc, &params);
+ if (ret != 0) {
+ ret = -1;
+ ERROR_WITH_ERRNO("Failed to create compressor "
+ "thread %u", i);
+ num_threads = i;
+ goto out_join;
}
+ }
- if (write_flags & WIMLIB_WRITE_FLAG_VERBOSE) {
- printf("Writing compressed data using %ld threads...\n",
- nthreads);
- }
+ if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+ printf("Writing compressed data using %u threads...\n",
+ num_threads);
+ }
- ret = main_writer_thread_proc(stream_list,
- out_fp,
- out_ctype,
- &res_to_compress_queue,
- &compressed_res_queue,
- queue_size);
-
- out_join:
- for (long i = 0; i < nthreads; i++)
- shared_queue_put(&res_to_compress_queue, NULL);
-
- for (long i = 0; i < nthreads; i++) {
- if (pthread_join(compressor_threads[i], NULL)) {
- WARNING("Failed to join compressor thread %ld: %s",
- i, strerror(errno));
- }
+ ret = main_writer_thread_proc(stream_list,
+ out_fp,
+ out_ctype,
+ &res_to_compress_queue,
+ &compressed_res_queue,
+ queue_size,
+ write_flags,
+ total_size);
+
+out_join:
+ for (unsigned i = 0; i < num_threads; i++)
+ shared_queue_put(&res_to_compress_queue, NULL);
+
+ for (unsigned i = 0; i < num_threads; i++) {
+ if (pthread_join(compressor_threads[i], NULL)) {
+ WARNING("Failed to join compressor thread %u: %s",
+ i, strerror(errno));
}
}
+ FREE(compressor_threads);
shared_queue_destroy(&compressed_res_queue);
out_destroy_res_to_compress_queue:
shared_queue_destroy(&res_to_compress_queue);
out_serial:
WARNING("Falling back to single-threaded compression");
return write_stream_list_serial(stream_list, out_fp,
- out_ctype, write_flags);
+ out_ctype, write_flags, total_size);
}
+#endif
static int write_stream_list(struct list_head *stream_list, FILE *out_fp,
- int out_ctype, int write_flags)
+ int out_ctype, int write_flags,
+ unsigned num_threads)
{
struct lookup_table_entry *lte;
size_t num_streams = 0;
wimlib_get_compression_type_string(out_ctype));
}
- if (compression_needed && total_size >= 0) { // XXX
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
+ if (compression_needed && total_size >= 1000000 && num_threads != 1) {
return write_stream_list_parallel(stream_list, out_fp,
- out_ctype, write_flags);
- } else {
- if (write_flags & WIMLIB_WRITE_FLAG_VERBOSE) {
- puts("Using 1 thread (no compression needed)");
+ out_ctype, write_flags,
+ total_size, num_threads);
+ }
+ else
+#endif
+ {
+ if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+ const char *reason = "";
+ if (!compression_needed)
+ reason = " (no compression needed)";
+ printf("Writing data using 1 thread%s\n", reason);
}
return write_stream_list_serial(stream_list, out_fp,
- out_ctype, write_flags);
+ out_ctype, write_flags,
+ total_size);
}
}
dentry_find_streams_to_write, w);
}
/*
 * Gathers the streams to be written for @image and writes them to w->out_fp.
 *
 * A local list head is created and its address stored in w->private before
 * for_image() is run with find_streams_to_write; presumably that callback
 * appends the streams to the list through w->private (confirm against
 * find_streams_to_write).  The list is then handed to write_stream_list()
 * together with the WIM's compression type, @write_flags and @num_threads.
 *
 * Returns the value returned by write_stream_list().  The return value of
 * for_image() is not checked here.
 */
-static int write_wim_streams(WIMStruct *w, int image, int write_flags)
+static int write_wim_streams(WIMStruct *w, int image, int write_flags,
+ unsigned num_threads)
{
LIST_HEAD(stream_list);
/* Publish the local list so the for_image() callback can reach it. */
w->private = &stream_list;
for_image(w, image, find_streams_to_write);
return write_stream_list(&stream_list, w->out_fp,
- wimlib_get_compression_type(w), write_flags);
+ wimlib_get_compression_type(w), write_flags,
+ num_threads);
}
/*
/* Writes a stand-alone WIM to a file. */
WIMLIBAPI int wimlib_write(WIMStruct *w, const char *path,
- int image, int write_flags)
+ int image, int write_flags, unsigned num_threads)
{
int ret;
for_lookup_table_entry(w->lookup_table, lte_zero_out_refcnt, NULL);
- ret = write_wim_streams(w, image, write_flags);
+ ret = write_wim_streams(w, image, write_flags, num_threads);
if (ret != 0) {
/*ERROR("Failed to write WIM file resources to `%s'", path);*/
return ret;
}
+ if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS)
+ printf("Writing image metadata...\n");
+
ret = for_image(w, image, write_metadata_resource);
if (ret != 0) {
if (ret != 0)
return ret;
- if (write_flags & WIMLIB_WRITE_FLAG_VERBOSE)
+ if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS)
printf("Successfully wrote `%s'\n", path);
return 0;
}