Reduce unnecessary copying during chunk compression
authorEric Biggers <ebiggers3@gmail.com>
Sat, 14 Feb 2015 22:03:28 +0000 (16:03 -0600)
committerEric Biggers <ebiggers3@gmail.com>
Sun, 15 Feb 2015 00:00:11 +0000 (18:00 -0600)
include/wimlib/chunk_compressor.h
src/compress_parallel.c
src/compress_serial.c
src/solid.c
src/write.c

index f21a0c6a7bd9fcf5d7e755a94223291cd4db3557..265e1362438a39c9238fe9d67657d65cf2d59f27 100644 (file)
@@ -23,16 +23,20 @@ struct chunk_compressor {
        /* Free the chunk compressor.  */
        void (*destroy)(struct chunk_compressor *);
 
-       /* Submit a chunk of uncompressed data for compression.
+       /* Try to borrow a buffer into which the uncompressed data for the next
+        * chunk should be prepared.
         *
-        * The chunk must have size greater than 0 and less than or equal to
-        * @out_chunk_size.
+        * Only one buffer can be borrowed at a time.
         *
-        * The return value is %true if the chunk was successfully submitted, or
-        * %false if the chunk compressor does not have space for the chunk at
-        * the present time.  In the latter case, get_chunk() must be called to
-        * retrieve a compressed chunk before trying again.  */
-       bool (*submit_chunk)(struct chunk_compressor *, const void *, u32);
+        * Returns a pointer to the buffer, or NULL if no buffer is available.
+        * If no buffer is available, you must call ->get_compression_result()
+        * to retrieve a compressed chunk before trying again.  */
+       void *(*get_chunk_buffer)(struct chunk_compressor *);
+
+       /* Signals to the chunk compressor that the buffer which was loaned out
+        * from ->get_chunk_buffer() has finished being filled and contains the
+        * specified number of bytes of uncompressed data.  */
+       void (*signal_chunk_filled)(struct chunk_compressor *, u32);
 
        /* Get the next chunk of compressed data.
         *
@@ -52,8 +56,8 @@ struct chunk_compressor {
         * The return value is %true if a chunk of compressed data was
         * successfully retrieved, or %false if there are no chunks currently
         * being compressed.  */
-       bool (*get_chunk)(struct chunk_compressor *,
-                         const void **, u32 *, u32 *);
+       bool (*get_compression_result)(struct chunk_compressor *,
+                                      const void **, u32 *, u32 *);
 };
 
 
index acf4ea58afc24b4299f7848d22301486c6be5bbe..b3377ffc985ca636ce6d1746b7c939ec7b349524 100644 (file)
@@ -329,21 +329,17 @@ submit_compression_msg(struct parallel_chunk_compressor *ctx)
        ctx->next_submit_msg = NULL;
 }
 
