main_writer_thread_proc(): Add more comments
authorEric Biggers <ebiggers3@gmail.com>
Sun, 18 Nov 2012 08:13:10 +0000 (02:13 -0600)
committerEric Biggers <ebiggers3@gmail.com>
Sun, 18 Nov 2012 08:13:10 +0000 (02:13 -0600)
src/write.c

index 6663564585c299e9dcec970d42d3dc4fd5c6db8d..56fbab0ea1c08e5b33c7671e01f768c257821962 100644 (file)
@@ -498,7 +498,7 @@ finish_wim_resource_chunk_tab(struct chunk_table *chunk_tab,
 }
 
 static int prepare_resource_for_read(struct lookup_table_entry *lte
-               
+
                                        #ifdef WITH_NTFS_3G
                                        , ntfs_inode **ni_ret
                                        #endif
@@ -940,6 +940,18 @@ static int write_wim_chunks(struct message *msg, FILE *out_fp,
        return 0;
 }
 
+/*
+ * This function is executed by the main thread when the resources are being
+ * compressed in parallel.  The main thread is in change of all reading of the
+ * uncompressed data and writing of the compressed data.  The compressor threads
+ * *only* do compression from/to in-memory buffers.
+ *
+ * Each unit of work given to a compressor thread is up to MAX_CHUNKS_PER_MSG
+ * chunks of compressed data to compress, represented in a `struct message'.
+ * Each message is passed from the main thread to a worker thread through the
+ * res_to_compress_queue, and it is passed back through the
+ * compressed_res_queue.
+ */
 static int main_writer_thread_proc(struct list_head *stream_list,
                                   FILE *out_fp,
                                   int out_ctype,
@@ -953,12 +965,27 @@ static int main_writer_thread_proc(struct list_head *stream_list,
        struct message msgs[queue_size];
        ZERO_ARRAY(msgs);
 
+       // Initially, all the messages are available to use.
        LIST_HEAD(available_msgs);
-
        for (size_t i = 0; i < ARRAY_LEN(msgs); i++)
                list_add(&msgs[i].list, &available_msgs);
 
-
+       // outstanding_resources is the list of resources that currently have
+       // had chunks sent off for compression.
+       //
+       // The first stream in outstanding_resources is the stream that is
+       // currently being written (cur_lte).
+       //
+       // The last stream in outstanding_resources is the stream that is
+       // currently being read and chunks fed to the compressor threads
+       // (next_lte).
+       //
+       // Depending on the number of threads and the sizes of the resource,
+       // the outstanding streams list may contain streams between cur_lte and
+       // next_lte that have all their chunks compressed or being compressed,
+       // but haven't been written yet.
+       //
+       LIST_HEAD(outstanding_resources);
        struct list_head *next_resource = stream_list->next;
        struct lookup_table_entry *next_lte = container_of(next_resource,
                                                           struct lookup_table_entry,
@@ -967,19 +994,17 @@ static int main_writer_thread_proc(struct list_head *stream_list,
        u64 next_chunk = 0;
        u64 next_num_chunks = wim_resource_chunks(next_lte);
        INIT_LIST_HEAD(&next_lte->msg_list);
+       list_add_tail(&next_lte->staging_list, &outstanding_resources);
+
+       // As in write_wim_resource(), each resource we read is checksummed.
        SHA_CTX next_sha_ctx;
        sha1_init(&next_sha_ctx);
-
        u8 next_hash[SHA1_HASH_SIZE];
 
-       // Resources owning chunks that have been sent off for compression
-       LIST_HEAD(outstanding_resources);
-
-       // Resources that are going to be written by the main thread
+       // Resources that don't need any chunks compressed are added to this
+       // list and written directly by the main thread.
        LIST_HEAD(my_resources);
 
-       list_add_tail(&next_lte->staging_list, &outstanding_resources);
-
        struct lookup_table_entry *cur_lte = next_lte;
        struct chunk_table *cur_chunk_tab = NULL;
        struct lookup_table_entry *lte;
@@ -999,6 +1024,8 @@ static int main_writer_thread_proc(struct list_head *stream_list,
              "and compressed data (%zu bytes needed)",
              queue_size * MAX_CHUNKS_PER_MSG * WIM_CHUNK_SIZE * 2);
 
+       // Pre-allocate all the buffers that will be needed to do the chunk
+       // compression.
        for (size_t i = 0; i < ARRAY_LEN(msgs); i++) {
                for (size_t j = 0; j < MAX_CHUNKS_PER_MSG; j++) {
                        msgs[i].compressed_chunks[j] = MALLOC(WIM_CHUNK_SIZE);
@@ -1021,8 +1048,6 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                // compressed.
                while (!list_empty(&available_msgs) && next_lte != NULL) {
 
-                       wimlib_assert(next_chunk < next_num_chunks);
-
                        // Get a message from the available messages
                        // list
                        msg = container_of(available_msgs.next,
@@ -1043,12 +1068,18 @@ static int main_writer_thread_proc(struct list_head *stream_list,
 
                        unsigned size = WIM_CHUNK_SIZE;
                        for (unsigned i = 0; i < msg->num_chunks; i++) {
+
+                               // Read chunk @next_chunk of the stream into the
+                               // message so that a compressor thread can
+                               // compress it.
+
                                if (next_chunk == next_num_chunks - 1 &&
                                     wim_resource_size(next_lte) % WIM_CHUNK_SIZE != 0)
                                {
                                        size = wim_resource_size(next_lte) % WIM_CHUNK_SIZE;
                                }
 
+
                                DEBUG2("Read resource (size=%u, offset=%zu)",
                                      size, next_chunk * WIM_CHUNK_SIZE);
 
@@ -1092,13 +1123,11 @@ static int main_writer_thread_proc(struct list_head *stream_list,
 
                        // Advance to the next resource.
                        //
-                       // If the next resource needs no compression,
-                       // just write it with this thread (not now
-                       // though--- we could be in the middle of
-                       // writing another resource.)  Keep doing this
-                       // until we either get to the end of the
-                       // resources list, or we get to a resource that
-                       // needs compression.
+                       // If the next resource needs no compression, just write
+                       // it with this thread (not now though--- we could be in
+                       // the middle of writing another resource.)  Keep doing
+                       // this until we either get to the end of the resources
+                       // list, or we get to a resource that needs compression.
 
                        while (1) {
                                if (next_resource == stream_list) {
@@ -1143,7 +1172,7 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                }
 
                // If there are no outstanding resources, there are no more
-               // resources to compress.
+               // resources that need to be written.
                if (list_empty(&outstanding_resources)) {
                        DEBUG("No outstanding resources! Done");
                        ret = 0;
@@ -1177,16 +1206,13 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                        DEBUG2("Complete msg (begin_chunk=%"PRIu64")", msg->begin_chunk);
                        if (msg->begin_chunk == 0) {
                                DEBUG2("Begin chunk tab");
-                               // This is the first set of chunks.
-                               // Leave space for the chunk table in
-                               // the output file.
+                               // This is the first set of chunks.  Leave space
+                               // for the chunk table in the output file.
                                off_t cur_offset = ftello(out_fp);
                                if (cur_offset == -1) {
                                        ret = WIMLIB_ERR_WRITE;
                                        goto out;
                                }
-                               FREE(cur_chunk_tab);
-                               cur_chunk_tab = NULL;
                                ret = begin_wim_resource_chunk_tab(cur_lte,
                                                                   out_fp,
                                                                   cur_offset,
@@ -1194,13 +1220,20 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                                if (ret != 0)
                                        goto out;
                        }
+
+                       // Write the compressed chunks from the message.
                        ret = write_wim_chunks(msg, out_fp, cur_chunk_tab);
                        if (ret != 0)
                                goto out;
 
                        list_del(&msg->list);
+
+                       // This message is available to use for different chunks
+                       // now.
                        list_add(&msg->list, &available_msgs);
 
+                       // Was this the last chunk of the stream?  If so,
+                       // finish it.
                        if (list_empty(&cur_lte->msg_list) &&
                            msg->begin_chunk + msg->num_chunks == cur_chunk_tab->num_chunks)
                        {
@@ -1210,7 +1243,8 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                                                                    out_fp,
                                                                    &res_csize);
                                if (ret != 0)
-                                       return ret;
+                                       goto out;
+
 
                                cur_lte->output_resource_entry.size =
                                        res_csize;
@@ -1225,6 +1259,9 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                                        cur_lte->resource_entry.flags |
                                                WIM_RESHDR_FLAG_COMPRESSED;
 
+                               FREE(cur_chunk_tab);
+                               cur_chunk_tab = NULL;
+
                                struct list_head *next = cur_lte->staging_list.next;
                                list_del(&cur_lte->staging_list);
 
@@ -1238,6 +1275,13 @@ static int main_writer_thread_proc(struct list_head *stream_list,
                                                               staging_list);
                                }
 
+                               // Since we just finished writing a stream,
+                               // write any streams that have been added to the
+                               // my_resources list for direct writing by the
+                               // main thread (e.g. resources that don't need
+                               // to be compressed because the desired
+                               // compression type is the same as the previous
+                               // compression type).
                                struct lookup_table_entry *tmp;
                                list_for_each_entry_safe(lte,
                                                         tmp,
@@ -1281,7 +1325,7 @@ out:
                }
 
                while (num_available_msgs < ARRAY_LEN(msgs)) {
-                       msg = shared_queue_get(compressed_res_queue);
+                       shared_queue_get(compressed_res_queue);
                        num_available_msgs++;
                }
        }
@@ -1333,7 +1377,6 @@ static int write_stream_list_parallel(struct list_head *stream_list,
                if (ret != 0)
                        goto out_destroy_res_to_compress_queue;
 
-
                struct compressor_thread_params params;
                params.res_to_compress_queue = &res_to_compress_queue;
                params.compressed_res_queue = &compressed_res_queue;
@@ -1412,7 +1455,7 @@ static int write_stream_list(struct list_head *stream_list, FILE *out_fp,
                       wimlib_get_compression_type_string(out_ctype));
        }
 
-       if (compression_needed && total_size >= 0) { // XXX
+       if (compression_needed && total_size >= 1000000) {
                return write_stream_list_parallel(stream_list, out_fp,
                                                  out_ctype, write_flags);
        } else {