]> wimlib.net Git - wimlib/blobdiff - src/write.c
Set WIM_RESHDR_FLAG_METADATA on XML data
[wimlib] / src / write.c
index 56fbab0ea1c08e5b33c7671e01f768c257821962..bc3d93c2a0681a49e9a13ebb9cec34175895b33b 100644 (file)
 #include "lzx.h"
 #include "xpress.h"
 #include <unistd.h>
+
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
 #include <semaphore.h>
 #include <pthread.h>
 #include <errno.h>
+#endif
 
 #ifdef WITH_NTFS_3G
 #include <time.h>
 
 #ifdef HAVE_ALLOCA_H
 #include <alloca.h>
+#else
+#include <stdlib.h>
 #endif
 
-
 /* Reopens the FILE* for a WIM read-write. */
 static int reopen_rw(WIMStruct *w)
 {
@@ -73,9 +77,9 @@ static int reopen_rw(WIMStruct *w)
 /*
  * Writes a WIM file to the original file that it was read from, overwriting it.
  */
-WIMLIBAPI int wimlib_overwrite(WIMStruct *w, int write_flags)
+WIMLIBAPI int wimlib_overwrite(WIMStruct *w, int write_flags,
+                              unsigned num_threads)
 {
-       const char *wimfile_name;
        size_t wim_name_len;
        int ret;
 
@@ -83,23 +87,22 @@ WIMLIBAPI int wimlib_overwrite(WIMStruct *w, int write_flags)
                return WIMLIB_ERR_INVALID_PARAM;
 
        write_flags &= ~WIMLIB_WRITE_FLAG_NO_LOOKUP_TABLE;
-
-       wimfile_name = w->filename;
-
-       DEBUG("Replacing WIM file `%s'.", wimfile_name);
-
-       if (!wimfile_name)
+       if (!w->filename)
                return WIMLIB_ERR_NO_FILENAME;
 
+       DEBUG("Replacing WIM file `%s'.", w->filename);
+
        /* Write the WIM to a temporary file. */
        /* XXX should the temporary file be somewhere else? */
-       wim_name_len = strlen(wimfile_name);
+       wim_name_len = strlen(w->filename);
        char tmpfile[wim_name_len + 10];
-       memcpy(tmpfile, wimfile_name, wim_name_len);
+       memcpy(tmpfile, w->filename, wim_name_len);
        randomize_char_array_with_alnum(tmpfile + wim_name_len, 9);
        tmpfile[wim_name_len + 9] = '\0';
 
-       ret = wimlib_write(w, tmpfile, WIM_ALL_IMAGES, write_flags);
+       ret = wimlib_write(w, tmpfile, WIM_ALL_IMAGES,
+                          write_flags | WIMLIB_WRITE_FLAG_FSYNC,
+                          num_threads);
        if (ret != 0) {
                ERROR("Failed to write the WIM file `%s'", tmpfile);
                if (unlink(tmpfile) != 0)
@@ -111,27 +114,30 @@ WIMLIBAPI int wimlib_overwrite(WIMStruct *w, int write_flags)
        /* Close the original WIM file that was opened for reading. */
        if (w->fp) {
                if (fclose(w->fp) != 0) {
-                       WARNING("Failed to close the file `%s'", wimfile_name);
+                       WARNING("Failed to close the file `%s'", w->filename);
                }
                w->fp = NULL;
        }
 
-       DEBUG("Renaming `%s' to `%s'", tmpfile, wimfile_name);
+       DEBUG("Renaming `%s' to `%s'", tmpfile, w->filename);
 
        /* Rename the new file to the old file .*/
-       if (rename(tmpfile, wimfile_name) != 0) {
+       if (rename(tmpfile, w->filename) != 0) {
                ERROR_WITH_ERRNO("Failed to rename `%s' to `%s'",
-                                tmpfile, wimfile_name);
-               /* Remove temporary file. */
-               if (unlink(tmpfile) != 0)
-                       ERROR_WITH_ERRNO("Failed to remove `%s'", tmpfile);
-               return WIMLIB_ERR_RENAME;
+                                tmpfile, w->filename);
+               ret = WIMLIB_ERR_RENAME;
+               goto err;
        }
 
-       if (write_flags & WIMLIB_WRITE_FLAG_VERBOSE)
-               printf("Successfully renamed `%s' to `%s'\n", tmpfile, wimfile_name);
+       if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS)
+               printf("Successfully renamed `%s' to `%s'\n", tmpfile, w->filename);
 
        return 0;
+err:
+       /* Remove temporary file. */
+       if (unlink(tmpfile) != 0)
+               ERROR_WITH_ERRNO("Failed to remove `%s'", tmpfile);
+       return ret;
 }
 
 static int check_resource_offset(struct lookup_table_entry *lte, void *arg)
@@ -782,6 +788,7 @@ out:
 }
 
 
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
 struct shared_queue {
        sem_t filled_slots;
        sem_t empty_slots;
@@ -901,23 +908,62 @@ static void *compressor_thread_proc(void *arg)
        }
        DEBUG("Compressor thread terminating");
 }
