+static int
+main_writer_thread_cb(const void *chunk, size_t chunk_size, void *_ctx)
+{
+ struct main_writer_thread_ctx *ctx = _ctx;
+ int ret;
+ struct message *next_msg;
+ u64 next_chunk_in_msg;
+
+ DEBUG2("chunk_size=%zu, wim_resource_size(next_lte)=%"PRIu64,
+ chunk_size, wim_resource_size(ctx->next_lte));
+
+ sha1_update(&ctx->next_sha_ctx, chunk, chunk_size);
+ next_msg = ctx->next_msg;
+ if (!next_msg) {
+ /* Start filling in a new message */
+
+ DEBUG2("Start new msg");
+
+ while (list_empty(&ctx->available_msgs)) {
+ /* No message available; receive messages, writing
+ * compressed data. */
+ DEBUG2("No msgs available!");
+ ret = receive_compressed_chunks(ctx);
+ if (ret)
+ return ret;
+ }
+
+ next_msg = container_of(ctx->available_msgs.next,
+ struct message, list);
+ list_del(&next_msg->list);
+ next_msg->complete = false;
+ next_msg->begin_chunk = ctx->next_chunk;
+ next_msg->num_chunks = min(MAX_CHUNKS_PER_MSG,
+ ctx->next_num_chunks - ctx->next_chunk);
+ DEBUG2("next_msg {begin_chunk=%"PRIu64", num_chunks=%"PRIu64"}",
+ next_msg->begin_chunk, next_msg->num_chunks);
+ ctx->next_msg = next_msg;
+ }
+
+ next_chunk_in_msg = ctx->next_chunk - next_msg->begin_chunk;
+
+ /* Fill in the next chunk to compress */
+ next_msg->uncompressed_chunk_sizes[next_chunk_in_msg] = chunk_size;
+ memcpy(next_msg->uncompressed_chunks[next_chunk_in_msg],
+ chunk, chunk_size);
+ ctx->next_chunk++;
+ if (++next_chunk_in_msg == next_msg->num_chunks) {
+ DEBUG2("Sending message %p", next_msg);
+ /* Send off an array of chunks to compress */
+ list_add_tail(&next_msg->list, &ctx->next_lte->msg_list);
+ shared_queue_put(ctx->res_to_compress_queue, next_msg);
+ ++ctx->num_outstanding_messages;
+ ctx->next_msg = NULL;