chunk_tab->bytes_per_chunk_entry,
chunk_tab->table_disk_size);
} else {
- ret = full_pwrite(out_fd,
- chunk_tab->offsets +
- chunk_tab->bytes_per_chunk_entry,
- chunk_tab->table_disk_size,
- res_start_offset);
- }
- if (ret) {
- ERROR_WITH_ERRNO("Failed to write chunk table in compressed "
- "file resource");
+ ret = full_pwrite(out_fd,
+ chunk_tab->offsets +
+ chunk_tab->bytes_per_chunk_entry,
+ chunk_tab->table_disk_size,
+ res_start_offset);
}
+ if (ret)
+ ERROR_WITH_ERRNO("Write error");
return ret;
}
* One of the WIMLIB_COMPRESSION_TYPE_* constants to indicate which
* compression algorithm to use.
*
+ * @out_chunk_size:
+ * Compressed chunk size to use.
+ *
* @out_res_entry:
* On success, this is filled in with the offset, flags, compressed size,
* and uncompressed size of the resource in the output WIM.
* calculated (except when doing a raw copy --- see below). If the @unhashed
* flag is set on the lookup table entry, this message digest is simply copied
* to it; otherwise, the message digest is compared with the existing one, and
- * the function will fail if they do not match.
+ * this function will fail if they do not match.
*/
-int
+static int
write_wim_resource(struct wim_lookup_table_entry *lte,
struct filedes *out_fd, int out_ctype,
u32 out_chunk_size,
res_start_offset = out_fd->offset;
/* If we are not forcing the data to be recompressed, and the input
- * resource is located in a WIM with the same compression type as that
- * desired other than no compression, we can simply copy the compressed
- * data without recompressing it. This also means we must skip
- * calculating the SHA1, as we never will see the uncompressed data. */
+ * resource is located in a WIM with a compression mode compatible with
+ * the output, we can simply copy the compressed data without
+ * recompressing it. This also means we must skip calculating the SHA1,
+ * as we never will see the uncompressed data. */
if (can_raw_copy(lte, resource_flags, out_ctype, out_chunk_size)) {
- /* Normally we can request a RAW_FULL read, but if we're reading
- * from a pipable resource and writing a non-pipable resource or
- * vice versa, then a RAW_CHUNKS read needs to be requested so
- * that the written resource can be appropriately formatted.
- * However, in neither case is any actual decompression needed.
- */
+ /* Normally, for raw copies we can request a RAW_FULL read, but
+ * if we're reading from a pipable resource and writing a
+ * non-pipable resource or vice versa, then a RAW_CHUNKS read
+ * needs to be requested so that the written resource can be
+ * appropriately formatted. However, in neither case is any
+ * actual decompression needed. */
if (lte->is_pipable == !!(resource_flags &
WIMLIB_WRITE_RESOURCE_FLAG_PIPABLE))
+ {
resource_flags |= WIMLIB_READ_RESOURCE_FLAG_RAW_FULL;
- else
+ read_size = lte->resource_entry.size;
+ } else {
resource_flags |= WIMLIB_READ_RESOURCE_FLAG_RAW_CHUNKS;
- read_size = lte->resource_entry.size;
+ read_size = lte->resource_entry.original_size;
+ }
write_ctx.doing_sha = false;
} else {
write_ctx.doing_sha = true;
read_size = lte->resource_entry.original_size;
}
- /* If the output resource is to be compressed, initialize the chunk
- * table and set the function to use for chunk compression. Exceptions:
- * no compression function is needed if doing a raw copy; also, no chunk
- * table is needed if doing a *full* (not per-chunk) raw copy. */
+ /* Set the output compression mode and initialize chunk table if needed.
+ */
write_ctx.out_ctype = WIMLIB_COMPRESSION_TYPE_NONE;
write_ctx.out_chunk_size = out_chunk_size;
write_ctx.chunk_tab = NULL;
if (out_ctype != WIMLIB_COMPRESSION_TYPE_NONE) {
wimlib_assert(out_chunk_size > 0);
if (!(resource_flags & WIMLIB_READ_RESOURCE_FLAG_RAW)) {
+ /* Compression needed. */
write_ctx.out_ctype = out_ctype;
if (out_ctype == WIMLIB_COMPRESSION_TYPE_LZX) {
ret = wimlib_lzx_alloc_context(out_chunk_size,
write_ctx.comp_ctx = *comp_ctx;
}
if (!(resource_flags & WIMLIB_READ_RESOURCE_FLAG_RAW_FULL)) {
+ /* Chunk table needed. */
ret = begin_wim_resource_chunk_tab(lte, out_fd,
out_chunk_size,
&write_ctx.chunk_tab,
}
/* Write the entire resource by reading the entire resource and feeding
- * the data through the write_resource_cb function. */
+ * the data through write_resource_cb(). */
write_ctx.out_fd = out_fd;
write_ctx.resource_flags = resource_flags;
try_write_again:
write_ctx.doing_sha = false;
goto try_write_again;
}
- if (resource_flags & (WIMLIB_READ_RESOURCE_FLAG_RAW)) {
+ if (resource_flags & WIMLIB_READ_RESOURCE_FLAG_RAW) {
DEBUG("Copied raw compressed data "
"(%"PRIu64" => %"PRIu64" bytes @ +%"PRIu64", flags=0x%02x)",
out_res_entry->original_size, out_res_entry->size,
}
/* Like write_wim_resource(), but the resource is specified by a buffer of
- * uncompressed data rather a lookup table entry; also writes the SHA1 hash of
- * the buffer to @hash_ret. */
+ * uncompressed data rather than a lookup table entry.  Also writes the SHA1 message
+ * digest of the buffer to @hash_ret if it is non-NULL. */
int
write_wim_resource_from_buffer(const void *buf, size_t buf_size,
int reshdr_flags, struct filedes *out_fd,
u8 *hash_ret, int write_resource_flags,
struct wimlib_lzx_context **comp_ctx)
{
- /* Set up a temporary lookup table entry to provide to
- * write_wim_resource(). */
- struct wim_lookup_table_entry lte;
int ret;
+ struct wim_lookup_table_entry *lte;
+
+ /* Set up a temporary lookup table entry to provide to
+ * write_wim_resource(). */
- lte.resource_location = RESOURCE_IN_ATTACHED_BUFFER;
- lte.attached_buffer = (void*)buf;
- lte.resource_entry.original_size = buf_size;
- lte.resource_entry.flags = reshdr_flags;
- lte.compression_type = WIMLIB_COMPRESSION_TYPE_NONE;
+ lte = new_lookup_table_entry();
+ if (lte == NULL)
+ return WIMLIB_ERR_NOMEM;
+
+ lte->resource_location = RESOURCE_IN_ATTACHED_BUFFER;
+ lte->attached_buffer = (void*)buf;
+ lte->resource_entry.original_size = buf_size;
+ lte->resource_entry.flags = reshdr_flags;
if (write_resource_flags & WIMLIB_WRITE_RESOURCE_FLAG_PIPABLE) {
- sha1_buffer(buf, buf_size, lte.hash);
- lte.unhashed = 0;
+ sha1_buffer(buf, buf_size, lte->hash);
+ lte->unhashed = 0;
} else {
- lte.unhashed = 1;
+ lte->unhashed = 1;
}
- ret = write_wim_resource(&lte, out_fd, out_ctype, out_chunk_size,
+ ret = write_wim_resource(lte, out_fd, out_ctype, out_chunk_size,
out_res_entry, write_resource_flags, comp_ctx);
if (ret)
- return ret;
+ goto out_free_lte;
if (hash_ret)
- copy_hash(hash_ret, lte.hash);
- return 0;
+ copy_hash(hash_ret, lte->hash);
+ ret = 0;
+out_free_lte:
+ lte->resource_location = RESOURCE_NONEXISTENT;
+ free_lookup_table_entry(lte);
+ return ret;
}
if (ret)
break;
/* In parallel mode, some streams are deferred for later,
- * serialized processing; ignore them here. */
+ * serialized processing; ignore them here. */
if (lte->deferred)
continue;
if (lte->unhashed) {
}
ret = full_writev(out_fd, vecs, nvecs);
if (ret)
- ERROR_WITH_ERRNO("Failed to write WIM chunks");
+ ERROR_WITH_ERRNO("Write error");
return ret;
}
struct message *msgs;
msgs = CALLOC(num_messages, sizeof(struct message));
- if (!msgs)
+ if (msgs == NULL)
return NULL;
for (size_t i = 0; i < num_messages; i++) {
if (init_message(&msgs[i], out_chunk_size)) {
/* Pre-allocate all the buffers that will be needed to do the chunk
* compression. */
ctx->msgs = allocate_messages(ctx->num_messages, ctx->out_chunk_size);
- if (!ctx->msgs)
+ if (ctx->msgs == NULL)
return WIMLIB_ERR_NOMEM;
/* Initially, all the messages are available to use. */
INIT_LIST_HEAD(&ctx->outstanding_streams);
ctx->num_outstanding_messages = 0;
+ /* Message currently being prepared. */
ctx->next_msg = NULL;
/* Resources that don't need any chunks compressed are added to this
- * list and written directly by the main thread. */
+ * list and written directly by the main thread. */
INIT_LIST_HEAD(&ctx->serial_streams);
+ /* Pointer to chunk table for stream currently being written. */
ctx->cur_chunk_tab = NULL;
return 0;
/* Get the next message from the queue and process it.
* The message will contain 1 or more data chunks that have been
- * compressed. */
+ * compressed. */
msg = shared_queue_get(ctx->compressed_res_queue);
msg->complete = true;
--ctx->num_outstanding_messages;
- /* Is this the next chunk in the current resource? If it's not
- * (i.e., an earlier chunk in a same or different resource
- * hasn't been compressed yet), do nothing, and keep this
- * message around until all earlier chunks are received.
+ /* Is this the next chunk in the current resource? If it's not (i.e.,
+ * an earlier chunk in a same or different resource hasn't been
+ * compressed yet), do nothing, and keep this message around until all
+ * earlier chunks are received.
*
- * Otherwise, write all the chunks we can. */
+ * Otherwise, write all the chunks we can. */
while (cur_lte != NULL &&
!list_empty(&cur_lte->msg_list)
&& (msg = container_of(cur_lte->msg_list.next,
ctx->write_resource_flags);
if (ret)
return ret;
-
}
- /* Write the compressed chunks from the message. */
+ /* Write the compressed chunks from the message. */
ret = write_wim_chunks(msg, ctx->out_fd, ctx->cur_chunk_tab,
ctx->write_resource_flags);
if (ret)
return ret;
- /* Was this the last chunk of the stream? If so, finish
- * it. */
+ /* Was this the last chunk of the stream? If so, finish the
+ * stream by writing the chunk table. */
if (list_empty(&cur_lte->msg_list) &&
msg->begin_chunk + msg->num_chunks == ctx->cur_chunk_tab->num_chunks)
{
* list for direct writing by the main thread (e.g.
* resources that don't need to be compressed because
* the desired compression type is the same as the
- * previous compression type). */
+ * previous compression type). */
if (!list_empty(&ctx->serial_streams)) {
ret = do_write_stream_list_serial(&ctx->serial_streams,
ctx->lookup_table,
return ret;
}
- /* Advance to the next stream to write. */
+ /* Advance to the next stream to write. */
if (list_empty(&ctx->outstanding_streams)) {
cur_lte = NULL;
} else {
return 0;
}
-/* Called when the main thread has read a new chunk of data. */
+/* Called when the main thread has read a new chunk of data. */
static int
main_writer_thread_cb(const void *chunk, size_t chunk_size, void *_ctx)
{
/* Read the entire stream @lte, feeding its data chunks to the
* compressor threads. Also SHA1-sum the stream; this is required in
* the case that @lte is unhashed, and a nice additional verification
- * when @lte is already hashed. */
+ * when @lte is already hashed. */
sha1_init(&ctx->next_sha_ctx);
ctx->next_chunk = 0;
ctx->next_num_chunks = DIV_ROUND_UP(wim_resource_size(lte),
/* Stream is too small or isn't being compressed. Process it by
* the main thread when we have a chance. We can't necessarily
* process it right here, as the main thread could be in the
- * middle of writing a different stream. */
+ * middle of writing a different stream. */
list_add_tail(&lte->write_streams_list, &ctx->serial_streams);
lte->deferred = 1;
ret = 0;
* At any given point in time, multiple streams may be having chunks compressed
* concurrently. The stream that the main thread is currently *reading* may be
* later in the list that the stream that the main thread is currently
- * *writing*.
- */
+ * *writing*. */
static int
write_stream_list_parallel(struct list_head *stream_list,
struct wim_lookup_table *lookup_table,
struct shared_queue compressed_res_queue;
pthread_t *compressor_threads = NULL;
union wimlib_progress_info *progress = &progress_data->progress;
+ unsigned num_started_threads;
+ bool can_retry = true;
if (num_threads == 0) {
long nthreads = get_default_num_threads();
if (nthreads < 1 || nthreads > UINT_MAX) {
WARNING("Could not determine number of processors! Assuming 1");
- goto out_serial;
+ goto out_serial_quiet;
} else if (nthreads == 1) {
goto out_serial_quiet;
} else {
}
compressor_threads = MALLOC(num_threads * sizeof(pthread_t));
- if (!compressor_threads) {
+ if (compressor_threads == NULL) {
ret = WIMLIB_ERR_NOMEM;
goto out_free_params;
}
DEBUG("pthread_create thread %u of %u", i + 1, num_threads);
ret = pthread_create(&compressor_threads[i], NULL,
compressor_thread_proc, &params[i]);
- if (ret != 0) {
+ if (ret) {
+ errno = ret;
ret = -1;
ERROR_WITH_ERRNO("Failed to create compressor "
"thread %u of %u",
i + 1, num_threads);
- num_threads = i;
+ num_started_threads = i;
goto out_join;
}
}
+ num_started_threads = num_threads;
if (progress_data->progress_func) {
progress_data->progress_func(WIMLIB_PROGRESS_MSG_WRITE_STREAMS,
}
struct main_writer_thread_ctx ctx;
+
+ memset(&ctx, 0, sizeof(ctx));
+
ctx.stream_list = stream_list;
ctx.lookup_table = lookup_table;
ctx.out_fd = out_fd;
ret = main_writer_thread_init_ctx(&ctx);
if (ret)
goto out_join;
+
+ can_retry = false;
ret = do_write_stream_list(stream_list, lookup_table,
main_thread_process_next_stream,
&ctx, progress_data);
* chunks to be compressed so that the remaining streams can actually be
* written to the output file. Furthermore, any remaining streams that
* had processing deferred to the main thread need to be handled. These
- * tasks are done by the main_writer_thread_finish() function. */
+ * tasks are done by the main_writer_thread_finish() function. */
ret = main_writer_thread_finish(&ctx);
out_destroy_ctx:
main_writer_thread_destroy_ctx(&ctx);
out_join:
- for (unsigned i = 0; i < num_threads; i++)
+ for (unsigned i = 0; i < num_started_threads; i++)
shared_queue_put(&res_to_compress_queue, NULL);
- for (unsigned i = 0; i < num_threads; i++) {
+ for (unsigned i = 0; i < num_started_threads; i++) {
if (pthread_join(compressor_threads[i], NULL)) {
WARNING_WITH_ERRNO("Failed to join compressor "
"thread %u of %u",
shared_queue_destroy(&compressed_res_queue);
out_destroy_res_to_compress_queue:
shared_queue_destroy(&res_to_compress_queue);
- if (ret >= 0 && ret != WIMLIB_ERR_NOMEM)
+ if (!can_retry || (ret >= 0 && ret != WIMLIB_ERR_NOMEM))
return ret;
out_serial:
WARNING("Falling back to single-threaded compression");
}
#endif
-/*
- * Write a list of streams to a WIM (@out_fd) using the compression type
- * @out_ctype and up to @num_threads compressor threads.
- */
+/* Write a list of streams to a WIM (@out_fd) using the compression type
+ * @out_ctype, chunk size @out_chunk_size, and up to @num_threads compressor
+ * threads. */
static int
write_stream_list(struct list_head *stream_list,
struct wim_lookup_table *lookup_table,
int write_flags,
unsigned num_threads, wimlib_progress_func_t progress_func)
{
- struct wim_lookup_table_entry *lte;
- size_t num_streams = 0;
- u64 total_bytes = 0;
- u64 total_compression_bytes = 0;
- struct write_streams_progress_data progress_data;
int ret;
int write_resource_flags;
- unsigned total_parts = 0;
- WIMStruct *prev_wim_part = NULL;
+ u64 total_bytes;
+ u64 total_compression_bytes;
+ unsigned total_parts;
+ WIMStruct *prev_wim_part;
+ size_t num_streams;
+ struct wim_lookup_table_entry *lte;
+ struct write_streams_progress_data progress_data;
if (list_empty(stream_list)) {
DEBUG("No streams to write.");
DEBUG("Writing stream list (offset = %"PRIu64", write_resource_flags=0x%08x)",
out_fd->offset, write_resource_flags);
- sort_stream_list_by_sequential_order(stream_list,
- offsetof(struct wim_lookup_table_entry,
- write_streams_list));
+ /* Sort the stream list into a good order for reading. */
+ ret = sort_stream_list_by_sequential_order(stream_list,
+ offsetof(struct wim_lookup_table_entry,
+ write_streams_list));
+ if (ret)
+ return ret;
/* Calculate the total size of the streams to be written. Note: this
* will be the uncompressed size, as we may not know the compressed size
* yet, and also this will assume that every unhashed stream will be
- * written (which will not necessarily be the case). */
+ * written (which will not necessarily be the case). */
+ total_bytes = 0;
+ total_compression_bytes = 0;
+ num_streams = 0;
+ total_parts = 0;
+ prev_wim_part = NULL;
list_for_each_entry(lte, stream_list, write_streams_list) {
num_streams++;
total_bytes += wim_resource_size(lte);
if (ret == 0)
DEBUG("Successfully wrote stream list.");
else
- DEBUG("Failed to write stream list.");
+ DEBUG("Failed to write stream list (ret=%d).", ret);
return ret;
}