u64 num_chunks = (size + WIM_CHUNK_SIZE - 1) / WIM_CHUNK_SIZE;
size_t alloc_size = sizeof(struct chunk_table) + num_chunks * sizeof(u64);
struct chunk_table *chunk_tab = CALLOC(1, alloc_size);
- int ret;
if (!chunk_tab) {
ERROR("Failed to allocate chunk table for %"PRIu64" byte "
"resource", size);
- ret = WIMLIB_ERR_NOMEM;
- goto out;
+ return WIMLIB_ERR_NOMEM;
}
chunk_tab->file_offset = file_offset;
chunk_tab->num_chunks = num_chunks;
ERROR_WITH_ERRNO("Failed to write chunk table in compressed "
"file resource");
FREE(chunk_tab);
- ret = WIMLIB_ERR_WRITE;
- goto out;
+ return WIMLIB_ERR_WRITE;
}
-
- ret = 0;
*chunk_tab_ret = chunk_tab;
-out:
- return ret;
+ return 0;
}
/*
typedef unsigned (*compress_func_t)(const void *chunk, unsigned chunk_size,
void *out);
-compress_func_t
+static compress_func_t
get_compress_func(int out_ctype)
{
if (out_ctype == WIMLIB_COMPRESSION_TYPE_LZX)
wimlib_progress_func_t progress_func,
union wimlib_progress_info *progress)
{
- int ret;
+ int ret = 0;
struct wim_lookup_table_entry *lte;
/* For each stream in @stream_list ... */
struct wim_lookup_table_entry *tmp;
u32 orig_refcnt = lte->out_refcnt;
- ret = hash_unhashed_stream(lte,
- lookup_table,
- &tmp);
+ ret = hash_unhashed_stream(lte, lookup_table, &tmp);
if (ret)
- goto out;
+ break;
if (tmp != lte) {
lte = tmp;
/* We found a duplicate stream. */
* while in the latter case this is done because we do not have
* the SHA1 message digest yet. */
wimlib_assert(lte->out_refcnt != 0);
+ lte->deferred = 0;
ret = (*write_stream_cb)(lte, write_stream_ctx);
if (ret)
- goto out;
+ break;
+ /* In parallel mode, some streams are deferred for later,
+ * serialized processing; ignore them here. */
+ if (lte->deferred)
+ continue;
if (lte->unhashed) {
list_del(<e->unhashed_list);
lookup_table_insert(lookup_table, lte);
wim_resource_size(lte));
}
}
- ret = 0;
-out:
return ret;
}
struct list_head available_msgs;
struct list_head outstanding_streams;
struct list_head serial_streams;
+ size_t num_outstanding_messages;
SHA_CTX next_sha_ctx;
u64 next_chunk;
static void
main_writer_thread_destroy_ctx(struct main_writer_thread_ctx *ctx)
{
- size_t num_available_msgs;
- size_t num_outstanding_msgs;
- struct list_head *cur;
-
- num_available_msgs = 0;
- list_for_each(cur, &ctx->available_msgs)
- num_available_msgs++;
-
- num_outstanding_msgs = ctx->num_messages - num_available_msgs;
- while (num_outstanding_msgs--)
+ while (ctx->num_outstanding_messages--)
shared_queue_get(ctx->compressed_res_queue);
-
free_messages(ctx->msgs, ctx->num_messages);
FREE(ctx->cur_chunk_tab);
}
-
static int
main_writer_thread_init_ctx(struct main_writer_thread_ctx *ctx)
{
* currently being read and having chunks fed to the compressor threads.
* */
INIT_LIST_HEAD(&ctx->outstanding_streams);
+ ctx->num_outstanding_messages = 0;
+
+ ctx->next_msg = NULL;
/* Resources that don't need any chunks compressed are added to this
* list and written directly by the main thread. */
INIT_LIST_HEAD(&ctx->serial_streams);
+ ctx->cur_chunk_tab = NULL;
+
return 0;
}
int ret;
wimlib_assert(!list_empty(&ctx->outstanding_streams));
+ wimlib_assert(ctx->num_outstanding_messages != 0);
+
DEBUG2("Receiving more compressed chunks");
cur_lte = container_of(ctx->outstanding_streams.next,
struct wim_lookup_table_entry,
* compressed. */
msg = shared_queue_get(ctx->compressed_res_queue);
msg->complete = true;
+ --ctx->num_outstanding_messages;
DEBUG2("recved msg %p", msg);
* message around until all earlier chunks are received.
*
* Otherwise, write all the chunks we can. */
- while (!list_empty(&cur_lte->msg_list)
+ while (cur_lte != NULL &&
+ !list_empty(&cur_lte->msg_list)
&& (msg = container_of(cur_lte->msg_list.next,
struct message,
list))->complete)
* resources that don't need to be compressed because
* the desired compression type is the same as the
* previous compression type). */
-#if 0
ret = do_write_stream_list_serial(&ctx->serial_streams,
ctx->lookup_table,
ctx->out_fp,
ctx->write_resource_flags,
ctx->progress_func,
ctx->progress);
-#endif
if (ret)
return ret;
- if (list_empty(&ctx->outstanding_streams))
- return 0;
- cur_lte = container_of(ctx->outstanding_streams.next,
- struct wim_lookup_table_entry,
- being_compressed_list);
- #ifdef ENABLE_MORE_DEBUG
- DEBUG2("Advance to stream:");
- print_lookup_table_entry(cur_lte, stderr);
- #endif
+ if (list_empty(&ctx->outstanding_streams)) {
+ cur_lte = NULL;
+ } else {
+ cur_lte = container_of(ctx->outstanding_streams.next,
+ struct wim_lookup_table_entry,
+ being_compressed_list);
+ #ifdef ENABLE_MORE_DEBUG
+ DEBUG2("Advance to stream:");
+ print_lookup_table_entry(cur_lte, stderr);
+ #endif
+ }
}
}
return 0;
struct main_writer_thread_ctx *ctx = _ctx;
int ret;
struct message *next_msg;
+ u64 next_chunk_in_msg;
DEBUG2("chunk_size=%zu, wim_resource_size(next_lte)=%"PRIu64,
chunk_size, wim_resource_size(ctx->next_lte));
ctx->next_msg = next_msg;
}
- u64 next_chunk_in_msg = ctx->next_chunk - next_msg->begin_chunk;
+ next_chunk_in_msg = ctx->next_chunk - next_msg->begin_chunk;
/* Fill in the next chunk to compress */
next_msg->uncompressed_chunk_sizes[next_chunk_in_msg] = chunk_size;
/* Send off an array of chunks to compress */
list_add_tail(&next_msg->list, &ctx->next_lte->msg_list);
shared_queue_put(ctx->res_to_compress_queue, next_msg);
+ ++ctx->num_outstanding_messages;
ctx->next_msg = NULL;
}
return 0;
main_writer_thread_finish(void *_ctx)
{
struct main_writer_thread_ctx *ctx = _ctx;
- int ret = 0;
+ int ret;
DEBUG2("finishing");
- while (!list_empty(&ctx->outstanding_streams)) {
+ while (ctx->num_outstanding_messages != 0) {
ret = receive_compressed_chunks(ctx);
if (ret)
- break;
+ return ret;
}
+ wimlib_assert(list_empty(&ctx->outstanding_streams));
return do_write_stream_list_serial(&ctx->serial_streams,
ctx->lookup_table,
ctx->out_fp,
if (wim_resource_size(lte) < 1000 ||
ctx->out_ctype == WIMLIB_COMPRESSION_TYPE_NONE ||
(lte->resource_location == RESOURCE_IN_WIM &&
+ !(ctx->write_resource_flags & WIMLIB_RESOURCE_FLAG_RECOMPRESS) &&
wimlib_get_compression_type(lte->wim) == ctx->out_ctype))
{
- list_add_tail(<e->write_streams_list,
- &ctx->serial_streams);
+ list_add_tail(<e->write_streams_list, &ctx->serial_streams);
+ lte->deferred = 1;
ret = 0;
} else {
ret = submit_stream_for_compression(lte, ctx);
FILE *out_fp,
int out_ctype,
int write_resource_flags,
- unsigned num_threads,
wimlib_progress_func_t progress_func,
- union wimlib_progress_info *progress)
+ union wimlib_progress_info *progress,
+ unsigned num_threads)
{
int ret;
struct shared_queue res_to_compress_queue;
progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS, progress);
struct main_writer_thread_ctx ctx;
- memset(&ctx, 0, sizeof(ctx));
ctx.stream_list = stream_list;
ctx.lookup_table = lookup_table;
ctx.out_fp = out_fp;
ctx.res_to_compress_queue = &res_to_compress_queue;
ctx.compressed_res_queue = &compressed_res_queue;
ctx.num_messages = queue_size;
- ctx.write_resource_flags = write_resource_flags;
+ ctx.write_resource_flags = write_resource_flags | WIMLIB_RESOURCE_FLAG_THREADSAFE_READ;
ctx.progress_func = progress_func;
ctx.progress = progress;
ret = main_writer_thread_init_ctx(&ctx);
if (ret)
- goto out;
- ret = do_write_stream_list(stream_list,
- lookup_table,
+ goto out_join;
+ ret = do_write_stream_list(stream_list, lookup_table,
main_thread_process_next_stream,
- &ctx,
- NULL,
- NULL);
+ &ctx, NULL, NULL);
if (ret)
goto out_destroy_ctx;
ret = main_writer_thread_finish(&ctx);
shared_queue_destroy(&compressed_res_queue);
out_destroy_res_to_compress_queue:
shared_queue_destroy(&res_to_compress_queue);
-out:
if (ret >= 0 && ret != WIMLIB_ERR_NOMEM)
return ret;
out_serial:
static int
write_stream_list(struct list_head *stream_list,
struct wim_lookup_table *lookup_table,
- FILE *out_fp, int out_ctype, int write_resource_flags,
+ FILE *out_fp, int out_ctype, int write_flags,
unsigned num_threads, wimlib_progress_func_t progress_func)
{
struct wim_lookup_table_entry *lte;
u64 total_compression_bytes = 0;
union wimlib_progress_info progress;
int ret;
+ int write_resource_flags;
if (list_empty(stream_list))
return 0;
+ write_resource_flags = write_flags_to_resource_flags(write_flags);
+
/* Calculate the total size of the streams to be written. Note: this
* will be the uncompressed size, as we may not know the compressed size
* yet, and also this will assume that every unhashed stream will be
out_fp,
out_ctype,
write_resource_flags,
- num_threads,
progress_func,
- &progress);
+ &progress,
+ num_threads);
else
#endif
ret = write_stream_list_serial(stream_list,
wim->lookup_table,
wim->out_fp,
wimlib_get_compression_type(wim),
- write_flags_to_resource_flags(write_flags),
+ write_flags,
num_threads,
progress_func);
}