/* Free the chunk compressor. */
void (*destroy)(struct chunk_compressor *);
- /* Submit a chunk of uncompressed data for compression.
+ /* Try to borrow a buffer into which the uncompressed data for the next
+ * chunk should be prepared.
*
- * The chunk must have size greater than 0 and less than or equal to
- * @out_chunk_size.
+ * Only one buffer can be borrowed at a time.
*
- * The return value is %true if the chunk was successfully submitted, or
- * %false if the chunk compressor does not have space for the chunk at
- * the present time. In the latter case, get_chunk() must be called to
- * retrieve a compressed chunk before trying again. */
- bool (*submit_chunk)(struct chunk_compressor *, const void *, u32);
+ * Returns a pointer to the buffer, or NULL if no buffer is available.
+ * If no buffer is available, you must call ->get_compression_result()
+ * to retrieve a compressed chunk before trying again. */
+ void *(*get_chunk_buffer)(struct chunk_compressor *);
+
+ /* Signals to the chunk compressor that the buffer which was loaned out
+ * from ->get_chunk_buffer() has finished being filled and contains the
+ * specified number of bytes of uncompressed data. */
+ void (*signal_chunk_filled)(struct chunk_compressor *, u32);
/* Get the next chunk of compressed data.
*
* The return value is %true if a chunk of compressed data was
* successfully retrieved, or %false if there are no chunks currently
* being compressed. */
- bool (*get_chunk)(struct chunk_compressor *,
- const void **, u32 *, u32 *);
+ bool (*get_compression_result)(struct chunk_compressor *,
+ const void **, u32 *, u32 *);
};
ctx->next_submit_msg = NULL;
}
-static bool
-parallel_chunk_compressor_submit_chunk(struct chunk_compressor *_ctx,
- const void *chunk, u32 size)
+static void *
+parallel_chunk_compressor_get_chunk_buffer(struct chunk_compressor *_ctx)
{
struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
struct message *msg;
- wimlib_assert(size > 0);
- wimlib_assert(size <= ctx->base.out_chunk_size);
-
if (ctx->next_submit_msg) {
msg = ctx->next_submit_msg;
} else {
if (list_empty(&ctx->available_msgs))
- return false;
+ return NULL;
msg = list_entry(ctx->available_msgs.next, struct message, list);
list_del(&msg->list);
msg->num_filled_chunks = 0;
}
- memcpy(msg->uncompressed_chunks[msg->num_filled_chunks], chunk, size);
- msg->uncompressed_chunk_sizes[msg->num_filled_chunks] = size;
+ return msg->uncompressed_chunks[msg->num_filled_chunks];
+}
+
+static void
+parallel_chunk_compressor_signal_chunk_filled(struct chunk_compressor *_ctx, u32 usize)
+{
+ struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
+ struct message *msg;
+
+ wimlib_assert(usize > 0);
+ wimlib_assert(usize <= ctx->base.out_chunk_size);
+ wimlib_assert(ctx->next_submit_msg);
+
+ msg = ctx->next_submit_msg;
+ msg->uncompressed_chunk_sizes[msg->num_filled_chunks] = usize;
if (++msg->num_filled_chunks == msg->num_alloc_chunks)
submit_compression_msg(ctx);
- return true;
}
static bool
-parallel_chunk_compressor_get_chunk(struct chunk_compressor *_ctx,
- const void **cdata_ret, u32 *csize_ret,
- u32 *usize_ret)
+parallel_chunk_compressor_get_compression_result(struct chunk_compressor *_ctx,
+ const void **cdata_ret, u32 *csize_ret,
+ u32 *usize_ret)
{
struct parallel_chunk_compressor *ctx = (struct parallel_chunk_compressor *)_ctx;
struct message *msg;
-
if (ctx->next_submit_msg)
submit_compression_msg(ctx);
ctx->base.out_ctype = out_ctype;
ctx->base.out_chunk_size = out_chunk_size;
ctx->base.destroy = parallel_chunk_compressor_destroy;
- ctx->base.submit_chunk = parallel_chunk_compressor_submit_chunk;
- ctx->base.get_chunk = parallel_chunk_compressor_get_chunk;
+ ctx->base.get_chunk_buffer = parallel_chunk_compressor_get_chunk_buffer;
+ ctx->base.signal_chunk_filled = parallel_chunk_compressor_signal_chunk_filled;
+ ctx->base.get_compression_result = parallel_chunk_compressor_get_compression_result;
ctx->num_thread_data = num_threads;
struct wimlib_compressor *compressor;
u8 *udata;
u8 *cdata;
- u32 ulen;
+ u32 usize;
+ u8 *result_data;
+ u32 result_size;
};
static void
FREE(ctx);
}
-static bool
-serial_chunk_compressor_submit_chunk(struct chunk_compressor *_ctx,
- const void *chunk, u32 size)
+static void *
+serial_chunk_compressor_get_chunk_buffer(struct chunk_compressor *_ctx)
{
struct serial_chunk_compressor *ctx = (struct serial_chunk_compressor*)_ctx;
- if (ctx->ulen != 0)
- return false;
+ if (ctx->result_data)
+ return NULL;
+ return ctx->udata;
+}
- wimlib_assert(size > 0);
- wimlib_assert(size <= ctx->base.out_chunk_size);
+static void
+serial_chunk_compressor_signal_chunk_filled(struct chunk_compressor *_ctx, u32 usize)
+{
+ struct serial_chunk_compressor *ctx = (struct serial_chunk_compressor*)_ctx;
+ u32 csize;
- memcpy(ctx->udata, chunk, size);
- ctx->ulen = size;
- return true;
+ wimlib_assert(usize > 0);
+ wimlib_assert(usize <= ctx->base.out_chunk_size);
+
+ ctx->usize = usize;
+ csize = wimlib_compress(ctx->udata, usize, ctx->cdata, usize - 1,
+ ctx->compressor);
+ if (csize) {
+ ctx->result_data = ctx->cdata;
+ ctx->result_size = csize;
+ } else {
+ ctx->result_data = ctx->udata;
+ ctx->result_size = ctx->usize;
+ }
}
static bool
-serial_chunk_compressor_get_chunk(struct chunk_compressor *_ctx,
- const void **cdata_ret, u32 *csize_ret,
- u32 *usize_ret)
+serial_chunk_compressor_get_compression_result(struct chunk_compressor *_ctx,
+ const void **cdata_ret, u32 *csize_ret,
+ u32 *usize_ret)
{
- struct serial_chunk_compressor *ctx = (struct serial_chunk_compressor*)_ctx;
- u32 clen;
+ struct serial_chunk_compressor *ctx = (struct serial_chunk_compressor *)_ctx;
- if (ctx->ulen == 0)
+ if (!ctx->result_data)
return false;
- clen = wimlib_compress(ctx->udata, ctx->ulen,
- ctx->cdata, ctx->ulen - 1,
- ctx->compressor);
-
- if (clen) {
- *cdata_ret = ctx->cdata;
- *csize_ret = clen;
- } else {
- *cdata_ret = ctx->udata;
- *csize_ret = ctx->ulen;
- }
- *usize_ret = ctx->ulen;
+ *cdata_ret = ctx->result_data;
+ *csize_ret = ctx->result_size;
+ *usize_ret = ctx->usize;
- ctx->ulen = 0;
+ ctx->result_data = NULL;
return true;
}
ctx->base.out_chunk_size = out_chunk_size;
ctx->base.num_threads = 1;
ctx->base.destroy = serial_chunk_compressor_destroy;
- ctx->base.submit_chunk = serial_chunk_compressor_submit_chunk;
- ctx->base.get_chunk = serial_chunk_compressor_get_chunk;
+ ctx->base.get_chunk_buffer = serial_chunk_compressor_get_chunk_buffer;
+ ctx->base.signal_chunk_filled = serial_chunk_compressor_signal_chunk_filled;
+ ctx->base.get_compression_result = serial_chunk_compressor_get_compression_result;
ret = wimlib_create_compressor(out_ctype, out_chunk_size,
0, &ctx->compressor);
ctx->udata = MALLOC(out_chunk_size);
ctx->cdata = MALLOC(out_chunk_size - 1);
- ctx->ulen = 0;
if (ctx->udata == NULL || ctx->cdata == NULL) {
ret = WIMLIB_ERR_NOMEM;
goto err;
}
+ ctx->result_data = NULL;
*compressor_ret = &ctx->base;
return 0;
write_streams_list),
cmp_streams_by_solid_sort_name);
-out_free_solid_sort_names:
list_for_each_entry(lte, stream_list, write_streams_list)
FREE(lte->solid_sort_name);
out_free_lookup_table:
*/
/*
- * Copyright (C) 2012, 2013, 2014 Eric Biggers
+ * Copyright (C) 2012, 2013, 2014, 2015 Eric Biggers
*
* This file is free software; you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* uncompressed. */
struct chunk_compressor *compressor;
- /* Buffer for dividing the read data into chunks of size
- * @out_chunk_size. */
- u8 *chunk_buf;
+ /* A buffer of size @out_chunk_size that has been loaned out from the
+ * chunk compressor and is currently being filled with the uncompressed
+ * data of the next chunk. */
+ u8 *cur_chunk_buf;
- /* Number of bytes in @chunk_buf that are currently filled. */
- size_t chunk_buf_filled;
+ /* Number of bytes in @cur_chunk_buf that are currently filled. */
+ size_t cur_chunk_buf_filled;
/* List of streams that currently have chunks being compressed. */
struct list_head pending_streams;
}
static int
-submit_chunk_for_compression(struct write_streams_ctx *ctx,
- const void *chunk, size_t size)
+prepare_chunk_buffer(struct write_streams_ctx *ctx)
{
- /* While we are unable to submit the chunk for compression (due to too
- * many chunks already outstanding), retrieve and write the next
- * compressed chunk. */
- while (!ctx->compressor->submit_chunk(ctx->compressor, chunk, size)) {
+ /* While we are unable to get a new chunk buffer due to too many chunks
+ * already outstanding, retrieve and write the next compressed chunk. */
+ while (!(ctx->cur_chunk_buf =
+ ctx->compressor->get_chunk_buffer(ctx->compressor)))
+ {
const void *cchunk;
u32 csize;
u32 usize;
bool bret;
int ret;
- bret = ctx->compressor->get_chunk(ctx->compressor,
- &cchunk, &csize, &usize);
-
+ bret = ctx->compressor->get_compression_result(ctx->compressor,
+ &cchunk,
+ &csize,
+ &usize);
wimlib_assert(bret);
ret = write_chunk(ctx, cchunk, csize, usize);
chunkptr = chunk;
chunkend = chunkptr + size;
do {
- const u8 *resized_chunk;
size_t needed_chunk_size;
+ size_t bytes_consumed;
+
+ if (!ctx->cur_chunk_buf) {
+ ret = prepare_chunk_buffer(ctx);
+ if (ret)
+ return ret;
+ }
if (ctx->write_resource_flags & WRITE_RESOURCE_FLAG_SOLID) {
needed_chunk_size = ctx->out_chunk_size;
} else {
- u64 res_bytes_remaining;
-
- res_bytes_remaining = ctx->cur_read_stream_size -
- ctx->cur_read_stream_offset;
needed_chunk_size = min(ctx->out_chunk_size,
- ctx->chunk_buf_filled +
- res_bytes_remaining);
+ ctx->cur_chunk_buf_filled +
+ (ctx->cur_read_stream_size -
+ ctx->cur_read_stream_offset));
}
- if (ctx->chunk_buf_filled == 0 &&
- chunkend - chunkptr >= needed_chunk_size)
- {
- /* No intermediate buffering needed. */
- resized_chunk = chunkptr;
- chunkptr += needed_chunk_size;
- ctx->cur_read_stream_offset += needed_chunk_size;
- } else {
- /* Intermediate buffering needed. */
- size_t bytes_consumed;
-
- bytes_consumed = min(chunkend - chunkptr,
- needed_chunk_size - ctx->chunk_buf_filled);
+ bytes_consumed = min(chunkend - chunkptr,
+ needed_chunk_size - ctx->cur_chunk_buf_filled);
- memcpy(&ctx->chunk_buf[ctx->chunk_buf_filled],
- chunkptr, bytes_consumed);
+ memcpy(&ctx->cur_chunk_buf[ctx->cur_chunk_buf_filled],
+ chunkptr, bytes_consumed);
- chunkptr += bytes_consumed;
- ctx->cur_read_stream_offset += bytes_consumed;
- ctx->chunk_buf_filled += bytes_consumed;
- if (ctx->chunk_buf_filled == needed_chunk_size) {
- resized_chunk = ctx->chunk_buf;
- ctx->chunk_buf_filled = 0;
- } else {
- break;
- }
+ chunkptr += bytes_consumed;
+ ctx->cur_read_stream_offset += bytes_consumed;
+ ctx->cur_chunk_buf_filled += bytes_consumed;
+ if (ctx->cur_chunk_buf_filled == needed_chunk_size) {
+ ctx->compressor->signal_chunk_filled(ctx->compressor,
+ ctx->cur_chunk_buf_filled);
+ ctx->cur_chunk_buf = NULL;
+ ctx->cur_chunk_buf_filled = 0;
}
-
- ret = submit_chunk_for_compression(ctx, resized_chunk,
- needed_chunk_size);
- if (ret)
- return ret;
-
} while (chunkptr != chunkend);
return 0;
}
if (ctx->compressor == NULL)
return 0;
- if (ctx->chunk_buf_filled != 0) {
- ret = submit_chunk_for_compression(ctx, ctx->chunk_buf,
- ctx->chunk_buf_filled);
- if (ret)
- return ret;
+ if (ctx->cur_chunk_buf_filled != 0) {
+ ctx->compressor->signal_chunk_filled(ctx->compressor,
+ ctx->cur_chunk_buf_filled);
}
- while (ctx->compressor->get_chunk(ctx->compressor, &cdata, &csize, &usize)) {
+ while (ctx->compressor->get_compression_result(ctx->compressor, &cdata,
+ &csize, &usize))
+ {
ret = write_chunk(ctx, cdata, csize, usize);
if (ret)
return ret;
WARNING("Failed to sort streams for solid compression. Continuing anyways.");
}
- if (out_ctype != WIMLIB_COMPRESSION_TYPE_NONE) {
- wimlib_assert(out_chunk_size != 0);
- if (out_chunk_size <= STACK_MAX) {
- ctx.chunk_buf = alloca(out_chunk_size);
- } else {
- ctx.chunk_buf = MALLOC(out_chunk_size);
- if (ctx.chunk_buf == NULL) {
- ret = WIMLIB_ERR_NOMEM;
- goto out_destroy_context;
- }
- }
- }
- ctx.chunk_buf_filled = 0;
-
ctx.progress_data.progfunc = progfunc;
ctx.progress_data.progctx = progctx;
&ctx.progress_data);
out_destroy_context:
- if (out_ctype != WIMLIB_COMPRESSION_TYPE_NONE && out_chunk_size > STACK_MAX)
- FREE(ctx.chunk_buf);
FREE(ctx.chunk_csizes);
if (ctx.compressor)
ctx.compressor->destroy(ctx.compressor);