From: Qiao Nuohan <[email protected]>

Use several threads to read and compress pages, and one thread to write
the produced pages into the dumpfile. This patch implements the cyclic-mode
path of the parallel dump.

Signed-off-by: Qiao Nuohan <[email protected]>
---
 makedumpfile.c |  390 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 390 insertions(+), 0 deletions(-)

diff --git a/makedumpfile.c b/makedumpfile.c
index bce6dc3..86426d8 100644
--- a/makedumpfile.c
+++ b/makedumpfile.c
@@ -7672,6 +7672,396 @@ out:
        return ret;
 }
 
+void *
+kdump_thread_function_cyclic(void *arg) {
+       void *retval = PTHREAD_FAIL;
+       struct thread_args *kdump_thread_args = (struct thread_args *)arg;
+       struct page_data *page_data_buf = kdump_thread_args->page_data_buf;
+       int page_data_num = kdump_thread_args->page_data_num;
+       mdf_pfn_t pfn;
+       mdf_pfn_t consumed_pfn;
+       int index;
+       int found;
+       int fd_memory = 0;
+       struct dump_bitmap bitmap_memory_parallel;
+       unsigned char *buf = NULL, *buf_out = NULL;
+       struct mmap_cache *mmap_cache =
+                       MMAP_CACHE_PARALLEL(kdump_thread_args->thread_num);
+       unsigned long size_out;
+#ifdef USELZO
+       lzo_bytep wrkmem = WRKMEM_PARALLEL(kdump_thread_args->thread_num);
+#endif
+#ifdef USESNAPPY
+       unsigned long len_buf_out_snappy =
+                               snappy_max_compressed_length(info->page_size);
+#endif
+
+       buf = BUF_PARALLEL(kdump_thread_args->thread_num);
+       buf_out = BUF_OUT_PARALLEL(kdump_thread_args->thread_num);
+
+       fd_memory = FD_MEMORY_PARALLEL(kdump_thread_args->thread_num);
+
+       if (info->flag_refiltering) {
+               initialize_bitmap_memory_parallel(&bitmap_memory_parallel,
+                                               kdump_thread_args->thread_num);
+       }
+
+       while (1) {
+               /* get next pfn */
+               pthread_mutex_lock(&info->current_pfn_mutex);
+               pfn = info->current_pfn;
+               info->current_pfn++;
+               pthread_mutex_unlock(&info->current_pfn_mutex);
+
+               if (pfn >= kdump_thread_args->end_pfn)
+                       break;
+
+               index = -1;
+               found = FALSE;
+
+               while (found == FALSE) {
+                       /*
+                        * need a cancellation point here
+                        */
+                       sleep(0);
+
+                       index = (index + 1) % page_data_num;
+
+                       if (pthread_mutex_trylock(&page_data_buf[index].mutex) 
!= 0)
+                               continue;
+
+                       if (page_data_buf[index].ready != 0)
+                               goto unlock;
+
+                       pthread_mutex_lock(&info->consumed_pfn_mutex);
+                       if ((long)page_data_buf[index].pfn >
+                                               (long)info->consumed_pfn)
+                               info->consumed_pfn = page_data_buf[index].pfn;
+                       consumed_pfn = info->consumed_pfn;
+                       pthread_mutex_unlock(&info->consumed_pfn_mutex);
+
+                       /*
+                        * leave space for slow producer
+                        */
+                       if ((long)pfn - (long)consumed_pfn > page_data_num)
+                               goto unlock;
+
+                       found = TRUE;
+
+                       page_data_buf[index].pfn = pfn;
+                       page_data_buf[index].ready = 1;
+
+                       if (!is_on(info->partial_bitmap2,
+                                       pfn - kdump_thread_args->start_pfn)) {
+                               page_data_buf[index].dumpable = FALSE;
+                               goto unlock;
+                       }
+
+                       page_data_buf[index].dumpable = TRUE;
+
+                       if (!read_pfn_parallel(fd_memory, pfn, buf,
+                                              &bitmap_memory_parallel,
+                                              mmap_cache))
+                                       goto fail;
+
+                       filter_data_buffer_parallel(buf, pfn_to_paddr(pfn),
+                                                       info->page_size,
+                                                       &info->filter_mutex);
+
+                       if ((info->dump_level & DL_EXCLUDE_ZERO)
+                           && is_zero_page(buf, info->page_size)) {
+                               page_data_buf[index].zero = TRUE;
+                               goto unlock;
+                       }
+
+                       page_data_buf[index].zero = FALSE;
+
+                       /*
+                        * Compress the page data.
+                        */
+                       size_out = kdump_thread_args->len_buf_out;
+                       if ((info->flag_compress & DUMP_DH_COMPRESSED_ZLIB)
+                           && ((size_out = kdump_thread_args->len_buf_out),
+                               compress2(buf_out, &size_out, buf,
+                                         info->page_size,
+                                         Z_BEST_SPEED) == Z_OK)
+                           && (size_out < info->page_size)) {
+                               page_data_buf[index].flags =
+                                                       DUMP_DH_COMPRESSED_ZLIB;
+                               page_data_buf[index].size  = size_out;
+                               memcpy(page_data_buf[index].buf, buf_out, 
size_out);
+#ifdef USELZO
+                       } else if (info->flag_lzo_support
+                                  && (info->flag_compress
+                                      & DUMP_DH_COMPRESSED_LZO)
+                                  && ((size_out = info->page_size),
+                                      lzo1x_1_compress(buf, info->page_size,
+                                                       buf_out, &size_out,
+                                                       wrkmem) == LZO_E_OK)
+                                  && (size_out < info->page_size)) {
+                               page_data_buf[index].flags =
+                                                       DUMP_DH_COMPRESSED_LZO;
+                               page_data_buf[index].size  = size_out;
+                               memcpy(page_data_buf[index].buf, buf_out, 
size_out);
+#endif
+#ifdef USESNAPPY
+                       } else if ((info->flag_compress
+                                   & DUMP_DH_COMPRESSED_SNAPPY)
+                                  && ((size_out = len_buf_out_snappy),
+                                      snappy_compress((char *)buf,
+                                                      info->page_size,
+                                                      (char *)buf_out,
+                                                      (size_t *)&size_out)
+                                      == SNAPPY_OK)
+                                  && (size_out < info->page_size)) {
+                               page_data_buf[index].flags =
+                                               DUMP_DH_COMPRESSED_SNAPPY;
+                               page_data_buf[index].size  = size_out;
+                               memcpy(page_data_buf[index].buf, buf_out, 
size_out);
+#endif
+                       } else {
+                               page_data_buf[index].flags = 0;
+                               page_data_buf[index].size  = info->page_size;
+                               memcpy(page_data_buf[index].buf, buf, 
info->page_size);
+                       }
+unlock:
+                       pthread_mutex_unlock(&page_data_buf[index].mutex);
+
+               }
+       }
+
+       retval = NULL;
+
+fail:
+       if (bitmap_memory_parallel.fd > 0)
+               close(bitmap_memory_parallel.fd);
+
+       pthread_exit(retval);
+}
+
+int
+write_kdump_pages_parallel_cyclic(struct cache_data *cd_header,
+                                 struct cache_data *cd_page,
+                                 struct page_desc *pd_zero,
+                                 off_t *offset_data, struct cycle *cycle)
+{
+       int ret = FALSE;
+       int res;
+       unsigned long len_buf_out;
+       mdf_pfn_t per;
+       mdf_pfn_t start_pfn, end_pfn;
+       struct page_desc pd;
+       struct timeval tv_start;
+       struct timeval last, new;
+       unsigned long long consuming_pfn;
+       pthread_t **threads = NULL;
+       struct thread_args *kdump_thread_args = NULL;
+       void *thread_result;
+       int page_data_num;
+       struct page_data *page_data_buf = NULL;
+       int i;
+       int index;
+
+       if (info->flag_elf_dumpfile)
+               return FALSE;
+
+       res = pthread_mutex_init(&info->current_pfn_mutex, NULL);
+       if (res != 0) {
+               ERRMSG("Can't initialize current_pfn_mutex. %s\n",
+                               strerror(res));
+               goto out;
+       }
+
+       res = pthread_mutex_init(&info->consumed_pfn_mutex, NULL);
+       if (res != 0) {
+               ERRMSG("Can't initialize consumed_pfn_mutex. %s\n",
+                               strerror(res));
+               goto out;
+       }
+
+       res = pthread_mutex_init(&info->filter_mutex, NULL);
+       if (res != 0) {
+               ERRMSG("Can't initialize filter_mutex. %s\n", strerror(res));
+               goto out;
+       }
+
+       res = pthread_rwlock_init(&info->usemmap_rwlock, NULL);
+       if (res != 0) {
+               ERRMSG("Can't initialize usemmap_rwlock. %s\n", strerror(res));
+               goto out;
+       }
+
+       len_buf_out = calculate_len_buf_out(info->page_size);
+
+       per = info->num_dumpable / 10000;
+       per = per ? per : 1;
+
+       gettimeofday(&tv_start, NULL);
+
+       start_pfn = cycle->start_pfn;
+       end_pfn   = cycle->end_pfn;
+
+       info->current_pfn = start_pfn;
+       info->consumed_pfn = start_pfn - 1;
+
+       threads = info->threads;
+       kdump_thread_args = info->kdump_thread_args;
+
+       page_data_num = info->num_buffers;
+       page_data_buf = info->page_data_buf;
+
+       for (i = 0; i < page_data_num; i++) {
+               /*
+                * producer will use pfn in page_data_buf to decide the
+                * consumed pfn
+                */
+               page_data_buf[i].pfn = start_pfn - 1;
+               page_data_buf[i].ready = 0;
+               res = pthread_mutex_init(&page_data_buf[i].mutex, NULL);
+               if (res != 0) {
+                       ERRMSG("Can't initialize mutex of page_data_buf. %s\n",
+                                       strerror(res));
+                       goto out;
+               }
+       }
+
+       for (i = 0; i < info->num_threads; i++) {
+               kdump_thread_args[i].thread_num = i;
+               kdump_thread_args[i].len_buf_out = len_buf_out;
+               kdump_thread_args[i].start_pfn = start_pfn;
+               kdump_thread_args[i].end_pfn = end_pfn;
+               kdump_thread_args[i].page_data_num = page_data_num;
+               kdump_thread_args[i].page_data_buf = page_data_buf;
+
+               res = pthread_create(threads[i], NULL,
+                                    kdump_thread_function_cyclic,
+                                    (void *)&kdump_thread_args[i]);
+               if (res != 0) {
+                       ERRMSG("Can't create thread %d. %s\n",
+                                       i, strerror(res));
+                       goto out;
+               }
+       }
+
+       consuming_pfn = start_pfn;
+       index = -1;
+
+       gettimeofday(&last, NULL);
+
+       while (consuming_pfn < end_pfn) {
+               index = (index + 1) % page_data_num;
+
+               gettimeofday(&new, NULL);
+               if (new.tv_sec - last.tv_sec > WAIT_TIME) {
+                       ERRMSG("Can't get data of pfn %llx.\n", consuming_pfn);
+                       goto out;
+               }
+
+               /*
+                * check pfn first without mutex locked to reduce the time
+                * trying to lock the mutex
+                */
+               if (page_data_buf[index].pfn != consuming_pfn)
+                       continue;
+
+               pthread_mutex_lock(&page_data_buf[index].mutex);
+
+               /* check whether the found one is ready to be consumed */
+               if (page_data_buf[index].pfn != consuming_pfn ||
+                   page_data_buf[index].ready != 1) {
+                       goto unlock;
+               }
+
+               if ((num_dumped % per) == 0)
+                       print_progress(PROGRESS_COPY, num_dumped, 
info->num_dumpable);
+
+               /* next pfn is found, refresh last here */
+               last = new;
+               consuming_pfn++;
+               page_data_buf[index].ready = 0;
+
+               if (page_data_buf[index].dumpable == FALSE)
+                       goto unlock;
+
+               num_dumped++;
+
+               if (page_data_buf[index].zero == TRUE) {
+                       if (!write_cache(cd_header, pd_zero, 
sizeof(page_desc_t)))
+                               goto out;
+                       pfn_zero++;
+               } else {
+                       pd.flags      = page_data_buf[index].flags;
+                       pd.size       = page_data_buf[index].size;
+                       pd.page_flags = 0;
+                       pd.offset     = *offset_data;
+                       *offset_data  += pd.size;
+                       /*
+                        * Write the page header.
+                        */
+                       if (!write_cache(cd_header, &pd, sizeof(page_desc_t)))
+                               goto out;
+                       /*
+                        * Write the page data.
+                        */
+                       if (!write_cache(cd_page, page_data_buf[index].buf, 
pd.size))
+                               goto out;
+
+               }
+unlock:
+               pthread_mutex_unlock(&page_data_buf[index].mutex);
+       }
+
+       ret = TRUE;
+       /*
+        * print [100 %]
+        */
+       print_progress(PROGRESS_COPY, num_dumped, info->num_dumpable);
+       print_execution_time(PROGRESS_COPY, &tv_start);
+       PROGRESS_MSG("\n");
+
+out:
+       if (threads != NULL) {
+               for (i = 0; i < info->num_threads; i++) {
+                       if (threads[i] != NULL) {
+                               res = pthread_cancel(*threads[i]);
+                               if (res != 0 && res != ESRCH)
+                                       ERRMSG("Can't cancel thread %d. %s\n",
+                                                       i, strerror(res));
+                       }
+               }
+
+               for (i = 0; i < info->num_threads; i++) {
+                       if (threads[i] != NULL) {
+                               res = pthread_join(*threads[i], &thread_result);
+                               if (res != 0)
+                                       ERRMSG("Can't join with thread %d. 
%s\n",
+                                                       i, strerror(res));
+
+                               if (thread_result == PTHREAD_CANCELED)
+                                       DEBUG_MSG("Thread %d is cancelled.\n", 
i);
+                               else if (thread_result == PTHREAD_FAIL)
+                                       DEBUG_MSG("Thread %d fails.\n", i);
+                               else
+                                       DEBUG_MSG("Thread %d finishes.\n", i);
+
+                       }
+               }
+       }
+
+       if (page_data_buf != NULL) {
+               for (i = 0; i < page_data_num; i++) {
+                       pthread_mutex_destroy(&page_data_buf[i].mutex);
+               }
+       }
+
+       pthread_rwlock_destroy(&info->usemmap_rwlock);
+       pthread_mutex_destroy(&info->filter_mutex);
+       pthread_mutex_destroy(&info->consumed_pfn_mutex);
+       pthread_mutex_destroy(&info->current_pfn_mutex);
+
+       return ret;
+}
+
 int
 write_kdump_pages_cyclic(struct cache_data *cd_header, struct cache_data 
*cd_page,
                         struct page_desc *pd_zero, off_t *offset_data, struct 
cycle *cycle)
-- 
1.7.1


_______________________________________________
kexec mailing list
[email protected]
http://lists.infradead.org/mailman/listinfo/kexec

Reply via email to