X-Git-Url: https://wimlib.net/git/?a=blobdiff_plain;f=src%2Fwrite.c;h=668c7abd03c0c21353aa2ec65cd4c6adb1a4e40d;hb=c7757ea1100cc329ea78ae16abad9f5338151b2b;hp=a84c16b0c91e560ded0b13f6ea95b812899e9e70;hpb=213d64619675a59db71fed619bf47c1870b0e2e2;p=wimlib diff --git a/src/write.c b/src/write.c index a84c16b0..668c7abd 100644 --- a/src/write.c +++ b/src/write.c @@ -33,9 +33,12 @@ #include "lzx.h" #include "xpress.h" #include + +#ifdef ENABLE_MULTITHREADED_COMPRESSION #include #include #include +#endif #ifdef WITH_NTFS_3G #include @@ -47,9 +50,10 @@ #ifdef HAVE_ALLOCA_H #include +#else +#include #endif - /* Reopens the FILE* for a WIM read-write. */ static int reopen_rw(WIMStruct *w) { @@ -130,7 +134,7 @@ WIMLIBAPI int wimlib_overwrite(WIMStruct *w, int write_flags, return WIMLIB_ERR_RENAME; } - if (write_flags & WIMLIB_WRITE_FLAG_VERBOSE) + if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) printf("Successfully renamed `%s' to `%s'\n", tmpfile, wimfile_name); return 0; @@ -784,6 +788,7 @@ out: } +#ifdef ENABLE_MULTITHREADED_COMPRESSION struct shared_queue { sem_t filled_slots; sem_t empty_slots; @@ -903,6 +908,7 @@ static void *compressor_thread_proc(void *arg) } DEBUG("Compressor thread terminating"); } +#endif static void show_stream_write_progress(u64 *cur_size, u64 *next_size, u64 total_size, u64 one_percent, @@ -957,6 +963,7 @@ static int write_stream_list_serial(struct list_head *stream_list, return 0; } +#ifdef ENABLE_MULTITHREADED_COMPRESSION static int write_wim_chunks(struct message *msg, FILE *out_fp, struct chunk_table *chunk_tab) { @@ -979,7 +986,6 @@ static int write_wim_chunks(struct message *msg, FILE *out_fp, return 0; } - /* * This function is executed by the main thread when the resources are being * compressed in parallel. The main thread is in change of all reading of the @@ -1066,6 +1072,8 @@ static int main_writer_thread_proc(struct list_head *stream_list, #else ret = prepare_resource_for_read(next_lte); #endif + if (ret != 0) + goto out; DEBUG("Initializing buffers for uncompressed " "and compressed data (%zu bytes needed)", @@ -1088,6 +1096,9 @@ static int main_writer_thread_proc(struct list_head *stream_list, } } + // This loop is executed until all resources have been written, except + // possibly a few that have been added to the @my_resources list for + // writing later. while (1) { // Send chunks to the compressor threads until either (a) there // are no more messages available since they were all sent off, @@ -1253,9 +1264,6 @@ static int main_writer_thread_proc(struct list_head *stream_list, DEBUG2("Complete msg (begin_chunk=%"PRIu64")", msg->begin_chunk); if (msg->begin_chunk == 0) { DEBUG2("Begin chunk tab"); - - - if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) { show_stream_write_progress(&cur_size, &next_size, @@ -1377,12 +1385,6 @@ out: #endif if (ret == 0) { list_for_each_entry(lte, &my_resources, staging_list) { - ret = write_wim_resource(lte, out_fp, - out_ctype, - <e->output_resource_entry, - 0); - if (ret != 0) - break; if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) { show_stream_write_progress(&cur_size, &next_size, @@ -1391,6 +1393,12 @@ out: &cur_percent, lte); } + ret = write_wim_resource(lte, out_fp, + out_ctype, + <e->output_resource_entry, + 0); + if (ret != 0) + break; } if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) finish_stream_write_progress(total_size); @@ -1422,6 +1430,19 @@ out: return ret; } + +static const char *get_data_type(int ctype) +{ + switch (ctype) { + case WIM_COMPRESSION_TYPE_NONE: + return "uncompressed"; + case WIM_COMPRESSION_TYPE_LZX: + return "LZX-compressed"; + case WIM_COMPRESSION_TYPE_XPRESS: + return "XPRESS-compressed"; + } +} + static int write_stream_list_parallel(struct list_head *stream_list, FILE *out_fp, int out_ctype, int write_flags, u64 total_size, @@ -1430,6 +1451,7 @@ static int write_stream_list_parallel(struct list_head *stream_list, int ret; struct shared_queue res_to_compress_queue; struct shared_queue compressed_res_queue; + pthread_t *compressor_threads = NULL; if (num_threads == 0) { long nthreads = sysconf(_SC_NPROCESSORS_ONLN); @@ -1443,64 +1465,65 @@ static int write_stream_list_parallel(struct list_head *stream_list, wimlib_assert(stream_list->next != stream_list); - { - pthread_t compressor_threads[num_threads]; - static const double MESSAGES_PER_THREAD = 2.0; - size_t queue_size = (size_t)(num_threads * MESSAGES_PER_THREAD); + static const double MESSAGES_PER_THREAD = 2.0; + size_t queue_size = (size_t)(num_threads * MESSAGES_PER_THREAD); - DEBUG("Initializing shared queues (queue_size=%zu)", queue_size); + DEBUG("Initializing shared queues (queue_size=%zu)", queue_size); - ret = shared_queue_init(&res_to_compress_queue, queue_size); - if (ret != 0) - goto out_serial; + ret = shared_queue_init(&res_to_compress_queue, queue_size); + if (ret != 0) + goto out_serial; - ret = shared_queue_init(&compressed_res_queue, queue_size); - if (ret != 0) - goto out_destroy_res_to_compress_queue; - - struct compressor_thread_params params; - params.res_to_compress_queue = &res_to_compress_queue; - params.compressed_res_queue = &compressed_res_queue; - params.compress = get_compress_func(out_ctype); - - for (unsigned i = 0; i < num_threads; i++) { - DEBUG("pthread_create thread %u", i); - ret = pthread_create(&compressor_threads[i], NULL, - compressor_thread_proc, ¶ms); - if (ret != 0) { - ERROR_WITH_ERRNO("Failed to create compressor " - "thread %u", i); - num_threads = i; - goto out_join; - } + ret = shared_queue_init(&compressed_res_queue, queue_size); + if (ret != 0) + goto out_destroy_res_to_compress_queue; + + struct compressor_thread_params params; + params.res_to_compress_queue = &res_to_compress_queue; + params.compressed_res_queue = &compressed_res_queue; + params.compress = get_compress_func(out_ctype); + + compressor_threads = MALLOC(num_threads * sizeof(pthread_t)); + + for (unsigned i = 0; i < num_threads; i++) { + DEBUG("pthread_create thread %u", i); + ret = pthread_create(&compressor_threads[i], NULL, + compressor_thread_proc, ¶ms); + if (ret != 0) { + ret = -1; + ERROR_WITH_ERRNO("Failed to create compressor " + "thread %u", i); + num_threads = i; + goto out_join; } + } - if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) { - printf("Writing compressed data using %u threads...\n", - num_threads); - } + if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) { + printf("Writing %s compressed data using %u threads...\n", + get_data_type(out_ctype), num_threads); + } - ret = main_writer_thread_proc(stream_list, - out_fp, - out_ctype, - &res_to_compress_queue, - &compressed_res_queue, - queue_size, - write_flags, - total_size); - - out_join: - for (unsigned i = 0; i < num_threads; i++) - shared_queue_put(&res_to_compress_queue, NULL); - - for (unsigned i = 0; i < num_threads; i++) { - if (pthread_join(compressor_threads[i], NULL)) { - WARNING("Failed to join compressor thread %u: %s", - i, strerror(errno)); - } + ret = main_writer_thread_proc(stream_list, + out_fp, + out_ctype, + &res_to_compress_queue, + &compressed_res_queue, + queue_size, + write_flags, + total_size); + +out_join: + for (unsigned i = 0; i < num_threads; i++) + shared_queue_put(&res_to_compress_queue, NULL); + + for (unsigned i = 0; i < num_threads; i++) { + if (pthread_join(compressor_threads[i], NULL)) { + WARNING("Failed to join compressor thread %u: %s", + i, strerror(errno)); } } + FREE(compressor_threads); shared_queue_destroy(&compressed_res_queue); out_destroy_res_to_compress_queue: shared_queue_destroy(&res_to_compress_queue); @@ -1511,6 +1534,7 @@ out_serial: return write_stream_list_serial(stream_list, out_fp, out_ctype, write_flags, total_size); } +#endif static int write_stream_list(struct list_head *stream_list, FILE *out_fp, int out_ctype, int write_flags, @@ -1540,16 +1564,21 @@ static int write_stream_list(struct list_head *stream_list, FILE *out_fp, wimlib_get_compression_type_string(out_ctype)); } +#ifdef ENABLE_MULTITHREADED_COMPRESSION if (compression_needed && total_size >= 1000000 && num_threads != 1) { return write_stream_list_parallel(stream_list, out_fp, out_ctype, write_flags, total_size, num_threads); - } else { + } + else +#endif + { if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) { const char *reason = ""; - if (num_threads != 1) + if (!compression_needed) reason = " (no compression needed)"; - printf("Writing data using 1 thread%s\n", reason); + printf("Writing %s data using 1 thread%s\n", + get_data_type(out_ctype), reason); } return write_stream_list_serial(stream_list, out_fp, @@ -1791,6 +1820,9 @@ WIMLIBAPI int wimlib_write(WIMStruct *w, const char *path, return ret; } + if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) + printf("Writing image metadata...\n"); + ret = for_image(w, image, write_metadata_resource); if (ret != 0) { @@ -1802,7 +1834,7 @@ WIMLIBAPI int wimlib_write(WIMStruct *w, const char *path, if (ret != 0) return ret; - if (write_flags & WIMLIB_WRITE_FLAG_VERBOSE) + if (write_flags & WIMLIB_WRITE_FLAG_SHOW_PROGRESS) printf("Successfully wrote `%s'\n", path); return 0; }