+#endif
+
+static void show_stream_write_progress(u64 *cur_size, u64 *next_size,
+                                      u64 total_size, u64 one_percent,
+                                      unsigned *cur_percent,
+                                      const struct lookup_table_entry *cur_lte)
+{
+       if (*cur_size >= *next_size) {
+               printf("\r%"PRIu64" MiB of %"PRIu64" MiB "
+                      "(uncompressed) written (%u%% done)",
+                      *cur_size >> 20,
+                      total_size >> 20, *cur_percent);
+               fflush(stdout);
+               *next_size += one_percent;
+               (*cur_percent)++;
+       }
+       *cur_size += wim_resource_size(cur_lte);
+}
+
+static void finish_stream_write_progress(u64 total_size)
+{
+       printf("\r%"PRIu64" MiB of %"PRIu64" MiB "
+              "(uncompressed) written (100%% done)\n",
+              total_size >> 20, total_size >> 20);
+       fflush(stdout);
+}
 
 static int write_stream_list_serial(struct list_head *stream_list,
                                    FILE *out_fp, int out_ctype,
-                                   int write_flags)
+                                   int write_flags, u64 total_size)
 {
        struct lookup_table_entry *lte;
        int ret;
 
+       u64 one_percent = total_size / 100;
+       u64 cur_size = 0;
+       u64 next_size = 0;
+       unsigned cur_percent = 0;
+
        list_for_each_entry(lte, stream_list, staging_list) {
+               if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+                       show_stream_write_progress(&cur_size, &next_size,
+                                                  total_size, one_percent,
+                                                  &cur_percent, lte);
+               }
                ret = write_wim_resource(lte, out_fp, out_ctype,
                                         &lte->output_resource_entry, 0);
                if (ret != 0)
                        return ret;
        }
