]> wimlib.net Git - wimlib/blobdiff - src/write.c
--threads option and stream writing progress
[wimlib] / src / write.c
index 56fbab0ea1c08e5b33c7671e01f768c257821962..a84c16b0c91e560ded0b13f6ea95b812899e9e70 100644 (file)
@@ -73,7 +73,8 @@ static int reopen_rw(WIMStruct *w)
 /*
  * Writes a WIM file to the original file that it was read from, overwriting it.
  */
 /*
  * Writes a WIM file to the original file that it was read from, overwriting it.
  */
-WIMLIBAPI int wimlib_overwrite(WIMStruct *w, int write_flags)
+WIMLIBAPI int wimlib_overwrite(WIMStruct *w, int write_flags,
+                              unsigned num_threads)
 {
        const char *wimfile_name;
        size_t wim_name_len;
 {
        const char *wimfile_name;
        size_t wim_name_len;
@@ -99,7 +100,8 @@ WIMLIBAPI int wimlib_overwrite(WIMStruct *w, int write_flags)
        randomize_char_array_with_alnum(tmpfile + wim_name_len, 9);
        tmpfile[wim_name_len + 9] = '\0';
 
        randomize_char_array_with_alnum(tmpfile + wim_name_len, 9);
        tmpfile[wim_name_len + 9] = '\0';
 
-       ret = wimlib_write(w, tmpfile, WIM_ALL_IMAGES, write_flags);
+       ret = wimlib_write(w, tmpfile, WIM_ALL_IMAGES, write_flags,
+                          num_threads);
        if (ret != 0) {
                ERROR("Failed to write the WIM file `%s'", tmpfile);
                if (unlink(tmpfile) != 0)
        if (ret != 0) {
                ERROR("Failed to write the WIM file `%s'", tmpfile);
                if (unlink(tmpfile) != 0)
@@ -902,19 +904,56 @@ static void *compressor_thread_proc(void *arg)
        DEBUG("Compressor thread terminating");
 }
 
        DEBUG("Compressor thread terminating");
 }
 
+static void show_stream_write_progress(u64 *cur_size, u64 *next_size,
+                                      u64 total_size, u64 one_percent,
+                                      unsigned *cur_percent,
+                                      const struct lookup_table_entry *cur_lte)
+{
+       if (*cur_size >= *next_size) {
+               printf("\r%"PRIu64" MiB of %"PRIu64" MiB "
+                      "(uncompressed) written (%u%% done)",
+                      *cur_size >> 20,
+                      total_size >> 20, *cur_percent);
+               fflush(stdout);
+               *next_size += one_percent;
+               (*cur_percent)++;
+       }
+       *cur_size += wim_resource_size(cur_lte);
+}
+
+static void finish_stream_write_progress(u64 total_size)
+{
+       printf("\r%"PRIu64" MiB of %"PRIu64" MiB "
+              "(uncompressed) written (100%% done)\n",
+              total_size >> 20, total_size >> 20);
+       fflush(stdout);
+}
+
 static int write_stream_list_serial(struct list_head *stream_list,
                                    FILE *out_fp, int out_ctype,
 static int write_stream_list_serial(struct list_head *stream_list,
                                    FILE *out_fp, int out_ctype,
-                                   int write_flags)
+                                   int write_flags, u64 total_size)
 {
        struct lookup_table_entry *lte;
        int ret;
 
 {
        struct lookup_table_entry *lte;
        int ret;
 
+       u64 one_percent = total_size / 100;
+       u64 cur_size = 0;
+       u64 next_size = 0;
+       unsigned cur_percent = 0;
+
        list_for_each_entry(lte, stream_list, staging_list) {
        list_for_each_entry(lte, stream_list, staging_list) {
+               if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+                       show_stream_write_progress(&cur_size, &next_size,
+                                                  total_size, one_percent,
+                                                  &cur_percent, lte);
+               }
                ret = write_wim_resource(lte, out_fp, out_ctype,
                                         &lte->output_resource_entry, 0);
                if (ret != 0)
                        return ret;
        }
                ret = write_wim_resource(lte, out_fp, out_ctype,
                                         &lte->output_resource_entry, 0);
                if (ret != 0)
                        return ret;
        }
+       if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS)
+               finish_stream_write_progress(total_size);
        return 0;
 }
 
        return 0;
 }
 
