X-Git-Url: https://wimlib.net/git/?p=wimlib;a=blobdiff_plain;f=src%2Fwrite.c;h=0beba03f4264f20fa8286a35fc36cc70b36077c9;hp=bf4f2396e0c28b74d387eaa672eeabad8b0a9930;hb=2af95e7dc313e9abaf571034c3e86f32503bf232;hpb=c35e1762521c7f85b1d7c7d77af9aec07817af9f diff --git a/src/write.c b/src/write.c index bf4f2396..0beba03f 100644 --- a/src/write.c +++ b/src/write.c @@ -97,13 +97,11 @@ begin_wim_resource_chunk_tab(const struct wim_lookup_table_entry *lte, u64 num_chunks = (size + WIM_CHUNK_SIZE - 1) / WIM_CHUNK_SIZE; size_t alloc_size = sizeof(struct chunk_table) + num_chunks * sizeof(u64); struct chunk_table *chunk_tab = CALLOC(1, alloc_size); - int ret; if (!chunk_tab) { ERROR("Failed to allocate chunk table for %"PRIu64" byte " "resource", size); - ret = WIMLIB_ERR_NOMEM; - goto out; + return WIMLIB_ERR_NOMEM; } chunk_tab->file_offset = file_offset; chunk_tab->num_chunks = num_chunks; @@ -119,14 +117,10 @@ begin_wim_resource_chunk_tab(const struct wim_lookup_table_entry *lte, ERROR_WITH_ERRNO("Failed to write chunk table in compressed " "file resource"); FREE(chunk_tab); - ret = WIMLIB_ERR_WRITE; - goto out; + return WIMLIB_ERR_WRITE; } - - ret = 0; *chunk_tab_ret = chunk_tab; -out: - return ret; + return 0; } /* @@ -155,7 +149,7 @@ out: typedef unsigned (*compress_func_t)(const void *chunk, unsigned chunk_size, void *out); -compress_func_t +static compress_func_t get_compress_func(int out_ctype) { if (out_ctype == WIMLIB_COMPRESSION_TYPE_LZX) @@ -650,7 +644,7 @@ do_write_stream_list(struct list_head *stream_list, wimlib_progress_func_t progress_func, union wimlib_progress_info *progress) { - int ret; + int ret = 0; struct wim_lookup_table_entry *lte; /* For each stream in @stream_list ... */ @@ -666,11 +660,9 @@ do_write_stream_list(struct list_head *stream_list, struct wim_lookup_table_entry *tmp; u32 orig_refcnt = lte->out_refcnt; - ret = hash_unhashed_stream(lte, - lookup_table, - &tmp); + ret = hash_unhashed_stream(lte, lookup_table, &tmp); if (ret) - goto out; + break; if (tmp != lte) { lte = tmp; /* We found a duplicate stream. */ @@ -693,9 +685,14 @@ do_write_stream_list(struct list_head *stream_list, * while in the latter case this is done because we do not have * the SHA1 message digest yet. */ wimlib_assert(lte->out_refcnt != 0); + lte->deferred = 0; ret = (*write_stream_cb)(lte, write_stream_ctx); if (ret) - goto out; + break; + /* In parallel mode, some streams are deferred for later, + * serialized processing; ignore them here. */ + if (lte->deferred) + continue; if (lte->unhashed) { list_del(<e->unhashed_list); lookup_table_insert(lookup_table, lte); @@ -708,8 +705,6 @@ do_write_stream_list(struct list_head *stream_list, wim_resource_size(lte)); } } - ret = 0; -out: return ret; } @@ -802,6 +797,7 @@ struct main_writer_thread_ctx { struct list_head available_msgs; struct list_head outstanding_streams; struct list_head serial_streams; + size_t num_outstanding_messages; SHA_CTX next_sha_ctx; u64 next_chunk; @@ -865,23 +861,12 @@ allocate_messages(size_t num_messages) static void main_writer_thread_destroy_ctx(struct main_writer_thread_ctx *ctx) { - size_t num_available_msgs; - size_t num_outstanding_msgs; - struct list_head *cur; - - num_available_msgs = 0; - list_for_each(cur, &ctx->available_msgs) - num_available_msgs++; - - num_outstanding_msgs = ctx->num_messages - num_available_msgs; - while (num_outstanding_msgs--) + while (ctx->num_outstanding_messages--) shared_queue_get(ctx->compressed_res_queue); - free_messages(ctx->msgs, ctx->num_messages); FREE(ctx->cur_chunk_tab); } - static int main_writer_thread_init_ctx(struct main_writer_thread_ctx *ctx) { @@ -906,11 +891,16 @@ main_writer_thread_init_ctx(struct main_writer_thread_ctx *ctx) * currently being read and having chunks fed to the compressor threads. * */ INIT_LIST_HEAD(&ctx->outstanding_streams); + ctx->num_outstanding_messages = 0; + + ctx->next_msg = NULL; /* Resources that don't need any chunks compressed are added to this * list and written directly by the main thread. */ INIT_LIST_HEAD(&ctx->serial_streams); + ctx->cur_chunk_tab = NULL; + return 0; } @@ -922,6 +912,8 @@ receive_compressed_chunks(struct main_writer_thread_ctx *ctx) int ret; wimlib_assert(!list_empty(&ctx->outstanding_streams)); + wimlib_assert(ctx->num_outstanding_messages != 0); + DEBUG2("Receiving more compressed chunks"); cur_lte = container_of(ctx->outstanding_streams.next, struct wim_lookup_table_entry, @@ -932,6 +924,7 @@ receive_compressed_chunks(struct main_writer_thread_ctx *ctx) * compressed. */ msg = shared_queue_get(ctx->compressed_res_queue); msg->complete = true; + --ctx->num_outstanding_messages; DEBUG2("recved msg %p", msg); @@ -941,7 +934,8 @@ receive_compressed_chunks(struct main_writer_thread_ctx *ctx) * message around until all earlier chunks are received. * * Otherwise, write all the chunks we can. */ - while (!list_empty(&cur_lte->msg_list) + while (cur_lte != NULL && + !list_empty(&cur_lte->msg_list) && (msg = container_of(cur_lte->msg_list.next, struct message, list))->complete) @@ -1019,7 +1013,6 @@ receive_compressed_chunks(struct main_writer_thread_ctx *ctx) * resources that don't need to be compressed because * the desired compression type is the same as the * previous compression type). */ -#if 0 ret = do_write_stream_list_serial(&ctx->serial_streams, ctx->lookup_table, ctx->out_fp, @@ -1027,18 +1020,19 @@ receive_compressed_chunks(struct main_writer_thread_ctx *ctx) ctx->write_resource_flags, ctx->progress_func, ctx->progress); -#endif if (ret) return ret; - if (list_empty(&ctx->outstanding_streams)) - return 0; - cur_lte = container_of(ctx->outstanding_streams.next, - struct wim_lookup_table_entry, - being_compressed_list); - #ifdef ENABLE_MORE_DEBUG - DEBUG2("Advance to stream:"); - print_lookup_table_entry(cur_lte, stderr); - #endif + if (list_empty(&ctx->outstanding_streams)) { + cur_lte = NULL; + } else { + cur_lte = container_of(ctx->outstanding_streams.next, + struct wim_lookup_table_entry, + being_compressed_list); + #ifdef ENABLE_MORE_DEBUG + DEBUG2("Advance to stream:"); + print_lookup_table_entry(cur_lte, stderr); + #endif + } } } return 0; @@ -1050,6 +1044,7 @@ main_writer_thread_cb(const void *chunk, size_t chunk_size, void *_ctx) struct main_writer_thread_ctx *ctx = _ctx; int ret; struct message *next_msg; + u64 next_chunk_in_msg; DEBUG2("chunk_size=%zu, wim_resource_size(next_lte)=%"PRIu64, chunk_size, wim_resource_size(ctx->next_lte)); @@ -1082,7 +1077,7 @@ main_writer_thread_cb(const void *chunk, size_t chunk_size, void *_ctx) ctx->next_msg = next_msg; } - u64 next_chunk_in_msg = ctx->next_chunk - next_msg->begin_chunk; + next_chunk_in_msg = ctx->next_chunk - next_msg->begin_chunk; /* Fill in the next chunk to compress */ next_msg->uncompressed_chunk_sizes[next_chunk_in_msg] = chunk_size; @@ -1094,6 +1089,7 @@ main_writer_thread_cb(const void *chunk, size_t chunk_size, void *_ctx) /* Send off an array of chunks to compress */ list_add_tail(&next_msg->list, &ctx->next_lte->msg_list); shared_queue_put(ctx->res_to_compress_queue, next_msg); + ++ctx->num_outstanding_messages; ctx->next_msg = NULL; } return 0; @@ -1103,13 +1099,14 @@ static int main_writer_thread_finish(void *_ctx) { struct main_writer_thread_ctx *ctx = _ctx; - int ret = 0; + int ret; DEBUG2("finishing"); - while (!list_empty(&ctx->outstanding_streams)) { + while (ctx->num_outstanding_messages != 0) { ret = receive_compressed_chunks(ctx); if (ret) - break; + return ret; } + wimlib_assert(list_empty(&ctx->outstanding_streams)); return do_write_stream_list_serial(&ctx->serial_streams, ctx->lookup_table, ctx->out_fp, @@ -1154,10 +1151,11 @@ main_thread_process_next_stream(struct wim_lookup_table_entry *lte, void *_ctx) if (wim_resource_size(lte) < 1000 || ctx->out_ctype == WIMLIB_COMPRESSION_TYPE_NONE || (lte->resource_location == RESOURCE_IN_WIM && + !(ctx->write_resource_flags & WIMLIB_RESOURCE_FLAG_RECOMPRESS) && wimlib_get_compression_type(lte->wim) == ctx->out_ctype)) { - list_add_tail(<e->write_streams_list, - &ctx->serial_streams); + list_add_tail(<e->write_streams_list, &ctx->serial_streams); + lte->deferred = 1; ret = 0; } else { ret = submit_stream_for_compression(lte, ctx); @@ -1181,9 +1179,9 @@ write_stream_list_parallel(struct list_head *stream_list, FILE *out_fp, int out_ctype, int write_resource_flags, - unsigned num_threads, wimlib_progress_func_t progress_func, - union wimlib_progress_info *progress) + union wimlib_progress_info *progress, + unsigned num_threads) { int ret; struct shared_queue res_to_compress_queue; @@ -1244,7 +1242,6 @@ write_stream_list_parallel(struct list_head *stream_list, progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, progress); struct main_writer_thread_ctx ctx; - memset(&ctx, 0, sizeof(ctx)); ctx.stream_list = stream_list; ctx.lookup_table = lookup_table; ctx.out_fp = out_fp; @@ -1252,18 +1249,15 @@ write_stream_list_parallel(struct list_head *stream_list, ctx.res_to_compress_queue = &res_to_compress_queue; ctx.compressed_res_queue = &compressed_res_queue; ctx.num_messages = queue_size; - ctx.write_resource_flags = write_resource_flags; + ctx.write_resource_flags = write_resource_flags | WIMLIB_RESOURCE_FLAG_THREADSAFE_READ; ctx.progress_func = progress_func; ctx.progress = progress; ret = main_writer_thread_init_ctx(&ctx); if (ret) - goto out; - ret = do_write_stream_list(stream_list, - lookup_table, + goto out_join; + ret = do_write_stream_list(stream_list, lookup_table, main_thread_process_next_stream, - &ctx, - NULL, - NULL); + &ctx, NULL, NULL); if (ret) goto out_destroy_ctx; ret = main_writer_thread_finish(&ctx); @@ -1285,7 +1279,6 @@ out_destroy_compressed_res_queue: shared_queue_destroy(&compressed_res_queue); out_destroy_res_to_compress_queue: shared_queue_destroy(&res_to_compress_queue); -out: if (ret >= 0 && ret != WIMLIB_ERR_NOMEM) return ret; out_serial: @@ -1308,7 +1301,7 @@ out_serial: static int write_stream_list(struct list_head *stream_list, struct wim_lookup_table *lookup_table, - FILE *out_fp, int out_ctype, int write_resource_flags, + FILE *out_fp, int out_ctype, int write_flags, unsigned num_threads, wimlib_progress_func_t progress_func) { struct wim_lookup_table_entry *lte; @@ -1317,10 +1310,13 @@ write_stream_list(struct list_head *stream_list, u64 total_compression_bytes = 0; union wimlib_progress_info progress; int ret; + int write_resource_flags; if (list_empty(stream_list)) return 0; + write_resource_flags = write_flags_to_resource_flags(write_flags); + /* Calculate the total size of the streams to be written. Note: this * will be the uncompressed size, as we may not know the compressed size * yet, and also this will assume that every unhashed stream will be @@ -1350,9 +1346,9 @@ write_stream_list(struct list_head *stream_list, out_fp, out_ctype, write_resource_flags, - num_threads, progress_func, - &progress); + &progress, + num_threads); else #endif ret = write_stream_list_serial(stream_list, @@ -1619,7 +1615,7 @@ write_wim_streams(WIMStruct *wim, int image, int write_flags, wim->lookup_table, wim->out_fp, wimlib_get_compression_type(wim), - write_flags_to_resource_flags(write_flags), + write_flags, num_threads, progress_func); }