+       if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS)
+               finish_stream_write_progress(total_size);
        return 0;
 }
 
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
 static int write_wim_chunks(struct message *msg, FILE *out_fp,
                            struct chunk_table *chunk_tab)
 {
@@ -957,7 +1003,9 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                                   int out_ctype,
                                   struct shared_queue *res_to_compress_queue,
                                   struct shared_queue *compressed_res_queue,
-                                  size_t queue_size)
+                                  size_t queue_size,
+                                  int write_flags,
+                                  u64 total_size)
 {
        int ret;
 
@@ -1010,6 +1058,11 @@ static int main_writer_thread_proc(struct list_head *stream_list,
        struct lookup_table_entry *lte;
        struct message *msg;
 
+       u64 one_percent = total_size / 100;
+       u64 cur_size = 0;
+       u64 next_size = 0;
+       unsigned cur_percent = 0;
+
 #ifdef WITH_NTFS_3G
        ntfs_inode *ni = NULL;
 #endif
@@ -1019,6 +1072,8 @@ static int main_writer_thread_proc(struct list_head *stream_list,
 #else
        ret = prepare_resource_for_read(next_lte);
 #endif
+       if (ret != 0)
+               goto out;
 
        DEBUG("Initializing buffers for uncompressed "
              "and compressed data (%zu bytes needed)",
@@ -1041,6 +1096,9 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                }
        }
 
+       // This loop is executed until all resources have been written, except
+       // possibly a few that have been added to the @my_resources list for
+       // writing later.
        while (1) {
                // Send chunks to the compressor threads until either (a) there
                // are no more messages available since they were all sent off,
@@ -1206,6 +1264,15 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                        DEBUG2("Complete msg (begin_chunk=%"PRIu64")", msg->begin_chunk);
                        if (msg->begin_chunk == 0) {
                                DEBUG2("Begin chunk tab");
+                               if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+                                       show_stream_write_progress(&cur_size,
+                                                                  &next_size,
+                                                                  total_size,
+                                                                  one_percent,
+                                                                  &cur_percent,
+                                                                  cur_lte);
+                               }
+
                                // This is the first set of chunks.  Leave space
                                // for the chunk table in the output file.
                                off_t cur_offset = ftello(out_fp);
@@ -1288,6 +1355,15 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                                                         &my_resources,
                                                         staging_list)
                                {
+                                       if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+                                               show_stream_write_progress(&cur_size,
+                                                                          &next_size,
+                                                                          total_size,
+                                                                          one_percent,
+                                                                          &cur_percent,
+                                                                          lte);
+                                       }
+
                                        ret = write_wim_resource(lte,
                                                                 out_fp,
                                                                 out_ctype,
@@ -1309,6 +1385,14 @@ out:
 #endif
        if (ret == 0) {
                list_for_each_entry(lte, &my_resources, staging_list) {
+                       if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+                               show_stream_write_progress(&cur_size,
+                                                          &next_size,
+                                                          total_size,
+                                                          one_percent,
+                                                          &cur_percent,
+                                                          lte);
+                       }
                        ret = write_wim_resource(lte, out_fp,
                                                 out_ctype,
                                                 &lte->output_resource_entry,
@@ -1316,6 +1400,8 @@ out:
                        if (ret != 0)
                                break;
                }
+               if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS)
+                       finish_stream_write_progress(total_size);
        } else {
                size_t num_available_msgs = 0;
                struct list_head *cur;
@@ -1344,79 +1430,100 @@ out:
        return ret;
 }
 
+
+static const char *get_data_type(int ctype)
+{
+       switch (ctype) {
+       case WIM_COMPRESSION_TYPE_NONE:
+               return "uncompressed";
+       case WIM_COMPRESSION_TYPE_LZX:
+               return "LZX-compressed";
+       case WIM_COMPRESSION_TYPE_XPRESS:
+               return "XPRESS-compressed";
+       }
+}
+
 static int write_stream_list_parallel(struct list_head *stream_list,
                                      FILE *out_fp, int out_ctype,
-                                     int write_flags)
+                                     int write_flags, u64 total_size,
+                                     unsigned num_threads)
 {
        int ret;
-       long nthreads;
        struct shared_queue res_to_compress_queue;
        struct shared_queue compressed_res_queue;
+       pthread_t *compressor_threads = NULL;
 
-       nthreads = sysconf(_SC_NPROCESSORS_ONLN);
-       if (nthreads < 1) {
-               WARNING("Could not determine number of processors! Assuming 1");
-               goto out_serial;
+       if (num_threads == 0) {
+               long nthreads = sysconf(_SC_NPROCESSORS_ONLN);
+               if (nthreads < 1) {
+                       WARNING("Could not determine number of processors! Assuming 1");
+                       goto out_serial;
+               } else {
+                       num_threads = nthreads;
+               }
        }
 
        wimlib_assert(stream_list->next != stream_list);
 
-       {
-               pthread_t compressor_threads[nthreads];
 
-               static const double MESSAGES_PER_THREAD = 2.0;
-               size_t queue_size = (size_t)(nthreads * MESSAGES_PER_THREAD);
+       static const double MESSAGES_PER_THREAD = 2.0;
+       size_t queue_size = (size_t)(num_threads * MESSAGES_PER_THREAD);
 
-               DEBUG("Initializing shared queues (queue_size=%zu)", queue_size);
+       DEBUG("Initializing shared queues (queue_size=%zu)", queue_size);
 
-               ret = shared_queue_init(&res_to_compress_queue, queue_size);
-               if (ret != 0)
-                       goto out_serial;
+       ret = shared_queue_init(&res_to_compress_queue, queue_size);
+       if (ret != 0)
+               goto out_serial;
 
-               ret = shared_queue_init(&compressed_res_queue, queue_size);
-               if (ret != 0)
-                       goto out_destroy_res_to_compress_queue;
-
-               struct compressor_thread_params params;
-               params.res_to_compress_queue = &res_to_compress_queue;
-               params.compressed_res_queue = &compressed_res_queue;
-               params.compress = get_compress_func(out_ctype);
-
-               for (long i = 0; i < nthreads; i++) {
-                       DEBUG("pthread_create thread %ld", i);
-                       ret = pthread_create(&compressor_threads[i], NULL,
-                                            compressor_thread_proc, &params);
-                       if (ret != 0) {
-                               ERROR_WITH_ERRNO("Failed to create compressor "
-                                                "thread %ld", i);
-                               nthreads = i;
-                               goto out_join;
-                       }
+       ret = shared_queue_init(&compressed_res_queue, queue_size);
+       if (ret != 0)
+               goto out_destroy_res_to_compress_queue;
+
+       struct compressor_thread_params params;
+       params.res_to_compress_queue = &res_to_compress_queue;
+       params.compressed_res_queue = &compressed_res_queue;
+       params.compress = get_compress_func(out_ctype);
+
+       compressor_threads = MALLOC(num_threads * sizeof(pthread_t));
+
+       for (unsigned i = 0; i < num_threads; i++) {
+               DEBUG("pthread_create thread %u", i);
+               ret = pthread_create(&compressor_threads[i], NULL,
+                                    compressor_thread_proc, &params);
+               if (ret != 0) {
+                       ret = -1;
+                       ERROR_WITH_ERRNO("Failed to create compressor "
+                                        "thread %u", i);
+                       num_threads = i;
+                       goto out_join;
                }
+       }
 
-               if (write_flags & WIMLIB_WRITE_FLAG_VERBOSE) {
-                       printf("Writing compressed data using %ld threads...\n",
-                              nthreads);
-               }
+       if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+               printf("Writing %s compressed data using %u threads...\n",
+                      get_data_type(out_ctype), num_threads);
+       }
 
-               ret = main_writer_thread_proc(stream_list,
-                                             out_fp,
-                                             out_ctype,
-                                             &res_to_compress_queue,
-                                             &compressed_res_queue,
-                                             queue_size);
-
-       out_join:
-               for (long i = 0; i < nthreads; i++)
-                       shared_queue_put(&res_to_compress_queue, NULL);
-
-               for (long i = 0; i < nthreads; i++) {
-                       if (pthread_join(compressor_threads[i], NULL)) {
-                               WARNING("Failed to join compressor thread %ld: %s",
-                                       i, strerror(errno));
-                       }
+       ret = main_writer_thread_proc(stream_list,
+                                     out_fp,
+                                     out_ctype,
+                                     &res_to_compress_queue,
+                                     &compressed_res_queue,
+                                     queue_size,
+                                     write_flags,
+                                     total_size);
+
+out_join:
+       for (unsigned i = 0; i < num_threads; i++)
+               shared_queue_put(&res_to_compress_queue, NULL);
+
+       for (unsigned i = 0; i < num_threads; i++) {
+               if (pthread_join(compressor_threads[i], NULL)) {
+                       WARNING("Failed to join compressor thread %u: %s",
+                               i, strerror(errno));
                }
        }
+       FREE(compressor_threads);
        shared_queue_destroy(&compressed_res_queue);
 out_destroy_res_to_compress_queue:
        shared_queue_destroy(&res_to_compress_queue);
@@ -1425,11 +1532,13 @@ out_destroy_res_to_compress_queue:
 out_serial:
        WARNING("Falling back to single-threaded compression");
        return write_stream_list_serial(stream_list, out_fp,
-                                       out_ctype, write_flags);
+                                       out_ctype, write_flags, total_size);
 }
+#endif
 
 static int write_stream_list(struct list_head *stream_list, FILE *out_fp,
-                            int out_ctype, int write_flags)
+                            int out_ctype, int write_flags,
+                            unsigned num_threads)
 {
        struct lookup_table_entry *lte;
        size_t num_streams = 0;
@@ -1455,16 +1564,26 @@ static int write_stream_list(struct list_head *stream_list, FILE *out_fp,
                       wimlib_get_compression_type_string(out_ctype));
        }
 
-       if (compression_needed && total_size >= 1000000) {
+#ifdef ENABLE_MULTITHREADED_COMPRESSION
+       if (compression_needed && total_size >= 1000000 && num_threads != 1) {
                return write_stream_list_parallel(stream_list, out_fp,
-                                                 out_ctype, write_flags);
-       } else {
-               if (write_flags & WIMLIB_WRITE_FLAG_VERBOSE) {
-                       puts("Using 1 thread (no compression needed)");
+                                                 out_ctype, write_flags,
+                                                 total_size, num_threads);
+       }
+       else
+#endif
+       {
+               if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) {
+                       const char *reason = "";
+                       if (!compression_needed)
+                               reason = " (no compression needed)";
+                       printf("Writing %s data using 1 thread%s\n",
+                              get_data_type(out_ctype), reason);
                }
 
                return write_stream_list_serial(stream_list, out_fp,
-                                               out_ctype, write_flags);
+                                               out_ctype, write_flags,
+                                               total_size);
        }
 }
 