@@ -940,6 +979,7 @@ static int write_wim_chunks(struct message *msg, FILE *out_fp,
        return 0;
 }
 
        return 0;
 }
 
+
 /*
  * This function is executed by the main thread when the resources are being
  * compressed in parallel.  The main thread is in change of all reading of the
 /*
  * This function is executed by the main thread when the resources are being
  * compressed in parallel.  The main thread is in change of all reading of the
@@ -957,7 +997,9 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                                   int out_ctype,
                                   struct shared_queue *res_to_compress_queue,
                                   struct shared_queue *compressed_res_queue,
                                   int out_ctype,
                                   struct shared_queue *res_to_compress_queue,
                                   struct shared_queue *compressed_res_queue,
-                                  size_t queue_size)
+                                  size_t queue_size,
+                                  int write_flags,
+                                  u64 total_size)
 {
        int ret;
 
 {
        int ret;
 
@@ -1010,6 +1052,11 @@ static int main_writer_thread_proc(struct list_head *stream_list,
        struct lookup_table_entry *lte;
        struct message *msg;
 
        struct lookup_table_entry *lte;
        struct message *msg;
 
+       u64 one_percent = total_size / 100;
+       u64 cur_size = 0;
+       u64 next_size = 0;
+       unsigned cur_percent = 0;
+
 #ifdef WITH_NTFS_3G
        ntfs_inode *ni = NULL;
 #endif
 #ifdef WITH_NTFS_3G
        ntfs_inode *ni = NULL;
 #endif
@@ -1206,6 +1253,18 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                        DEBUG2("Complete msg (begin_chunk=%"PRIu64")", msg->begin_chunk);
                        if (msg->begin_chunk == 0) {
                                DEBUG2("Begin chunk tab");
                        DEBUG2("Complete msg (begin_chunk=%"PRIu64")", msg->begin_chunk);
                        if (msg->begin_chunk == 0) {
                                DEBUG2("Begin chunk tab");
+
+
+
+                               if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+                                       show_stream_write_progress(&cur_size,
+                                                                  &next_size,
+                                                                  total_size,
+                                                                  one_percent,
+                                                                  &cur_percent,
+                                                                  cur_lte);
+                               }
+
                                // This is the first set of chunks.  Leave space
                                // for the chunk table in the output file.
                                off_t cur_offset = ftello(out_fp);
                                // This is the first set of chunks.  Leave space
                                // for the chunk table in the output file.
                                off_t cur_offset = ftello(out_fp);
@@ -1288,6 +1347,15 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                                                         &my_resources,
                                                         staging_list)
                                {
                                                         &my_resources,
                                                         staging_list)
                                {
+                                       if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+                                               show_stream_write_progress(&cur_size,
+                                                                          &next_size,
+                                                                          total_size,
+                                                                          one_percent,
+                                                                          &cur_percent,
+                                                                          lte);
+                                       }
+
                                        ret = write_wim_resource(lte,
                                                                 out_fp,
                                                                 out_ctype,
                                        ret = write_wim_resource(lte,
                                                                 out_fp,
                                                                 out_ctype,
@@ -1315,7 +1383,17 @@ out:
                                                 0);
                        if (ret != 0)
                                break;
                                                 0);
                        if (ret != 0)
                                break;
+                       if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+                               show_stream_write_progress(&cur_size,
+                                                          &next_size,
+                                                          total_size,
+                                                          one_percent,
+                                                          &cur_percent,
+                                                          lte);
+                       }
                }
                }
+               if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS)
+                       finish_stream_write_progress(total_size);
        } else {
                size_t num_available_msgs = 0;
                struct list_head *cur;
        } else {
                size_t num_available_msgs = 0;
                struct list_head *cur;
@@ -1346,26 +1424,30 @@ out:
 
 static int write_stream_list_parallel(struct list_head *stream_list,
                                      FILE *out_fp, int out_ctype,
 
 static int write_stream_list_parallel(struct list_head *stream_list,
                                      FILE *out_fp, int out_ctype,
-                                     int write_flags)
+                                     int write_flags, u64 total_size,
+                                     unsigned num_threads)
 {
        int ret;
 {
        int ret;
-       long nthreads;
        struct shared_queue res_to_compress_queue;
        struct shared_queue compressed_res_queue;
 
        struct shared_queue res_to_compress_queue;
        struct shared_queue compressed_res_queue;
 
-       nthreads = sysconf(_SC_NPROCESSORS_ONLN);
-       if (nthreads < 1) {
-               WARNING("Could not determine number of processors! Assuming 1");
-               goto out_serial;
+       if (num_threads == 0) {
+               long nthreads = sysconf(_SC_NPROCESSORS_ONLN);
+               if (nthreads < 1) {
+                       WARNING("Could not determine number of processors! Assuming 1");
+                       goto out_serial;
+               } else {
+                       num_threads = nthreads;
+               }
        }
 
        wimlib_assert(stream_list->next != stream_list);
 
        {
        }
 
        wimlib_assert(stream_list->next != stream_list);
 
        {
-               pthread_t compressor_threads[nthreads];
+               pthread_t compressor_threads[num_threads];
 
                static const double MESSAGES_PER_THREAD = 2.0;
 
                static const double MESSAGES_PER_THREAD = 2.0;
-               size_t queue_size = (size_t)(nthreads * MESSAGES_PER_THREAD);
+               size_t queue_size = (size_t)(num_threads * MESSAGES_PER_THREAD);
 
                DEBUG("Initializing shared queues (queue_size=%zu)", queue_size);
 
 
                DEBUG("Initializing shared queues (queue_size=%zu)", queue_size);
 
@@ -1382,21 +1464,21 @@ static int write_stream_list_parallel(struct list_head *stream_list,
                params.compressed_res_queue = &compressed_res_queue;
                params.compress = get_compress_func(out_ctype);
 
                params.compressed_res_queue = &compressed_res_queue;
                params.compress = get_compress_func(out_ctype);
 
-               for (long i = 0; i < nthreads; i++) {
-                       DEBUG("pthread_create thread %ld", i);
+               for (unsigned i = 0; i < num_threads; i++) {
+                       DEBUG("pthread_create thread %u", i);
                        ret = pthread_create(&compressor_threads[i], NULL,
                                             compressor_thread_proc, &params);
                        if (ret != 0) {
                                ERROR_WITH_ERRNO("Failed to create compressor "
                        ret = pthread_create(&compressor_threads[i], NULL,
                                             compressor_thread_proc, &params);
                        if (ret != 0) {
                                ERROR_WITH_ERRNO("Failed to create compressor "
-                                                "thread %ld", i);
-                               nthreads = i;
+                                                "thread %u", i);
+                               num_threads = i;
                                goto out_join;
                        }
                }
 
                                goto out_join;
                        }
                }
 
-               if (write_flags & WIMLIB_WRITE_FLAG_VERBOSE) {
-                       printf("Writing compressed data using %ld threads...\n",
-                              nthreads);
+               if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+                       printf("Writing compressed data using %u threads...\n",
+                              num_threads);
                }
 
                ret = main_writer_thread_proc(stream_list,
                }
 
                ret = main_writer_thread_proc(stream_list,
@@ -1404,15 +1486,17 @@ static int write_stream_list_parallel(struct list_head *stream_list,
                                              out_ctype,
                                              &res_to_compress_queue,
                                              &compressed_res_queue,
                                              out_ctype,
                                              &res_to_compress_queue,
                                              &compressed_res_queue,
-                                             queue_size);
+                                             queue_size,
+                                             write_flags,
+                                             total_size);
 
        out_join:
 
        out_join:
-               for (long i = 0; i < nthreads; i++)
+               for (unsigned i = 0; i < num_threads; i++)
                        shared_queue_put(&res_to_compress_queue, NULL);
 
                        shared_queue_put(&res_to_compress_queue, NULL);
 
-               for (long i = 0; i < nthreads; i++) {
+               for (unsigned i = 0; i < num_threads; i++) {
                        if (pthread_join(compressor_threads[i], NULL)) {
                        if (pthread_join(compressor_threads[i], NULL)) {
-                               WARNING("Failed to join compressor thread %ld: %s",
+                               WARNING("Failed to join compressor thread %u: %s",
                                        i, strerror(errno));
                        }
                }
                                        i, strerror(errno));
                        }
                }
@@ -1425,11 +1509,12 @@ out_destroy_res_to_compress_queue:
 out_serial:
        WARNING("Falling back to single-threaded compression");
        return write_stream_list_serial(stream_list, out_fp,
 out_serial:
        WARNING("Falling back to single-threaded compression");
        return write_stream_list_serial(stream_list, out_fp,
-                                       out_ctype, write_flags);
+                                       out_ctype, write_flags, total_size);
 }
 
 static int write_stream_list(struct list_head *stream_list, FILE *out_fp,
 }
 
 static int write_stream_list(struct list_head *stream_list, FILE *out_fp,
-                            int out_ctype, int write_flags)
+                            int out_ctype, int write_flags,
+                            unsigned num_threads)
 {
        struct lookup_table_entry *lte;
        size_t num_streams = 0;
 {
        struct lookup_table_entry *lte;
        size_t num_streams = 0;
@@ -1455,16 +1540,21 @@ static int write_stream_list(struct list_head *stream_list, FILE *out_fp,
                       wimlib_get_compression_type_string(out_ctype));
        }
 
                       wimlib_get_compression_type_string(out_ctype));
        }
 
-       if (compression_needed && total_size >= 1000000) {
+       if (compression_needed && total_size >= 1000000 && num_threads != 1) {
                return write_stream_list_parallel(stream_list, out_fp,
                return write_stream_list_parallel(stream_list, out_fp,
-                                                 out_ctype, write_flags);
+                                                 out_ctype, write_flags,
+                                                 total_size, num_threads);
        } else {
        } else {
-               if (write_flags & WIMLIB_WRITE_FLAG_VERBOSE) {
-                       puts("Using 1 thread (no compression needed)");
+               if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+                       const char *reason = "";
+                       if (num_threads != 1)
+                               reason = " (no compression needed)";
+                       printf("Writing data using 1 thread%s\n", reason);
                }
 
                return write_stream_list_serial(stream_list, out_fp,
                }
 
                return write_stream_list_serial(stream_list, out_fp,
-                                               out_ctype, write_flags);
+                                               out_ctype, write_flags,
+                                               total_size);
        }
 }
 
        }
 }
 
@@ -1489,7 +1579,8 @@ static int find_streams_to_write(WIMStruct *w)
                                  dentry_find_streams_to_write, w);
 }
 
                                  dentry_find_streams_to_write, w);
 }
 
-static int write_wim_streams(WIMStruct *w, int image, int write_flags)
+static int write_wim_streams(WIMStruct *w, int image, int write_flags,
+                            unsigned num_threads)
 {
 
        LIST_HEAD(stream_list);
 {
 
        LIST_HEAD(stream_list);
@@ -1497,7 +1588,8 @@ static int write_wim_streams(WIMStruct *w, int image, int write_flags)
        w->private = &stream_list;
        for_image(w, image, find_streams_to_write);
        return write_stream_list(&stream_list, w->out_fp,
        w->private = &stream_list;
        for_image(w, image, find_streams_to_write);
        return write_stream_list(&stream_list, w->out_fp,
-                                wimlib_get_compression_type(w), write_flags);
+                                wimlib_get_compression_type(w), write_flags,
+                                num_threads);
 }
 
 /*
 }
 
 /*
@@ -1662,7 +1754,7 @@ int begin_write(WIMStruct *w, const char *path, int write_flags)
 
 /* Writes a stand-alone WIM to a file.  */
 WIMLIBAPI int wimlib_write(WIMStruct *w, const char *path,
 
 /* Writes a stand-alone WIM to a file.  */
 WIMLIBAPI int wimlib_write(WIMStruct *w, const char *path,
-                          int image, int write_flags)
+                          int image, int write_flags, unsigned num_threads)
 {
        int ret;
 
 {
        int ret;
 
@@ -1692,7 +1784,7 @@ WIMLIBAPI int wimlib_write(WIMStruct *w, const char *path,
 
        for_lookup_table_entry(w->lookup_table, lte_zero_out_refcnt, NULL);
 
 
        for_lookup_table_entry(w->lookup_table, lte_zero_out_refcnt, NULL);
 
-       ret = write_wim_streams(w, image, write_flags);
+       ret = write_wim_streams(w, image, write_flags, num_threads);
 
        if (ret != 0) {
                /*ERROR("Failed to write WIM file resources to `%s'", path);*/
 
        if (ret != 0) {
                /*ERROR("Failed to write WIM file resources to `%s'", path);*/