From: Eric Biggers
Date: Sat, 14 Feb 2015 22:03:28 +0000 (-0600)
Subject: Reduce unnecessary copying during chunk compression
X-Git-Tag: v1.8.0~18
X-Git-Url: https://wimlib.net/git/?p=wimlib;a=commitdiff_plain;h=d8e380e8314cdb592149a651a19690d102a1865b

Reduce unnecessary copying during chunk compression
---
diff --git a/include/wimlib/chunk_compressor.h b/include/wimlib/chunk_compressor.h
index f21a0c6a..265e1362 100644
--- a/include/wimlib/chunk_compressor.h
+++ b/include/wimlib/chunk_compressor.h
@@ -23,16 +23,20 @@ struct chunk_compressor {
         /* Free the chunk compressor. */
         void (*destroy)(struct chunk_compressor *);
 
-        /* Submit a chunk of uncompressed data for compression.
+        /* Try to borrow a buffer into which the uncompressed data for the next
+         * chunk should be prepared.
          *
-         * The chunk must have size greater than 0 and less than or equal to
-         * @out_chunk_size.
+         * Only one buffer can be borrowed at a time.
          *
-         * The return value is %true if the chunk was successfully submitted, or
-         * %false if the chunk compressor does not have space for the chunk at
-         * the present time. In the latter case, get_chunk() must be called to
-         * retrieve a compressed chunk before trying again. */
-        bool (*submit_chunk)(struct chunk_compressor *, const void *, u32);
+         * Returns a pointer to the buffer, or NULL if no buffer is available.
+         * If no buffer is available, you must call ->get_compression_result()
+         * to retrieve a compressed chunk before trying again. */
+        void *(*get_chunk_buffer)(struct chunk_compressor *);
+
+        /* Signals to the chunk compressor that the buffer which was loaned out
+         * from ->get_chunk_buffer() has finished being filled and contains the
+         * specified number of bytes of uncompressed data. */
+        void (*signal_chunk_filled)(struct chunk_compressor *, u32);
 
         /* Get the next chunk of compressed data.
          *
@@ -52,8 +56,8 @@ struct chunk_compressor {
          * The return value is %true if a chunk of compressed data was
          * successfully retrieved, or %false if there are no chunks currently
          * being compressed.
          */
-        bool (*get_chunk)(struct chunk_compressor *,
-                          const void **, u32 *, u32 *);
+        bool (*get_compression_result)(struct chunk_compressor *,
+                                       const void **, u32 *, u32 *);
 };
 
diff --git a/src/compress_parallel.c b/src/compress_parallel.c
index acf4ea58..b3377ffc 100644
--- a/src/compress_parallel.c
+++ b/src/compress_parallel.c
@@ -329,21 +329,17 @@ submit_compression_msg(struct parallel_chunk_compressor *ctx)
         ctx->next_submit_msg = NULL;
 }
 
-static bool
-parallel_chunk_compressor_submit_chunk(struct chunk_compressor *_ctx,
-                                       const void *chunk, u32 size)
+static void *
+parallel_chunk_compressor_get_chunk_buffer(struct chunk_compressor *_ctx)
 {
         struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
         struct message *msg;
 
-        wimlib_assert(size > 0);
-        wimlib_assert(size <= ctx->base.out_chunk_size);
-
         if (ctx->next_submit_msg) {
                 msg = ctx->next_submit_msg;
         } else {
                 if (list_empty(&ctx->available_msgs))
-                        return false;
+                        return NULL;
 
                 msg = list_entry(ctx->available_msgs.next, struct message, list);
                 list_del(&msg->list);
@@ -351,22 +347,33 @@ parallel_chunk_compressor_submit_chunk(struct chunk_compressor *_ctx,
                 msg->num_filled_chunks = 0;
         }
 
-        memcpy(msg->uncompressed_chunks[msg->num_filled_chunks], chunk, size);
-        msg->uncompressed_chunk_sizes[msg->num_filled_chunks] = size;
+        return msg->uncompressed_chunks[msg->num_filled_chunks];
+}
+
+static void
+parallel_chunk_compressor_signal_chunk_filled(struct chunk_compressor *_ctx, u32 usize)
+{
+        struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
+        struct message *msg;
+
+        wimlib_assert(usize > 0);
+        wimlib_assert(usize <= ctx->base.out_chunk_size);
+        wimlib_assert(ctx->next_submit_msg);
+
+        msg = ctx->next_submit_msg;
+        msg->uncompressed_chunk_sizes[msg->num_filled_chunks] = usize;
 
         if (++msg->num_filled_chunks == msg->num_alloc_chunks)
                 submit_compression_msg(ctx);
-
-        return true;
 }
 
 static bool
-parallel_chunk_compressor_get_chunk(struct chunk_compressor *_ctx,
-                                    const void **cdata_ret, u32 *csize_ret,
-                                    u32 *usize_ret)
+parallel_chunk_compressor_get_compression_result(struct chunk_compressor *_ctx,
+                                                 const void **cdata_ret, u32 *csize_ret,
+                                                 u32 *usize_ret)
 {
         struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
         struct message *msg;
-
         if (ctx->next_submit_msg)
                 submit_compression_msg(ctx);
@@ -490,8 +497,9 @@ new_parallel_chunk_compressor(int out_ctype, u32 out_chunk_size,
         ctx->base.out_ctype = out_ctype;
         ctx->base.out_chunk_size = out_chunk_size;
         ctx->base.destroy = parallel_chunk_compressor_destroy;
-        ctx->base.submit_chunk = parallel_chunk_compressor_submit_chunk;
-        ctx->base.get_chunk = parallel_chunk_compressor_get_chunk;
+        ctx->base.get_chunk_buffer = parallel_chunk_compressor_get_chunk_buffer;
+        ctx->base.signal_chunk_filled = parallel_chunk_compressor_signal_chunk_filled;
+        ctx->base.get_compression_result = parallel_chunk_compressor_get_compression_result;
 
         ctx->num_thread_data = num_threads;
 
diff --git a/src/compress_serial.c b/src/compress_serial.c
index b5eb951d..35470ec5 100644
--- a/src/compress_serial.c
+++ b/src/compress_serial.c
@@ -37,7 +37,9 @@ struct serial_chunk_compressor {
         struct wimlib_compressor *compressor;
         u8 *udata;
         u8 *cdata;
-        u32 ulen;
+        u32 usize;
+        u8 *result_data;
+        u32 result_size;
 };
 
 static void
@@ -54,48 +56,52 @@ serial_chunk_compressor_destroy(struct chunk_compressor *_ctx)
         FREE(ctx);
 }
 
-static bool
-serial_chunk_compressor_submit_chunk(struct chunk_compressor *_ctx,
-                                     const void *chunk, u32 size)
+static void *
+serial_chunk_compressor_get_chunk_buffer(struct chunk_compressor *_ctx)
 {
         struct serial_chunk_compressor *ctx = (struct serial_chunk_compressor*)_ctx;
 
-        if (ctx->ulen != 0)
-                return false;
+        if (ctx->result_data)
+                return NULL;
+        return ctx->udata;
+}
 
-        wimlib_assert(size > 0);
-        wimlib_assert(size <= ctx->base.out_chunk_size);
+static void
+serial_chunk_compressor_signal_chunk_filled(struct chunk_compressor *_ctx, u32 usize)
+{
+        struct serial_chunk_compressor *ctx = (struct serial_chunk_compressor*)_ctx;
+        u32 csize;
 
-        memcpy(ctx->udata, chunk, size);
-        ctx->ulen = size;
-        return true;
+        wimlib_assert(usize > 0);
+        wimlib_assert(usize <= ctx->base.out_chunk_size);
+
+        ctx->usize = usize;
+        csize = wimlib_compress(ctx->udata, usize, ctx->cdata, usize - 1,
+                                ctx->compressor);
+        if (csize) {
+                ctx->result_data = ctx->cdata;
+                ctx->result_size = csize;
+        } else {
+                ctx->result_data = ctx->udata;
+                ctx->result_size = ctx->usize;
+        }
 }
 
 static bool
-serial_chunk_compressor_get_chunk(struct chunk_compressor *_ctx,
-                                  const void **cdata_ret, u32 *csize_ret,
-                                  u32 *usize_ret)
+serial_chunk_compressor_get_compression_result(struct chunk_compressor *_ctx,
+                                               const void **cdata_ret, u32 *csize_ret,
+                                               u32 *usize_ret)
 {
-        struct serial_chunk_compressor *ctx = (struct serial_chunk_compressor*)_ctx;
-        u32 clen;
+        struct serial_chunk_compressor *ctx = (struct serial_chunk_compressor *)_ctx;
 
-        if (ctx->ulen == 0)
+        if (!ctx->result_data)
                 return false;
 
-        clen = wimlib_compress(ctx->udata, ctx->ulen,
-                               ctx->cdata, ctx->ulen - 1,
-                               ctx->compressor);
-
-        if (clen) {
-                *cdata_ret = ctx->cdata;
-                *csize_ret = clen;
-        } else {
-                *cdata_ret = ctx->udata;
-                *csize_ret = ctx->ulen;
-        }
-        *usize_ret = ctx->ulen;
+        *cdata_ret = ctx->result_data;
+        *csize_ret = ctx->result_size;
+        *usize_ret = ctx->usize;
 
-        ctx->ulen = 0;
+        ctx->result_data = NULL;
 
         return true;
 }
@@ -116,8 +122,9 @@ new_serial_chunk_compressor(int out_ctype, u32 out_chunk_size,
         ctx->base.out_chunk_size = out_chunk_size;
         ctx->base.num_threads = 1;
         ctx->base.destroy = serial_chunk_compressor_destroy;
-        ctx->base.submit_chunk = serial_chunk_compressor_submit_chunk;
-        ctx->base.get_chunk = serial_chunk_compressor_get_chunk;
+        ctx->base.get_chunk_buffer = serial_chunk_compressor_get_chunk_buffer;
+        ctx->base.signal_chunk_filled = serial_chunk_compressor_signal_chunk_filled;
+        ctx->base.get_compression_result = serial_chunk_compressor_get_compression_result;
 
         ret = wimlib_create_compressor(out_ctype, out_chunk_size, 0,
                                        &ctx->compressor);
@@ -126,11 +133,11 @@ new_serial_chunk_compressor(int out_ctype, u32 out_chunk_size,
 
         ctx->udata = MALLOC(out_chunk_size);
         ctx->cdata = MALLOC(out_chunk_size - 1);
-        ctx->ulen = 0;
         if (ctx->udata == NULL || ctx->cdata == NULL) {
                 ret = WIMLIB_ERR_NOMEM;
                 goto err;
         }
+        ctx->result_data = NULL;
 
         *compressor_ret = &ctx->base;
         return 0;
diff --git a/src/solid.c b/src/solid.c
index 4b791ea1..94901d0f 100644
--- a/src/solid.c
+++ b/src/solid.c
@@ -236,7 +236,6 @@ sort_stream_list_for_solid_compression(struct list_head *stream_list)
                                write_streams_list),
               cmp_streams_by_solid_sort_name);
 
-out_free_solid_sort_names:
         list_for_each_entry(lte, stream_list, write_streams_list)
                 FREE(lte->solid_sort_name);
 out_free_lookup_table:
diff --git a/src/write.c b/src/write.c
index 8886f8f5..842ca9b1 100644
--- a/src/write.c
+++ b/src/write.c
@@ -6,7 +6,7 @@
  */
 
 /*
- * Copyright (C) 2012, 2013, 2014 Eric Biggers
+ * Copyright (C) 2012, 2013, 2014, 2015 Eric Biggers
  *
  * This file is free software; you can redistribute it and/or modify it under
  * the terms of the GNU Lesser General Public License as published by the Free
@@ -379,12 +379,13 @@ struct write_streams_ctx {
          * uncompressed. */
         struct chunk_compressor *compressor;
 
-        /* Buffer for dividing the read data into chunks of size
-         * @out_chunk_size. */
-        u8 *chunk_buf;
+        /* A buffer of size @out_chunk_size that has been loaned out from the
+         * chunk compressor and is currently being filled with the uncompressed
+         * data of the next chunk. */
+        u8 *cur_chunk_buf;
 
-        /* Number of bytes in @chunk_buf that are currently filled. */
-        size_t chunk_buf_filled;
+        /* Number of bytes in @cur_chunk_buf that are currently filled. */
+        size_t cur_chunk_buf_filled;
 
         /* List of streams that currently have chunks being compressed. */
         struct list_head pending_streams;
@@ -1057,22 +1058,23 @@ write_error:
 }
 
 static int
-submit_chunk_for_compression(struct write_streams_ctx *ctx,
-                             const void *chunk, size_t size)
+prepare_chunk_buffer(struct write_streams_ctx *ctx)
 {
-        /* While we are unable to submit the chunk for compression (due to too
-         * many chunks already outstanding), retrieve and write the next
-         * compressed chunk. */
-        while (!ctx->compressor->submit_chunk(ctx->compressor, chunk, size)) {
+        /* While we are unable to get a new chunk buffer due to too many chunks
+         * already outstanding, retrieve and write the next compressed chunk. */
+        while (!(ctx->cur_chunk_buf =
+                 ctx->compressor->get_chunk_buffer(ctx->compressor)))
+        {
                 const void *cchunk;
                 u32 csize;
                 u32 usize;
                 bool bret;
                 int ret;
 
-                bret = ctx->compressor->get_chunk(ctx->compressor,
-                                                  &cchunk, &csize, &usize);
-
+                bret = ctx->compressor->get_compression_result(ctx->compressor,
+                                                               &cchunk,
+                                                               &csize,
+                                                               &usize);
                 wimlib_assert(bret);
 
                 ret = write_chunk(ctx, cchunk, csize, usize);
@@ -1107,55 +1109,40 @@ write_stream_process_chunk(const void *chunk, size_t size, void *_ctx)
         chunkptr = chunk;
         chunkend = chunkptr + size;
         do {
-                const u8 *resized_chunk;
                 size_t needed_chunk_size;
+                size_t bytes_consumed;
+
+                if (!ctx->cur_chunk_buf) {
+                        ret = prepare_chunk_buffer(ctx);
+                        if (ret)
+                                return ret;
+                }
 
                 if (ctx->write_resource_flags & WRITE_RESOURCE_FLAG_SOLID) {
                         needed_chunk_size = ctx->out_chunk_size;
                 } else {
-                        u64 res_bytes_remaining;
-
-                        res_bytes_remaining = ctx->cur_read_stream_size -
-                                              ctx->cur_read_stream_offset;
                         needed_chunk_size = min(ctx->out_chunk_size,
-                                                ctx->chunk_buf_filled +
-                                                res_bytes_remaining);
+                                                ctx->cur_chunk_buf_filled +
+                                                (ctx->cur_read_stream_size -
+                                                 ctx->cur_read_stream_offset));
                 }
 
-                if (ctx->chunk_buf_filled == 0 &&
-                    chunkend - chunkptr >= needed_chunk_size)
-                {
-                        /* No intermediate buffering needed. */
-                        resized_chunk = chunkptr;
-                        chunkptr += needed_chunk_size;
-                        ctx->cur_read_stream_offset += needed_chunk_size;
-                } else {
-                        /* Intermediate buffering needed. */
-                        size_t bytes_consumed;
-
-                        bytes_consumed = min(chunkend - chunkptr,
-                                             needed_chunk_size - ctx->chunk_buf_filled);
+                bytes_consumed = min(chunkend - chunkptr,
+                                     needed_chunk_size - ctx->cur_chunk_buf_filled);
 
-                        memcpy(&ctx->chunk_buf[ctx->chunk_buf_filled],
-                               chunkptr, bytes_consumed);
+                memcpy(&ctx->cur_chunk_buf[ctx->cur_chunk_buf_filled],
+                       chunkptr, bytes_consumed);
 
-                        chunkptr += bytes_consumed;
-                        ctx->cur_read_stream_offset += bytes_consumed;
-                        ctx->chunk_buf_filled += bytes_consumed;
-                        if (ctx->chunk_buf_filled == needed_chunk_size) {
-                                resized_chunk = ctx->chunk_buf;
-                                ctx->chunk_buf_filled = 0;
-                        } else {
-                                break;
-                        }
+                chunkptr += bytes_consumed;
+                ctx->cur_read_stream_offset += bytes_consumed;
+                ctx->cur_chunk_buf_filled += bytes_consumed;
+                if (ctx->cur_chunk_buf_filled == needed_chunk_size) {
+                        ctx->compressor->signal_chunk_filled(ctx->compressor,
+                                                             ctx->cur_chunk_buf_filled);
+                        ctx->cur_chunk_buf = NULL;
+                        ctx->cur_chunk_buf_filled = 0;
                 }
-
-                ret = submit_chunk_for_compression(ctx, resized_chunk,
-                                                   needed_chunk_size);
-                if (ret)
-                        return ret;
-
         } while (chunkptr != chunkend);
 
         return 0;
 }
@@ -1374,14 +1361,14 @@ finish_remaining_chunks(struct write_streams_ctx *ctx)
         if (ctx->compressor == NULL)
                 return 0;
 
-        if (ctx->chunk_buf_filled != 0) {
-                ret = submit_chunk_for_compression(ctx, ctx->chunk_buf,
-                                                   ctx->chunk_buf_filled);
-                if (ret)
-                        return ret;
+        if (ctx->cur_chunk_buf_filled != 0) {
+                ctx->compressor->signal_chunk_filled(ctx->compressor,
+                                                     ctx->cur_chunk_buf_filled);
         }
 
-        while (ctx->compressor->get_chunk(ctx->compressor, &cdata, &csize, &usize)) {
+        while (ctx->compressor->get_compression_result(ctx->compressor, &cdata,
+                                                        &csize, &usize))
+        {
                 ret = write_chunk(ctx, cdata, csize, usize);
                 if (ret)
                         return ret;
@@ -1601,20 +1588,6 @@ write_stream_list(struct list_head *stream_list,
                         WARNING("Failed to sort streams for solid compression. Continuing anyways.");
         }
 
-        if (out_ctype != WIMLIB_COMPRESSION_TYPE_NONE) {
-                wimlib_assert(out_chunk_size != 0);
-                if (out_chunk_size <= STACK_MAX) {
-                        ctx.chunk_buf = alloca(out_chunk_size);
-                } else {
-                        ctx.chunk_buf = MALLOC(out_chunk_size);
-                        if (ctx.chunk_buf == NULL) {
-                                ret = WIMLIB_ERR_NOMEM;
-                                goto out_destroy_context;
-                        }
-                }
-        }
-        ctx.chunk_buf_filled = 0;
-
         ctx.progress_data.progfunc = progfunc;
         ctx.progress_data.progctx = progctx;
@@ -1753,8 +1726,6 @@ out_write_raw_copy_resources:
                                               &ctx.progress_data);
 out_destroy_context:
-        if (out_ctype != WIMLIB_COMPRESSION_TYPE_NONE && out_chunk_size > STACK_MAX)
-                FREE(ctx.chunk_buf);
         FREE(ctx.chunk_csizes);
         if (ctx.compressor)
                 ctx.compressor->destroy(ctx.compressor);
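
For reference, the caller-side pattern for the reworked interface looks roughly like the sketch below. It is not code from the commit: only the ->get_chunk_buffer(), ->signal_chunk_filled(), and ->get_compression_result() operations and the out_chunk_size field come from the real struct chunk_compressor; the helper name compress_buffer() and the emit() callback are hypothetical stand-ins for the logic that write.c implements in prepare_chunk_buffer() and write_stream_process_chunk().

/* Hypothetical sketch (not part of the commit): compress a flat input buffer
 * with a struct chunk_compressor using the borrowed-buffer operations
 * introduced above.  Assumes wimlib's internal headers for u8/u32 and
 * struct chunk_compressor, plus <string.h>; emit() is a caller-supplied
 * stand-in for write_chunk(). */
static int
compress_buffer(struct chunk_compressor *c, const u8 *in, size_t in_size,
                int (*emit)(const void *cdata, u32 csize, u32 usize))
{
        size_t offset = 0;
        const void *cdata;
        u32 csize, usize;
        int ret;

        while (offset < in_size) {
                void *ubuf;
                size_t remaining = in_size - offset;
                u32 fill = (remaining < c->out_chunk_size) ?
                                (u32)remaining : c->out_chunk_size;

                /* Borrow a buffer; if none is free, drain one result first. */
                while (!(ubuf = c->get_chunk_buffer(c))) {
                        if (!c->get_compression_result(c, &cdata, &csize, &usize))
                                return -1;
                        ret = emit(cdata, csize, usize);
                        if (ret)
                                return ret;
                }

                /* Fill the compressor's own buffer in place; this is the only
                 * copy of the data on this path. */
                memcpy(ubuf, in + offset, fill);
                offset += fill;
                c->signal_chunk_filled(c, fill);
        }

        /* Drain the remaining compressed chunks. */
        while (c->get_compression_result(c, &cdata, &csize, &usize)) {
                ret = emit(cdata, csize, usize);
                if (ret)
                        return ret;
        }
        return 0;
}

Compared with the old submit_chunk() interface, the caller no longer stages data in its own chunk_buf and the compressor no longer copies the submitted chunk into its internal buffer, which is the copy this commit removes.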