@@ -1489,7 +1608,8 @@ static int find_streams_to_write(WIMStruct *w)
                                  dentry_find_streams_to_write, w);
 }
 
-static int write_wim_streams(WIMStruct *w, int image, int write_flags)
+static int write_wim_streams(WIMStruct *w, int image, int write_flags,
+                            unsigned num_threads)
 {
 
        LIST_HEAD(stream_list);
@@ -1497,7 +1617,8 @@ static int write_wim_streams(WIMStruct *w, int image, int write_flags)
        w->private = &stream_list;
        for_image(w, image, find_streams_to_write);
        return write_stream_list(&stream_list, w->out_fp,
-                                wimlib_get_compression_type(w), write_flags);
+                                wimlib_get_compression_type(w), write_flags,
+                                num_threads);
 }
 
 /*
@@ -1559,10 +1680,10 @@ int finish_write(WIMStruct *w, int image, int write_flags)
                return WIMLIB_ERR_WRITE;
        xml_data_size = integrity_offset - xml_data_offset;
 
-       hdr.xml_res_entry.offset                 = xml_data_offset;
-       hdr.xml_res_entry.size                   = xml_data_size;
-       hdr.xml_res_entry.original_size          = xml_data_size;
-       hdr.xml_res_entry.flags                  = 0;
+       hdr.xml_res_entry.offset        = xml_data_offset;
+       hdr.xml_res_entry.size          = xml_data_size;
+       hdr.xml_res_entry.original_size = xml_data_size;
+       hdr.xml_res_entry.flags         = WIM_RESHDR_FLAG_METADATA;
 
        if (write_flags & WIMLIB_WRITE_FLAG_CHECK_INTEGRITY) {
                ret = write_integrity_table(out, WIM_HEADER_DISK_SIZE,
@@ -1621,9 +1742,19 @@ int finish_write(WIMStruct *w, int image, int write_flags)
        if (ret != 0)
                return ret;
 
-       DEBUG("Closing output file.");
-       wimlib_assert(w->out_fp != NULL);
-       if (fclose(w->out_fp) != 0) {
+       if (write_flags & WIMLIB_WRITE_FLAG_FSYNC) {
+               DEBUG("fsync output WIM file");
+               if (fflush(out) != 0
+                   || fsync(fileno(out)) != 0)
+               {
+                       ERROR_WITH_ERRNO("Error flushing data to WIM file");
+                       ret = WIMLIB_ERR_WRITE;
+               }
+       }
+
+       DEBUG("Closing output WIM file.");
+
+       if (fclose(out) != 0) {
                ERROR_WITH_ERRNO("Failed to close the WIM file");
                ret = WIMLIB_ERR_WRITE;
        }
@@ -1662,7 +1793,7 @@ int begin_write(WIMStruct *w, const char *path, int write_flags)
 
 /* Writes a stand-alone WIM to a file.  */
 WIMLIBAPI int wimlib_write(WIMStruct *w, const char *path,
-                          int image, int write_flags)
+                          int image, int write_flags, unsigned num_threads)
 {
        int ret;
 
@@ -1676,6 +1807,7 @@ WIMLIBAPI int wimlib_write(WIMStruct *w, const char *path,
                return WIMLIB_ERR_INVALID_IMAGE;
 
 
+
        if (w->hdr.total_parts != 1) {
                ERROR("Cannot call wimlib_write() on part of a split WIM");
                return WIMLIB_ERR_SPLIT_UNSUPPORTED;
@@ -1692,13 +1824,16 @@ WIMLIBAPI int wimlib_write(WIMStruct *w, const char *path,
 
        for_lookup_table_entry(w->lookup_table, lte_zero_out_refcnt, NULL);
 
-       ret = write_wim_streams(w, image, write_flags);
+       ret = write_wim_streams(w, image, write_flags, num_threads);
 
        if (ret != 0) {
                /*ERROR("Failed to write WIM file resources to `%s'", path);*/
                return ret;
        }
 
+       if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS)
+               printf("Writing image metadata...\n");
+
        ret = for_image(w, image, write_metadata_resource);
 
        if (ret != 0) {
@@ -1710,7 +1845,7 @@ WIMLIBAPI int wimlib_write(WIMStruct *w, const char *path,
        if (ret != 0)
                return ret;
 
-       if (write_flags & WIMLIB_WRITE_FLAG_VERBOSE)
+       if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS)
                printf("Successfully wrote `%s'\n", path);
        return 0;
 }