From ab82c2b1642c5c59d66a9b14e7fcdcee06dc573a Mon Sep 17 00:00:00 2001 From: Eric Biggers Date: Mon, 8 Apr 2013 00:07:55 -0500 Subject: [PATCH] Write uncompressible streams uncompressed in multithreaded code path --- src/write.c | 98 ++++++++++++++++++++++++++++++++++------------------- 1 file changed, 64 insertions(+), 34 deletions(-) diff --git a/src/write.c b/src/write.c index 37e652a2..e0d898ee 100644 --- a/src/write.c +++ b/src/write.c @@ -245,6 +245,21 @@ finish_wim_resource_chunk_tab(struct chunk_table *chunk_tab, return 0; } +static int +fflush_and_ftruncate(FILE *out_fp, off_t offset) +{ + if (fseeko(out_fp, offset, SEEK_SET) || + fflush(out_fp) || + ftruncate(fileno(out_fp), offset)) + { + ERROR_WITH_ERRNO("Failed to flush and/or truncate " + "output WIM file"); + return WIMLIB_ERR_WRITE; + } else { + return 0; + } +} + static int finalize_and_check_sha1(SHA_CTX *sha_ctx, struct wim_lookup_table_entry *lte) { @@ -414,19 +429,12 @@ try_write_again: if (new_size >= wim_resource_size(lte)) { /* Oops! We compressed the resource to larger than the original * size. Write the resource uncompressed instead. */ - if (fseeko(out_fp, offset, SEEK_SET) || - fflush(out_fp) || - ftruncate(fileno(out_fp), - offset + wim_resource_size(lte))) - { - ERROR_WITH_ERRNO("Failed to flush and/or truncate " - "output WIM file"); - ret = WIMLIB_ERR_WRITE; - goto out_free_chunk_tab; - } DEBUG("Compressed %"PRIu64" => %"PRIu64" bytes; " "writing uncompressed instead", wim_resource_size(lte), new_size); + ret = fflush_and_ftruncate(out_fp, offset); + if (ret) + goto out_free_chunk_tab; write_ctx.compress = NULL; write_ctx.doing_sha = false; out_ctype = WIMLIB_COMPRESSION_TYPE_NONE; @@ -747,6 +755,7 @@ write_stream_list_serial(struct list_head *stream_list, wimlib_progress_func_t progress_func, union wimlib_progress_info *progress) { + DEBUG("Writing stream list (serial version)"); progress->write_streams.num_threads = 1; if (progress_func) progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, progress); @@ -961,6 +970,8 @@ receive_compressed_chunks(struct main_writer_thread_ctx *ctx) msg->begin_chunk + msg->num_chunks == ctx->cur_chunk_tab->num_chunks) { u64 res_csize; + off_t offset; + ret = finish_wim_resource_chunk_tab(ctx->cur_chunk_tab, ctx->out_fp, &res_csize); @@ -968,21 +979,32 @@ receive_compressed_chunks(struct main_writer_thread_ctx *ctx) return ret; list_del(&cur_lte->being_compressed_list); -#if 0 + + /* Grab the offset of this stream in the output file + * from the chunk table before we free it. */ + offset = ctx->cur_chunk_tab->file_offset; + + FREE(ctx->cur_chunk_tab); + ctx->cur_chunk_tab = NULL; + if (res_csize >= wim_resource_size(cur_lte)) { /* Oops! We compressed the resource to * larger than the original size. Write * the resource uncompressed instead. */ - ret = write_uncompressed_resource_and_truncate( - cur_lte, - ctx->out_fp, - ctx->cur_chunk_tab->file_offset, - &cur_lte->output_resource_entry); + DEBUG("Compressed %"PRIu64" => %"PRIu64" bytes; " + "writing uncompressed instead", + wim_resource_size(cur_lte), res_csize); + ret = fflush_and_ftruncate(ctx->out_fp, offset); if (ret) - goto out; - } else -#endif - { + return ret; + ret = write_wim_resource(cur_lte, + ctx->out_fp, + WIMLIB_COMPRESSION_TYPE_NONE, + &cur_lte->output_resource_entry, + ctx->write_resource_flags); + if (ret) + return ret; + } else { cur_lte->output_resource_entry.size = res_csize; @@ -990,16 +1012,16 @@ receive_compressed_chunks(struct main_writer_thread_ctx *ctx) cur_lte->resource_entry.original_size; cur_lte->output_resource_entry.offset = - ctx->cur_chunk_tab->file_offset; + offset; cur_lte->output_resource_entry.flags = cur_lte->resource_entry.flags | WIM_RESHDR_FLAG_COMPRESSED; } - do_write_streams_progress(ctx->progress, ctx->progress_func, + + do_write_streams_progress(ctx->progress, + ctx->progress_func, wim_resource_size(cur_lte)); - FREE(ctx->cur_chunk_tab); - ctx->cur_chunk_tab = NULL; /* Since we just finished writing a stream, write any * streams that have been added to the serial_streams @@ -1007,15 +1029,17 @@ receive_compressed_chunks(struct main_writer_thread_ctx *ctx) * resources that don't need to be compressed because * the desired compression type is the same as the * previous compression type). */ - ret = do_write_stream_list_serial(&ctx->serial_streams, - ctx->lookup_table, - ctx->out_fp, - ctx->out_ctype, - ctx->write_resource_flags, - ctx->progress_func, - ctx->progress); - if (ret) - return ret; + if (!list_empty(&ctx->serial_streams)) { + ret = do_write_stream_list_serial(&ctx->serial_streams, + ctx->lookup_table, + ctx->out_fp, + ctx->out_ctype, + ctx->write_resource_flags, + ctx->progress_func, + ctx->progress); + if (ret) + return ret; + } /* Advance to the next stream to write. */ if (list_empty(&ctx->outstanding_streams)) { @@ -1055,7 +1079,7 @@ main_writer_thread_cb(const void *chunk, size_t chunk_size, void *_ctx) * a fixed number of them, proportional to the number of * threads) have been sent off to the compressor threads, so we * receive messages from the compressor threads containing - * compressed chunks of data. + * compressed chunks of data. * * We may need to receive multiple messages before one is * actually available to use because messages received that are @@ -1220,11 +1244,16 @@ write_stream_list_parallel(struct list_head *stream_list, if (nthreads < 1 || nthreads > UINT_MAX) { WARNING("Could not determine number of processors! Assuming 1"); goto out_serial; + } else if (nthreads == 1) { + goto out_serial_quiet; } else { num_threads = nthreads; } } + DEBUG("Writing stream list (parallel version, num_threads=%u)", + num_threads); + progress->write_streams.num_threads = num_threads; static const size_t MESSAGES_PER_THREAD = 2; @@ -1317,6 +1346,7 @@ out_destroy_res_to_compress_queue: return ret; out_serial: WARNING("Falling back to single-threaded compression"); +out_serial_quiet: return write_stream_list_serial(stream_list, lookup_table, out_fp, -- 2.43.0