X-Git-Url: https://wimlib.net/git/?a=blobdiff_plain;f=src%2Fwrite.c;h=e4a0f3ad45712c6ce8ab5a5da345b9aeed0c0bf8;hb=962cca1a6f9e45827006a745be086b7af3b725aa;hp=3cac5a2868a7d8ef7c9f3ea90e853e3de824b9fe;hpb=3f10159fec6c397ed4d182a25a1aa5b3147101cf;p=wimlib diff --git a/src/write.c b/src/write.c index 3cac5a28..e4a0f3ad 100644 --- a/src/write.c +++ b/src/write.c @@ -53,17 +53,16 @@ # include #endif -#include -#include #include +#include +#include +#include +#include #ifdef HAVE_ALLOCA_H # include -#else -# include #endif -#include #ifndef __WIN32__ # include /* for `struct iovec' */ @@ -683,7 +682,6 @@ struct message { u8 *compressed_chunks[MAX_CHUNKS_PER_MSG]; unsigned uncompressed_chunk_sizes[MAX_CHUNKS_PER_MSG]; struct iovec out_chunks[MAX_CHUNKS_PER_MSG]; - size_t total_out_bytes; unsigned num_chunks; struct list_head list; bool complete; @@ -693,7 +691,6 @@ struct message { static void compress_chunks(struct message *msg, compress_func_t compress) { - msg->total_out_bytes = 0; for (unsigned i = 0; i < msg->num_chunks; i++) { unsigned len = compress(msg->uncompressed_chunks[i], msg->uncompressed_chunk_sizes[i], @@ -711,7 +708,6 @@ compress_chunks(struct message *msg, compress_func_t compress) } msg->out_chunks[i].iov_base = out_chunk; msg->out_chunks[i].iov_len = out_len; - msg->total_out_bytes += out_len; } } @@ -739,32 +735,52 @@ compressor_thread_proc(void *arg) } #endif /* ENABLE_MULTITHREADED_COMPRESSION */ +struct write_streams_progress_data { + wimlib_progress_func_t progress_func; + union wimlib_progress_info progress; + uint64_t next_progress; + WIMStruct *prev_wim_part; +}; + static void -do_write_streams_progress(union wimlib_progress_info *progress, - wimlib_progress_func_t progress_func, - uint64_t size_added, +do_write_streams_progress(struct write_streams_progress_data *progress_data, + struct wim_lookup_table_entry *lte, bool stream_discarded) { + union wimlib_progress_info *progress = &progress_data->progress; + bool new_wim_part; + if (stream_discarded) { - progress->write_streams.total_bytes -= size_added; - if (progress->write_streams._private != ~(uint64_t)0 && - progress->write_streams._private > progress->write_streams.total_bytes) + progress->write_streams.total_bytes -= wim_resource_size(lte); + if (progress_data->next_progress != ~(uint64_t)0 && + progress_data->next_progress > progress->write_streams.total_bytes) { - progress->write_streams._private = progress->write_streams.total_bytes; + progress_data->next_progress = progress->write_streams.total_bytes; } } else { - progress->write_streams.completed_bytes += size_added; + progress->write_streams.completed_bytes += wim_resource_size(lte); + } + new_wim_part = false; + if (lte->resource_location == RESOURCE_IN_WIM && + lte->wim != progress_data->prev_wim_part) + { + if (progress_data->prev_wim_part) { + new_wim_part = true; + progress->write_streams.completed_parts++; + } + progress_data->prev_wim_part = lte->wim; } progress->write_streams.completed_streams++; - if (progress_func && - progress->write_streams.completed_bytes >= progress->write_streams._private) + if (progress_data->progress_func + && (progress->write_streams.completed_bytes >= progress_data->next_progress + || new_wim_part)) { - progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, - progress); - if (progress->write_streams._private == progress->write_streams.total_bytes) { - progress->write_streams._private = ~(uint64_t)0; + progress_data->progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, + progress); + if (progress_data->next_progress == progress->write_streams.total_bytes) { + progress_data->next_progress = ~(uint64_t)0; } else { - progress->write_streams._private = + progress_data->next_progress = min(progress->write_streams.total_bytes, progress->write_streams.completed_bytes + progress->write_streams.total_bytes / 100); @@ -787,6 +803,7 @@ serial_write_stream(struct wim_lookup_table_entry *lte, void *_ctx) ctx->write_resource_flags); } + /* Write a list of streams, taking into account that some streams may be * duplicates that are checksummed and discarded on the fly, and also delegating * the actual writing of a stream to a function @write_stream_cb, which is @@ -796,8 +813,7 @@ do_write_stream_list(struct list_head *stream_list, struct wim_lookup_table *lookup_table, int (*write_stream_cb)(struct wim_lookup_table_entry *, void *), void *write_stream_ctx, - wimlib_progress_func_t progress_func, - union wimlib_progress_info *progress) + struct write_streams_progress_data *progress_data) { int ret = 0; struct wim_lookup_table_entry *lte; @@ -860,10 +876,8 @@ do_write_stream_list(struct list_head *stream_list, } skip_to_progress: if (!lte->no_progress) { - do_write_streams_progress(progress, - progress_func, - wim_resource_size(lte), - stream_discarded); + do_write_streams_progress(progress_data, + lte, stream_discarded); } } return ret; @@ -875,8 +889,7 @@ do_write_stream_list_serial(struct list_head *stream_list, struct filedes *out_fd, int out_ctype, int write_resource_flags, - wimlib_progress_func_t progress_func, - union wimlib_progress_info *progress) + struct write_streams_progress_data *progress_data) { struct serial_write_stream_ctx ctx = { .out_fd = out_fd, @@ -887,8 +900,7 @@ do_write_stream_list_serial(struct list_head *stream_list, lookup_table, serial_write_stream, &ctx, - progress_func, - progress); + progress_data); } static inline int @@ -909,21 +921,22 @@ write_stream_list_serial(struct list_head *stream_list, struct filedes *out_fd, int out_ctype, int write_resource_flags, - wimlib_progress_func_t progress_func, - union wimlib_progress_info *progress) + struct write_streams_progress_data *progress_data) { + union wimlib_progress_info *progress = &progress_data->progress; DEBUG("Writing stream list of size %"PRIu64" (serial version)", progress->write_streams.total_streams); progress->write_streams.num_threads = 1; - if (progress_func) - progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, progress); + if (progress_data->progress_func) { + progress_data->progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, + progress); + } return do_write_stream_list_serial(stream_list, lookup_table, out_fd, out_ctype, write_resource_flags, - progress_func, - progress); + progress_data); } #ifdef ENABLE_MULTITHREADED_COMPRESSION @@ -935,7 +948,6 @@ write_wim_chunks(struct message *msg, struct filedes *out_fd, struct iovec *vecs; struct pwm_chunk_hdr *chunk_hdrs; unsigned nvecs; - size_t nbytes; int ret; for (unsigned i = 0; i < msg->num_chunks; i++) @@ -944,7 +956,6 @@ write_wim_chunks(struct message *msg, struct filedes *out_fd, if (!(write_resource_flags & WIMLIB_WRITE_RESOURCE_FLAG_PIPABLE)) { nvecs = msg->num_chunks; vecs = msg->out_chunks; - nbytes = msg->total_out_bytes; } else { /* Special case: If writing a compressed resource to a pipable * WIM, prefix each compressed chunk with a header that gives @@ -960,7 +971,6 @@ write_wim_chunks(struct message *msg, struct filedes *out_fd, vecs[i * 2 + 1].iov_base = msg->out_chunks[i].iov_base; vecs[i * 2 + 1].iov_len = msg->out_chunks[i].iov_len; } - nbytes = msg->total_out_bytes + msg->num_chunks * sizeof(chunk_hdrs[0]); } ret = full_writev(out_fd, vecs, nvecs); if (ret) @@ -978,8 +988,7 @@ struct main_writer_thread_ctx { struct shared_queue *res_to_compress_queue; struct shared_queue *compressed_res_queue; size_t num_messages; - wimlib_progress_func_t progress_func; - union wimlib_progress_info *progress; + struct write_streams_progress_data *progress_data; struct list_head available_msgs; struct list_head outstanding_streams; @@ -1221,10 +1230,8 @@ receive_compressed_chunks(struct main_writer_thread_ctx *ctx) cur_lte->output_resource_entry.flags); } - do_write_streams_progress(ctx->progress, - ctx->progress_func, - wim_resource_size(cur_lte), - false); + do_write_streams_progress(ctx->progress_data, + cur_lte, false); /* Since we just finished writing a stream, write any * streams that have been added to the serial_streams @@ -1238,8 +1245,7 @@ receive_compressed_chunks(struct main_writer_thread_ctx *ctx) ctx->out_fd, ctx->out_ctype, ctx->write_resource_flags, - ctx->progress_func, - ctx->progress); + ctx->progress_data); if (ret) return ret; } @@ -1337,8 +1343,7 @@ main_writer_thread_finish(void *_ctx) ctx->out_fd, ctx->out_ctype, ctx->write_resource_flags, - ctx->progress_func, - ctx->progress); + ctx->progress_data); } static int @@ -1433,14 +1438,14 @@ write_stream_list_parallel(struct list_head *stream_list, struct filedes *out_fd, int out_ctype, int write_resource_flags, - wimlib_progress_func_t progress_func, - union wimlib_progress_info *progress, + struct write_streams_progress_data *progress_data, unsigned num_threads) { int ret; struct shared_queue res_to_compress_queue; struct shared_queue compressed_res_queue; pthread_t *compressor_threads = NULL; + union wimlib_progress_info *progress = &progress_data->progress; if (num_threads == 0) { long nthreads = get_default_num_threads(); @@ -1498,8 +1503,10 @@ write_stream_list_parallel(struct list_head *stream_list, } } - if (progress_func) - progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, progress); + if (progress_data->progress_func) { + progress_data->progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, + progress); + } struct main_writer_thread_ctx ctx; ctx.stream_list = stream_list; @@ -1510,14 +1517,13 @@ write_stream_list_parallel(struct list_head *stream_list, ctx.compressed_res_queue = &compressed_res_queue; ctx.num_messages = queue_size; ctx.write_resource_flags = write_resource_flags; - ctx.progress_func = progress_func; - ctx.progress = progress; + ctx.progress_data = progress_data; ret = main_writer_thread_init_ctx(&ctx); if (ret) goto out_join; ret = do_write_stream_list(stream_list, lookup_table, main_thread_process_next_stream, - &ctx, progress_func, progress); + &ctx, progress_data); if (ret) goto out_destroy_ctx; @@ -1556,8 +1562,7 @@ out_serial_quiet: out_fd, out_ctype, write_resource_flags, - progress_func, - progress); + progress_data); } #endif @@ -1576,9 +1581,11 @@ write_stream_list(struct list_head *stream_list, size_t num_streams = 0; u64 total_bytes = 0; u64 total_compression_bytes = 0; - union wimlib_progress_info progress; + struct write_streams_progress_data progress_data; int ret; int write_resource_flags; + unsigned total_parts = 0; + WIMStruct *prev_wim_part = NULL; if (list_empty(stream_list)) return 0; @@ -1587,6 +1594,10 @@ write_stream_list(struct list_head *stream_list, DEBUG("write_resource_flags=0x%08x", write_resource_flags); + sort_stream_list_by_sequential_order(stream_list, + offsetof(struct wim_lookup_table_entry, + write_streams_list)); + /* Calculate the total size of the streams to be written. Note: this * will be the uncompressed size, as we may not know the compressed size * yet, and also this will assume that every unhashed stream will be @@ -1600,14 +1611,29 @@ write_stream_list(struct list_head *stream_list, { total_compression_bytes += wim_resource_size(lte); } + if (lte->resource_location == RESOURCE_IN_WIM) { + if (prev_wim_part != lte->wim) { + prev_wim_part = lte->wim; + total_parts++; + } + } + } - progress.write_streams.total_bytes = total_bytes; - progress.write_streams.total_streams = num_streams; - progress.write_streams.completed_bytes = 0; - progress.write_streams.completed_streams = 0; - progress.write_streams.num_threads = num_threads; - progress.write_streams.compression_type = out_ctype; - progress.write_streams._private = 0; + + memset(&progress_data, 0, sizeof(progress_data)); + progress_data.progress_func = progress_func; + + progress_data.progress.write_streams.total_bytes = total_bytes; + progress_data.progress.write_streams.total_streams = num_streams; + progress_data.progress.write_streams.completed_bytes = 0; + progress_data.progress.write_streams.completed_streams = 0; + progress_data.progress.write_streams.num_threads = num_threads; + progress_data.progress.write_streams.compression_type = out_ctype; + progress_data.progress.write_streams.total_parts = total_parts; + progress_data.progress.write_streams.completed_parts = 0; + + progress_data.next_progress = 0; + progress_data.prev_wim_part = NULL; #ifdef ENABLE_MULTITHREADED_COMPRESSION if (total_compression_bytes >= 2000000 && num_threads != 1) @@ -1616,8 +1642,7 @@ write_stream_list(struct list_head *stream_list, out_fd, out_ctype, write_resource_flags, - progress_func, - &progress, + &progress_data, num_threads); else #endif @@ -1626,8 +1651,7 @@ write_stream_list(struct list_head *stream_list, out_fd, out_ctype, write_resource_flags, - progress_func, - &progress); + &progress_data); if (ret == 0) DEBUG("Successfully wrote stream list."); else