-static bool
-parallel_chunk_compressor_submit_chunk(struct chunk_compressor *_ctx,
-                                      const void *chunk, u32 size)
+static void *
+parallel_chunk_compressor_get_chunk_buffer(struct chunk_compressor *_ctx)
 {
        struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
        struct message *msg;
 
-       wimlib_assert(size > 0);
-       wimlib_assert(size <= ctx->base.out_chunk_size);
-
        if (ctx->next_submit_msg) {
                msg = ctx->next_submit_msg;
        } else {
                if (list_empty(&ctx->available_msgs))
-                       return false;
+                       return NULL;
 
                msg = list_entry(ctx->available_msgs.next, struct message, list);
                list_del(&msg->list);
@@ -351,22 +347,33 @@ parallel_chunk_compressor_submit_chunk(struct chunk_compressor *_ctx,
                msg->num_filled_chunks = 0;
        }
 
-       memcpy(msg->uncompressed_chunks[msg->num_filled_chunks], chunk, size);
-       msg->uncompressed_chunk_sizes[msg->num_filled_chunks] = size;
+       return msg->uncompressed_chunks[msg->num_filled_chunks];
+}
+
+static void
+parallel_chunk_compressor_signal_chunk_filled(struct chunk_compressor *_ctx, u32 usize)
+{
+       struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
+       struct message *msg;
+
+       wimlib_assert(usize > 0);
+       wimlib_assert(usize <= ctx->base.out_chunk_size);
+       wimlib_assert(ctx->next_submit_msg);
+
+       msg = ctx->next_submit_msg;
+       msg->uncompressed_chunk_sizes[msg->num_filled_chunks] = usize;
        if (++msg->num_filled_chunks == msg->num_alloc_chunks)
                submit_compression_msg(ctx);
-       return true;
 }
 
 static bool
-parallel_chunk_compressor_get_chunk(struct chunk_compressor *_ctx,
-                                   const void **cdata_ret, u32 *csize_ret,
-                                   u32 *usize_ret)
+parallel_chunk_compressor_get_compression_result(struct chunk_compressor *_ctx,
+                                                const void **cdata_ret, u32 *csize_ret,
+                                                u32 *usize_ret)
 {
        struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
        struct message *msg;
 
-
        if (ctx->next_submit_msg)
                submit_compression_msg(ctx);
 
@@ -490,8 +497,9 @@ new_parallel_chunk_compressor(int out_ctype, u32 out_chunk_size,
        ctx->base.out_ctype = out_ctype;
        ctx->base.out_chunk_size = out_chunk_size;
        ctx->base.destroy = parallel_chunk_compressor_destroy;
-       ctx->base.submit_chunk = parallel_chunk_compressor_submit_chunk;
-       ctx->base.get_chunk = parallel_chunk_compressor_get_chunk;
+       ctx->base.get_chunk_buffer = parallel_chunk_compressor_get_chunk_buffer;
+       ctx->base.signal_chunk_filled = parallel_chunk_compressor_signal_chunk_filled;
+       ctx->base.get_compression_result = parallel_chunk_compressor_get_compression_result;
 
        ctx->num_thread_data = num_threads;
 
index b5eb951d8e0391a014311faa44481e3fede60bb5..35470ec531a6ae40bf72d43b91f1a9315fbe9658 100644 (file)
@@ -37,7 +37,9 @@ struct serial_chunk_compressor {
        struct wimlib_compressor *compressor;
        u8 *udata;
        u8 *cdata;
-       u32 ulen;
+       u32 usize;
+       u8 *result_data;
+       u32 result_size;
 };
 
 static void
@@ -54,48 +56,52 @@ serial_chunk_compressor_destroy(struct chunk_compressor *_ctx)
        FREE(ctx);
 }
 
-static bool
-serial_chunk_compressor_submit_chunk(struct chunk_compressor *_ctx,
-                                    const void *chunk, u32 size)
+static void *
+serial_chunk_compressor_get_chunk_buffer(struct chunk_compressor *_ctx)
 {
        struct serial_chunk_compressor *ctx = (struct serial_chunk_compressor*)_ctx;
 
-       if (ctx->ulen != 0)
-               return false;
+       if (ctx->result_data)
+               return NULL;
+       return ctx->udata;
+}
 
-       wimlib_assert(size > 0);
-       wimlib_assert(size <= ctx->base.out_chunk_size);
+static void
+serial_chunk_compressor_signal_chunk_filled(struct chunk_compressor *_ctx, u32 usize)
+{
+       struct serial_chunk_compressor *ctx = (struct serial_chunk_compressor*)_ctx;
+       u32 csize;
 
-       memcpy(ctx->udata, chunk, size);
-       ctx->ulen = size;
-       return true;
+       wimlib_assert(usize > 0);
+       wimlib_assert(usize <= ctx->base.out_chunk_size);
+
+       ctx->usize = usize;
+       csize = wimlib_compress(ctx->udata, usize, ctx->cdata, usize - 1,
+                               ctx->compressor);
+       if (csize) {
+               ctx->result_data = ctx->cdata;
+               ctx->result_size = csize;
+       } else {
+               ctx->result_data = ctx->udata;
+               ctx->result_size = ctx->usize;
+       }
 }
 
 static bool
-serial_chunk_compressor_get_chunk(struct chunk_compressor *_ctx,
-                                 const void **cdata_ret, u32 *csize_ret,
-                                 u32 *usize_ret)
+serial_chunk_compressor_get_compression_result(struct chunk_compressor *_ctx,
+                                              const void **cdata_ret, u32 *csize_ret,
+                                              u32 *usize_ret)
 {
-       struct serial_chunk_compressor *ctx = (struct serial_chunk_compressor*)_ctx;
-       u32 clen;
+       struct serial_chunk_compressor *ctx = (struct serial_chunk_compressor *)_ctx;
 
-       if (ctx->ulen == 0)
+       if (!ctx->result_data)
                return false;
 
-       clen = wimlib_compress(ctx->udata, ctx->ulen,
-                              ctx->cdata, ctx->ulen - 1,
-                              ctx->compressor);
-
-       if (clen) {
-               *cdata_ret = ctx->cdata;
-               *csize_ret = clen;
-       } else {
-               *cdata_ret = ctx->udata;
-               *csize_ret = ctx->ulen;
-       }
-       *usize_ret = ctx->ulen;
+       *cdata_ret = ctx->result_data;
+       *csize_ret = ctx->result_size;
+       *usize_ret = ctx->usize;
 
-       ctx->ulen = 0;
+       ctx->result_data = NULL;
        return true;
 }
 
@@ -116,8 +122,9 @@ new_serial_chunk_compressor(int out_ctype, u32 out_chunk_size,
        ctx->base.out_chunk_size = out_chunk_size;
        ctx->base.num_threads = 1;
        ctx->base.destroy = serial_chunk_compressor_destroy;
-       ctx->base.submit_chunk = serial_chunk_compressor_submit_chunk;
-       ctx->base.get_chunk = serial_chunk_compressor_get_chunk;
+       ctx->base.get_chunk_buffer = serial_chunk_compressor_get_chunk_buffer;
+       ctx->base.signal_chunk_filled = serial_chunk_compressor_signal_chunk_filled;
+       ctx->base.get_compression_result = serial_chunk_compressor_get_compression_result;
 
        ret = wimlib_create_compressor(out_ctype, out_chunk_size,
                                       0, &ctx->compressor);
@@ -126,11 +133,11 @@ new_serial_chunk_compressor(int out_ctype, u32 out_chunk_size,
 
        ctx->udata = MALLOC(out_chunk_size);
        ctx->cdata = MALLOC(out_chunk_size - 1);
-       ctx->ulen = 0;
        if (ctx->udata == NULL || ctx->cdata == NULL) {
                ret = WIMLIB_ERR_NOMEM;
                goto err;
        }
+       ctx->result_data = NULL;
 
        *compressor_ret = &ctx->base;
        return 0;
index 4b791ea163c3a16ffdf007993d0061f516126f37..94901d0f319922263d149b2a55874a7e4b8a6e8a 100644 (file)
@@ -236,7 +236,6 @@ sort_stream_list_for_solid_compression(struct list_head *stream_list)
                                        write_streams_list),
                               cmp_streams_by_solid_sort_name);
 
-out_free_solid_sort_names:
        list_for_each_entry(lte, stream_list, write_streams_list)
                FREE(lte->solid_sort_name);
 out_free_lookup_table:
index 8886f8f5a61ae401d0aa8c21082926f689b4833c..842ca9b111dd38f0c9458be29af2858a205a538b 100644 (file)
@@ -6,7 +6,7 @@
  */
 
 /*
- * Copyright (C) 2012, 2013, 2014 Eric Biggers
+ * Copyright (C) 2012, 2013, 2014, 2015 Eric Biggers
  *
  * This file is free software; you can redistribute it and/or modify it under
  * the terms of the GNU Lesser General Public License as published by the Free
@@ -379,12 +379,13 @@ struct write_streams_ctx {
         * uncompressed.  */
        struct chunk_compressor *compressor;
 
-       /* Buffer for dividing the read data into chunks of size
-        * @out_chunk_size.  */
-       u8 *chunk_buf;
+       /* A buffer of size @out_chunk_size that has been loaned out from the
+        * chunk compressor and is currently being filled with the uncompressed
+        * data of the next chunk.  */
+       u8 *cur_chunk_buf;
 
-       /* Number of bytes in @chunk_buf that are currently filled.  */
-       size_t chunk_buf_filled;
+       /* Number of bytes in @cur_chunk_buf that are currently filled.  */
+       size_t cur_chunk_buf_filled;
 
        /* List of streams that currently have chunks being compressed.  */
        struct list_head pending_streams;
@@ -1057,22 +1058,23 @@ write_error:
 }
 
 static int
-submit_chunk_for_compression(struct write_streams_ctx *ctx,
-                            const void *chunk, size_t size)
+prepare_chunk_buffer(struct write_streams_ctx *ctx)
 {
-       /* While we are unable to submit the chunk for compression (due to too
-        * many chunks already outstanding), retrieve and write the next
-        * compressed chunk.  */
-       while (!ctx->compressor->submit_chunk(ctx->compressor, chunk, size)) {
+       /* While we are unable to get a new chunk buffer due to too many chunks
+        * already outstanding, retrieve and write the next compressed chunk. */
+       while (!(ctx->cur_chunk_buf =
+                ctx->compressor->get_chunk_buffer(ctx->compressor)))
+       {
                const void *cchunk;
                u32 csize;
                u32 usize;
                bool bret;
                int ret;
 
-               bret = ctx->compressor->get_chunk(ctx->compressor,
-                                                 &cchunk, &csize, &usize);
-
+               bret = ctx->compressor->get_compression_result(ctx->compressor,
+                                                              &cchunk,
+                                                              &csize,
+                                                              &usize);
                wimlib_assert(bret);
 
                ret = write_chunk(ctx, cchunk, csize, usize);
@@ -1107,55 +1109,40 @@ write_stream_process_chunk(const void *chunk, size_t size, void *_ctx)
        chunkptr = chunk;
        chunkend = chunkptr + size;
        do {
-               const u8 *resized_chunk;
                size_t needed_chunk_size;
+               size_t bytes_consumed;
+
+               if (!ctx->cur_chunk_buf) {
+                       ret = prepare_chunk_buffer(ctx);
+                       if (ret)
+                               return ret;
+               }
 
                if (ctx->write_resource_flags & WRITE_RESOURCE_FLAG_SOLID) {
                        needed_chunk_size = ctx->out_chunk_size;
                } else {
-                       u64 res_bytes_remaining;
-
-                       res_bytes_remaining = ctx->cur_read_stream_size -
-                                             ctx->cur_read_stream_offset;
                        needed_chunk_size = min(ctx->out_chunk_size,
-                                               ctx->chunk_buf_filled +
-                                                       res_bytes_remaining);
+                                               ctx->cur_chunk_buf_filled +
+                                                       (ctx->cur_read_stream_size -
+                                                        ctx->cur_read_stream_offset));
                }
 
-               if (ctx->chunk_buf_filled == 0 &&
-                   chunkend - chunkptr >= needed_chunk_size)
-               {
-                       /* No intermediate buffering needed.  */
-                       resized_chunk = chunkptr;
-                       chunkptr += needed_chunk_size;
-                       ctx->cur_read_stream_offset += needed_chunk_size;
-               } else {
-                       /* Intermediate buffering needed.  */
-                       size_t bytes_consumed;
-
-                       bytes_consumed = min(chunkend - chunkptr,
-                                            needed_chunk_size - ctx->chunk_buf_filled);
+               bytes_consumed = min(chunkend - chunkptr,
+                                    needed_chunk_size - ctx->cur_chunk_buf_filled);
 
-                       memcpy(&ctx->chunk_buf[ctx->chunk_buf_filled],
-                              chunkptr, bytes_consumed);
+               memcpy(&ctx->cur_chunk_buf[ctx->cur_chunk_buf_filled],
+                      chunkptr, bytes_consumed);
 
-                       chunkptr += bytes_consumed;
-                       ctx->cur_read_stream_offset += bytes_consumed;
-                       ctx->chunk_buf_filled += bytes_consumed;
-                       if (ctx->chunk_buf_filled == needed_chunk_size) {
-                               resized_chunk = ctx->chunk_buf;
-                               ctx->chunk_buf_filled = 0;
-                       } else {
-                               break;
-                       }
+               chunkptr += bytes_consumed;
+               ctx->cur_read_stream_offset += bytes_consumed;
+               ctx->cur_chunk_buf_filled += bytes_consumed;
 
+               if (ctx->cur_chunk_buf_filled == needed_chunk_size) {
+                       ctx->compressor->signal_chunk_filled(ctx->compressor,
+                                                            ctx->cur_chunk_buf_filled);
+                       ctx->cur_chunk_buf = NULL;
+                       ctx->cur_chunk_buf_filled = 0;
                }
-
-               ret = submit_chunk_for_compression(ctx, resized_chunk,
-                                                  needed_chunk_size);
-               if (ret)
-                       return ret;
-
        } while (chunkptr != chunkend);
        return 0;
 }
@@ -1374,14 +1361,14 @@ finish_remaining_chunks(struct write_streams_ctx *ctx)
        if (ctx->compressor == NULL)
                return 0;
 
-       if (ctx->chunk_buf_filled != 0) {
-               ret = submit_chunk_for_compression(ctx, ctx->chunk_buf,
-                                                  ctx->chunk_buf_filled);
-               if (ret)
-                       return ret;
+       if (ctx->cur_chunk_buf_filled != 0) {
+               ctx->compressor->signal_chunk_filled(ctx->compressor,
+                                                    ctx->cur_chunk_buf_filled);
        }
 
-       while (ctx->compressor->get_chunk(ctx->compressor, &cdata, &csize, &usize)) {
+       while (ctx->compressor->get_compression_result(ctx->compressor, &cdata,
+                                                      &csize, &usize))
+       {
                ret = write_chunk(ctx, cdata, csize, usize);
                if (ret)
                        return ret;
@@ -1601,20 +1588,6 @@ write_stream_list(struct list_head *stream_list,
                        WARNING("Failed to sort streams for solid compression. Continuing anyways.");
        }
 
-       if (out_ctype != WIMLIB_COMPRESSION_TYPE_NONE) {
-               wimlib_assert(out_chunk_size != 0);
-               if (out_chunk_size <= STACK_MAX) {
-                       ctx.chunk_buf = alloca(out_chunk_size);
-               } else {
-                       ctx.chunk_buf = MALLOC(out_chunk_size);
-                       if (ctx.chunk_buf == NULL) {
-                               ret = WIMLIB_ERR_NOMEM;
-                               goto out_destroy_context;
-                       }
-               }
-       }
-       ctx.chunk_buf_filled = 0;
-
        ctx.progress_data.progfunc = progfunc;
        ctx.progress_data.progctx = progctx;
 
@@ -1753,8 +1726,6 @@ out_write_raw_copy_resources:
                                       &ctx.progress_data);
 
 out_destroy_context:
-       if (out_ctype != WIMLIB_COMPRESSION_TYPE_NONE && out_chunk_size > STACK_MAX)
-               FREE(ctx.chunk_buf);
        FREE(ctx.chunk_csizes);
        if (ctx.compressor)
                ctx.compressor->destroy(ctx.compressor);