X-Git-Url: https://wimlib.net/git/?a=blobdiff_plain;f=src%2Fwrite.c;h=4d3ade0893b02da6eec64a460a693540c3b09f84;hb=a523981c40a013fc5d5ca382e4cacb46667934e3;hp=b143f16d7dc4de001a0e3e2b2cd01b248ffcd593;hpb=9c4d58824c53710f891cfab234bf0bc697030ebe;p=wimlib diff --git a/src/write.c b/src/write.c index b143f16d..4d3ade08 100644 --- a/src/write.c +++ b/src/write.c @@ -25,6 +25,15 @@ * along with wimlib; if not, see http://www.gnu.org/licenses/. */ +#include "config.h" + +#if defined(HAVE_SYS_FILE_H) && defined(HAVE_FLOCK) +/* On BSD, this should be included before "list.h" so that "list.h" can + * overwrite the LIST_HEAD macro. */ +#include +#endif + +#include "list.h" #include "wimlib_internal.h" #include "io.h" #include "dentry.h" @@ -32,13 +41,12 @@ #include "xml.h" #include "lzx.h" #include "xpress.h" -#include #ifdef ENABLE_MULTITHREADED_COMPRESSION -#include #include #endif +#include #include #ifdef WITH_NTFS_3G @@ -48,30 +56,21 @@ #include #endif - #ifdef HAVE_ALLOCA_H #include #else #include #endif -static int do_fflush(FILE *fp) +static int fflush_and_ftruncate(FILE *fp, off_t size) { - int ret = fflush(fp); + int ret; + + ret = fflush(fp); if (ret != 0) { ERROR_WITH_ERRNO("Failed to flush data to output WIM file"); return WIMLIB_ERR_WRITE; } - return 0; -} - -static int fflush_and_ftruncate(FILE *fp, off_t size) -{ - int ret; - - ret = do_fflush(fp); - if (ret != 0) - return ret; ret = ftruncate(fileno(fp), size); if (ret != 0) { ERROR_WITH_ERRNO("Failed to truncate output WIM file to " @@ -210,7 +209,7 @@ static int write_wim_resource_chunk(const u8 chunk[], unsigned chunk_size, } /* - * Finishes a WIM chunk tale and writes it to the output file at the correct + * Finishes a WIM chunk table and writes it to the output file at the correct * offset. * * The final size of the full compressed resource is returned in the @@ -324,6 +323,27 @@ static void end_wim_resource_read(struct lookup_table_entry *lte #endif } +static int +write_uncompressed_resource_and_truncate(struct lookup_table_entry *lte, + FILE *out_fp, + off_t file_offset, + struct resource_entry *out_res_entry) +{ + int ret; + if (fseeko(out_fp, file_offset, SEEK_SET) != 0) { + ERROR_WITH_ERRNO("Failed to seek to byte %"PRIu64" of " + "output WIM file", file_offset); + return WIMLIB_ERR_WRITE; + } + ret = write_wim_resource(lte, out_fp, WIMLIB_COMPRESSION_TYPE_NONE, + out_res_entry, 0); + if (ret != 0) + return ret; + + return fflush_and_ftruncate(out_fp, + file_offset + wim_resource_size(lte)); +} + /* * Writes a WIM resource to a FILE * opened for writing. The resource may be * written uncompressed or compressed depending on the @out_ctype parameter. @@ -499,18 +519,10 @@ int write_wim_resource(struct lookup_table_entry *lte, { /* Oops! We compressed the resource to larger than the original * size. Write the resource uncompressed instead. */ - if (fseeko(out_fp, file_offset, SEEK_SET) != 0) { - ERROR_WITH_ERRNO("Failed to seek to byte %"PRIu64" " - "of output WIM file", file_offset); - ret = WIMLIB_ERR_WRITE; - goto out_fclose; - } - ret = write_wim_resource(lte, out_fp, WIMLIB_COMPRESSION_TYPE_NONE, - out_res_entry, flags); - if (ret != 0) - goto out_fclose; - - ret = fflush_and_ftruncate(out_fp, file_offset + out_res_entry->size); + ret = write_uncompressed_resource_and_truncate(lte, + out_fp, + file_offset, + out_res_entry); if (ret != 0) goto out_fclose; } else { @@ -538,13 +550,14 @@ out: #ifdef ENABLE_MULTITHREADED_COMPRESSION struct shared_queue { - sem_t filled_slots; - sem_t empty_slots; - pthread_mutex_t lock; + unsigned size; unsigned front; unsigned back; + unsigned filled_slots; void **array; - unsigned size; + pthread_mutex_t lock; + pthread_cond_t msg_avail_cond; + pthread_cond_t space_avail_cond; }; static int shared_queue_init(struct shared_queue *q, unsigned size) @@ -552,46 +565,52 @@ static int shared_queue_init(struct shared_queue *q, unsigned size) q->array = CALLOC(sizeof(q->array[0]), size); if (!q->array) return WIMLIB_ERR_NOMEM; - - sem_init(&q->filled_slots, 0, 0); - sem_init(&q->empty_slots, 0, size); - pthread_mutex_init(&q->lock, NULL); + q->filled_slots = 0; q->front = 0; q->back = size - 1; q->size = size; + pthread_mutex_init(&q->lock, NULL); + pthread_cond_init(&q->msg_avail_cond, NULL); + pthread_cond_init(&q->space_avail_cond, NULL); return 0; } static void shared_queue_destroy(struct shared_queue *q) { - sem_destroy(&q->filled_slots); - sem_destroy(&q->empty_slots); - pthread_mutex_destroy(&q->lock); FREE(q->array); + pthread_mutex_destroy(&q->lock); + pthread_cond_destroy(&q->msg_avail_cond); + pthread_cond_destroy(&q->space_avail_cond); } static void shared_queue_put(struct shared_queue *q, void *obj) { - sem_wait(&q->empty_slots); pthread_mutex_lock(&q->lock); + while (q->filled_slots == q->size) + pthread_cond_wait(&q->space_avail_cond, &q->lock); q->back = (q->back + 1) % q->size; q->array[q->back] = obj; + q->filled_slots++; - sem_post(&q->filled_slots); + pthread_cond_broadcast(&q->msg_avail_cond); pthread_mutex_unlock(&q->lock); } static void *shared_queue_get(struct shared_queue *q) { - sem_wait(&q->filled_slots); + void *obj; + pthread_mutex_lock(&q->lock); + while (q->filled_slots == 0) + pthread_cond_wait(&q->msg_avail_cond, &q->lock); - void *obj = q->array[q->front]; + obj = q->array[q->front]; q->array[q->front] = NULL; q->front = (q->front + 1) % q->size; + q->filled_slots--; - sem_post(&q->empty_slots); + pthread_cond_broadcast(&q->space_avail_cond); pthread_mutex_unlock(&q->lock); return obj; } @@ -775,25 +794,19 @@ static int main_writer_thread_proc(struct list_head *stream_list, // LIST_HEAD(outstanding_resources); struct list_head *next_resource = stream_list->next; - struct lookup_table_entry *next_lte = container_of(next_resource, - struct lookup_table_entry, - staging_list); - next_resource = next_resource->next; + struct lookup_table_entry *next_lte = NULL; u64 next_chunk = 0; - u64 next_num_chunks = wim_resource_chunks(next_lte); - INIT_LIST_HEAD(&next_lte->msg_list); - list_add_tail(&next_lte->staging_list, &outstanding_resources); + u64 next_num_chunks = 0; // As in write_wim_resource(), each resource we read is checksummed. SHA_CTX next_sha_ctx; - sha1_init(&next_sha_ctx); u8 next_hash[SHA1_HASH_SIZE]; // Resources that don't need any chunks compressed are added to this // list and written directly by the main thread. LIST_HEAD(my_resources); - struct lookup_table_entry *cur_lte = next_lte; + struct lookup_table_entry *cur_lte = NULL; struct chunk_table *cur_chunk_tab = NULL; struct message *msg; @@ -801,14 +814,6 @@ static int main_writer_thread_proc(struct list_head *stream_list, ntfs_inode *ni = NULL; #endif -#ifdef WITH_NTFS_3G - ret = prepare_resource_for_read(next_lte, &ni); -#else - ret = prepare_resource_for_read(next_lte); -#endif - if (ret != 0) - goto out; - DEBUG("Initializing buffers for uncompressed " "and compressed data (%zu bytes needed)", queue_size * MAX_CHUNKS_PER_MSG * WIM_CHUNK_SIZE * 2); @@ -818,7 +823,12 @@ static int main_writer_thread_proc(struct list_head *stream_list, for (size_t i = 0; i < ARRAY_LEN(msgs); i++) { for (size_t j = 0; j < MAX_CHUNKS_PER_MSG; j++) { msgs[i].compressed_chunks[j] = MALLOC(WIM_CHUNK_SIZE); - msgs[i].uncompressed_chunks[j] = MALLOC(WIM_CHUNK_SIZE); + + // The extra 8 bytes is because longest_match() in lz.c + // may read a little bit off the end of the uncompressed + // data. It doesn't need to be initialized--- we really + // just need to avoid accessing an unmapped page. + msgs[i].uncompressed_chunks[j] = MALLOC(WIM_CHUNK_SIZE + 8); if (msgs[i].compressed_chunks[j] == NULL || msgs[i].uncompressed_chunks[j] == NULL) { @@ -838,7 +848,88 @@ static int main_writer_thread_proc(struct list_head *stream_list, // are no more messages available since they were all sent off, // or (b) there are no more resources that need to be // compressed. - while (!list_empty(&available_msgs) && next_lte != NULL) { + while (!list_empty(&available_msgs)) { + if (next_chunk == next_num_chunks) { + // If next_chunk == next_num_chunks, there are + // no more chunks to write in the current + // stream. So, check the SHA1 message digest of + // the stream that was just finished (unless + // next_lte == NULL, which is the case the very + // first time this loop is entered, and also + // near the very end of the compression when + // there are no more streams.) Then, advance to + // the next stream (if there is one). + if (next_lte != NULL) { + #ifdef WITH_NTFS_3G + end_wim_resource_read(next_lte, ni); + ni = NULL; + #else + end_wim_resource_read(next_lte); + #endif + DEBUG2("Finalize SHA1 md (next_num_chunks=%zu)", + next_num_chunks); + sha1_final(next_hash, &next_sha_ctx); + if (!hashes_equal(next_lte->hash, next_hash)) { + ERROR("WIM resource has incorrect hash!"); + if (next_lte->resource_location == + RESOURCE_IN_FILE_ON_DISK) + { + ERROR("We were reading it from `%s'; " + "maybe it changed while we were " + "reading it.", + next_lte->file_on_disk); + } + ret = WIMLIB_ERR_INVALID_RESOURCE_HASH; + goto out; + } + } + + // Advance to the next resource. + // + // If the next resource needs no compression, just write + // it with this thread (not now though--- we could be in + // the middle of writing another resource.) Keep doing + // this until we either get to the end of the resources + // list, or we get to a resource that needs compression. + while (1) { + if (next_resource == stream_list) { + next_lte = NULL; + break; + } + next_lte = container_of(next_resource, + struct lookup_table_entry, + staging_list); + next_resource = next_resource->next; + if ((!(write_flags & WIMLIB_WRITE_FLAG_RECOMPRESS) + && wim_resource_compression_type(next_lte) == out_ctype) + || wim_resource_size(next_lte) == 0) + { + list_add_tail(&next_lte->staging_list, + &my_resources); + } else { + list_add_tail(&next_lte->staging_list, + &outstanding_resources); + next_chunk = 0; + next_num_chunks = wim_resource_chunks(next_lte); + sha1_init(&next_sha_ctx); + INIT_LIST_HEAD(&next_lte->msg_list); + #ifdef WITH_NTFS_3G + ret = prepare_resource_for_read(next_lte, &ni); + #else + ret = prepare_resource_for_read(next_lte); + #endif + + if (ret != 0) + goto out; + if (cur_lte == NULL) + cur_lte = next_lte; + break; + } + } + } + + if (next_lte == NULL) + break; // Get a message from the available messages // list @@ -893,81 +984,11 @@ static int main_writer_thread_proc(struct list_head *stream_list, list_add_tail(&msg->list, &next_lte->msg_list); shared_queue_put(res_to_compress_queue, msg); DEBUG2("Compression request sent"); - - if (next_chunk != next_num_chunks) - // More chunks to send for this resource - continue; - - // Done sending compression requests for a resource! - // Check the SHA1 message digest. - DEBUG2("Finalize SHA1 md (next_num_chunks=%zu)", next_num_chunks); - sha1_final(next_hash, &next_sha_ctx); - if (!hashes_equal(next_lte->hash, next_hash)) { - ERROR("WIM resource has incorrect hash!"); - if (next_lte->resource_location == RESOURCE_IN_FILE_ON_DISK) { - ERROR("We were reading it from `%s'; maybe it changed " - "while we were reading it.", - next_lte->file_on_disk); - } - ret = WIMLIB_ERR_INVALID_RESOURCE_HASH; - goto out; - } - - // Advance to the next resource. - // - // If the next resource needs no compression, just write - // it with this thread (not now though--- we could be in - // the middle of writing another resource.) Keep doing - // this until we either get to the end of the resources - // list, or we get to a resource that needs compression. - - while (1) { - if (next_resource == stream_list) { - next_lte = NULL; - break; - } - #ifdef WITH_NTFS_3G - end_wim_resource_read(next_lte, ni); - ni = NULL; - #else - end_wim_resource_read(next_lte); - #endif - - next_lte = container_of(next_resource, - struct lookup_table_entry, - staging_list); - next_resource = next_resource->next; - if ((!(write_flags & WIMLIB_WRITE_FLAG_RECOMPRESS) - && next_lte->resource_location == RESOURCE_IN_WIM - && wimlib_get_compression_type(next_lte->wim) == out_ctype) - || wim_resource_size(next_lte) == 0) - { - list_add_tail(&next_lte->staging_list, - &my_resources); - } else { - list_add_tail(&next_lte->staging_list, - &outstanding_resources); - next_chunk = 0; - next_num_chunks = wim_resource_chunks(next_lte); - sha1_init(&next_sha_ctx); - INIT_LIST_HEAD(&next_lte->msg_list); - #ifdef WITH_NTFS_3G - ret = prepare_resource_for_read(next_lte, &ni); - #else - ret = prepare_resource_for_read(next_lte); - #endif - if (ret != 0) - goto out; - DEBUG2("Updated next_lte"); - break; - } - } } // If there are no outstanding resources, there are no more // resources that need to be written. if (list_empty(&outstanding_resources)) { - DEBUG("No outstanding resources! Done"); ret = 0; goto out; } @@ -975,23 +996,16 @@ static int main_writer_thread_proc(struct list_head *stream_list, // Get the next message from the queue and process it. // The message will contain 1 or more data chunks that have been // compressed. - DEBUG2("Waiting for message"); msg = shared_queue_get(compressed_res_queue); msg->complete = true; - DEBUG2("Received msg (begin_chunk=%"PRIu64")", msg->begin_chunk); - - list_for_each_entry(msg, &cur_lte->msg_list, list) { - DEBUG2("complete=%d", msg->complete); - } - // Is this the next chunk in the current resource? If it's not // (i.e., an earlier chunk in a same or different resource // hasn't been compressed yet), do nothing, and keep this // message around until all earlier chunks are received. // // Otherwise, write all the chunks we can. - while (!list_empty(&cur_lte->msg_list) + while (cur_lte != NULL && !list_empty(&cur_lte->msg_list) && (msg = container_of(cur_lte->msg_list.next, struct message, list))->complete) @@ -1039,6 +1053,32 @@ static int main_writer_thread_proc(struct list_head *stream_list, if (ret != 0) goto out; + if (res_csize >= wim_resource_size(cur_lte)) { + /* Oops! We compressed the resource to + * larger than the original size. Write + * the resource uncompressed instead. */ + ret = write_uncompressed_resource_and_truncate( + cur_lte, + out_fp, + cur_chunk_tab->file_offset, + &cur_lte->output_resource_entry); + if (ret != 0) + goto out; + } else { + cur_lte->output_resource_entry.size = + res_csize; + + cur_lte->output_resource_entry.original_size = + cur_lte->resource_entry.original_size; + + cur_lte->output_resource_entry.offset = + cur_chunk_tab->file_offset; + + cur_lte->output_resource_entry.flags = + cur_lte->resource_entry.flags | + WIM_RESHDR_FLAG_COMPRESSED; + } + progress->write_streams.completed_bytes += wim_resource_size(cur_lte); progress->write_streams.completed_streams++; @@ -1048,19 +1088,6 @@ static int main_writer_thread_proc(struct list_head *stream_list, progress); } - cur_lte->output_resource_entry.size = - res_csize; - - cur_lte->output_resource_entry.original_size = - cur_lte->resource_entry.original_size; - - cur_lte->output_resource_entry.offset = - cur_chunk_tab->file_offset; - - cur_lte->output_resource_entry.flags = - cur_lte->resource_entry.flags | - WIM_RESHDR_FLAG_COMPRESSED; - FREE(cur_chunk_tab); cur_chunk_tab = NULL; @@ -1068,9 +1095,15 @@ static int main_writer_thread_proc(struct list_head *stream_list, list_del(&cur_lte->staging_list); if (next == &outstanding_resources) { - DEBUG("No more outstanding resources"); - ret = 0; - goto out; + if (next_lte == NULL) { + DEBUG("No more outstanding resources"); + ret = 0; + goto out; + } else { + DEBUG("No more outstanding resources---" + "but still more to compress!"); + cur_lte = NULL; + } } else { cur_lte = container_of(cur_lte->staging_list.next, struct lookup_table_entry, @@ -1097,11 +1130,14 @@ static int main_writer_thread_proc(struct list_head *stream_list, } out: + if (next_lte) { #ifdef WITH_NTFS_3G - end_wim_resource_read(cur_lte, ni); + end_wim_resource_read(next_lte, ni); #else - end_wim_resource_read(cur_lte); + end_wim_resource_read(next_lte); #endif + } + if (ret == 0) { ret = do_write_stream_list(&my_resources, out_fp, out_ctype, progress_func, @@ -1244,6 +1280,7 @@ static int write_stream_list(struct list_head *stream_list, FILE *out_fp, struct lookup_table_entry *lte; size_t num_streams = 0; u64 total_bytes = 0; + u64 total_compression_bytes = 0; bool compression_needed = false; union wimlib_progress_info progress; int ret; @@ -1251,14 +1288,12 @@ static int write_stream_list(struct list_head *stream_list, FILE *out_fp, list_for_each_entry(lte, stream_list, staging_list) { num_streams++; total_bytes += wim_resource_size(lte); - if (!compression_needed - && - (out_ctype != WIMLIB_COMPRESSION_TYPE_NONE - && (lte->resource_location != RESOURCE_IN_WIM - || wimlib_get_compression_type(lte->wim) != out_ctype - || (write_flags & WIMLIB_WRITE_FLAG_REBUILD))) - && wim_resource_size(lte) != 0) - compression_needed = true; + if (out_ctype != WIMLIB_COMPRESSION_TYPE_NONE + && (wim_resource_compression_type(lte) != out_ctype || + (write_flags & WIMLIB_WRITE_FLAG_RECOMPRESS))) + { + total_compression_bytes += wim_resource_size(lte); + } } progress.write_streams.total_bytes = total_bytes; progress.write_streams.total_streams = num_streams; @@ -1273,7 +1308,7 @@ static int write_stream_list(struct list_head *stream_list, FILE *out_fp, } #ifdef ENABLE_MULTITHREADED_COMPRESSION - if (compression_needed && total_bytes >= 1000000 && num_threads != 1) { + if (total_compression_bytes >= 1000000 && num_threads != 1) { ret = write_stream_list_parallel(stream_list, out_fp, out_ctype, @@ -1487,7 +1522,53 @@ out: return ret; } -static void close_wim_writable(WIMStruct *w) +#if defined(HAVE_SYS_FILE_H) && defined(HAVE_FLOCK) +int lock_wim(FILE *fp, const char *path) +{ + int ret = 0; + if (fp) { + ret = flock(fileno(fp), LOCK_EX | LOCK_NB); + if (ret != 0) { + if (errno == EWOULDBLOCK) { + ERROR("`%s' is already being modified or has been " + "mounted read-write\n" + " by another process!", path); + ret = WIMLIB_ERR_ALREADY_LOCKED; + } else { + WARNING("Failed to lock `%s': %s", + path, strerror(errno)); + ret = 0; + } + } + } + return ret; +} +#endif + +static int open_wim_writable(WIMStruct *w, const char *path, + bool trunc, bool readable) +{ + const char *mode; + if (trunc) + if (readable) + mode = "w+b"; + else + mode = "wb"; + else + mode = "r+b"; + + wimlib_assert(w->out_fp == NULL); + w->out_fp = fopen(path, mode); + if (w->out_fp) { + return 0; + } else { + ERROR_WITH_ERRNO("Failed to open `%s' for writing", path); + return WIMLIB_ERR_OPEN; + } +} + + +void close_wim_writable(WIMStruct *w) { if (w->out_fp) { if (fclose(w->out_fp) != 0) { @@ -1502,12 +1583,8 @@ static void close_wim_writable(WIMStruct *w) int begin_write(WIMStruct *w, const char *path, int write_flags) { int ret; - bool need_readable = false; - bool trunc = true; - if (write_flags & WIMLIB_WRITE_FLAG_CHECK_INTEGRITY) - need_readable = true; - - ret = open_wim_writable(w, path, trunc, need_readable); + ret = open_wim_writable(w, path, true, + (write_flags & WIMLIB_WRITE_FLAG_CHECK_INTEGRITY) != 0); if (ret != 0) return ret; /* Write dummy header. It will be overwritten later. */ @@ -1681,10 +1758,6 @@ static int overwrite_wim_inplace(WIMStruct *w, int write_flags, INIT_LIST_HEAD(&stream_list); for (int i = modified_image_idx; i < w->hdr.image_count; i++) { DEBUG("Identifiying streams in image %d", i + 1); - wimlib_assert(w->image_metadata[i].modified); - wimlib_assert(!w->image_metadata[i].has_been_mounted_rw); - wimlib_assert(w->image_metadata[i].root_dentry != NULL); - wimlib_assert(w->image_metadata[i].metadata_lte != NULL); w->private = &stream_list; for_dentry_in_tree(w->image_metadata[i].root_dentry, dentry_find_streams_to_write, w); @@ -1703,7 +1776,6 @@ static int overwrite_wim_inplace(WIMStruct *w, int write_flags, if (modified_image_idx == w->hdr.image_count && !w->deletion_occurred) { /* If no images have been modified and no images have been * deleted, a new lookup table does not need to be written. */ - wimlib_assert(list_empty(&stream_list)); old_wim_end = w->hdr.lookup_table_res_entry.offset + w->hdr.lookup_table_res_entry.size; write_flags |= WIMLIB_WRITE_FLAG_NO_LOOKUP_TABLE | @@ -1719,9 +1791,17 @@ static int overwrite_wim_inplace(WIMStruct *w, int write_flags, if (ret != 0) return ret; + ret = lock_wim(w->out_fp, w->filename); + if (ret != 0) { + fclose(w->out_fp); + w->out_fp = NULL; + return ret; + } + if (fseeko(w->out_fp, old_wim_end, SEEK_SET) != 0) { ERROR_WITH_ERRNO("Can't seek to end of WIM"); - return WIMLIB_ERR_WRITE; + ret = WIMLIB_ERR_WRITE; + goto out_ftruncate; } if (!list_empty(&stream_list)) { @@ -1781,12 +1861,6 @@ static int overwrite_wim_via_tmpfile(WIMStruct *w, int write_flags, goto err; } - /* Close the original WIM file that was opened for reading. */ - if (w->fp != NULL) { - fclose(w->fp); - w->fp = NULL; - } - DEBUG("Renaming `%s' to `%s'", tmpfile, w->filename); /* Rename the new file to the old file .*/ @@ -1804,12 +1878,20 @@ static int overwrite_wim_via_tmpfile(WIMStruct *w, int write_flags, progress_func(WIMLIB_PROGRESS_MSG_RENAME, &progress); } + /* Close the original WIM file that was opened for reading. */ + if (w->fp != NULL) { + fclose(w->fp); + w->fp = NULL; + } + /* Re-open the WIM read-only. */ w->fp = fopen(w->filename, "rb"); if (w->fp == NULL) { ret = WIMLIB_ERR_REOPEN; WARNING("Failed to re-open `%s' read-only: %s", w->filename, strerror(errno)); + FREE(w->filename); + w->filename = NULL; } return ret; err: