From: Yifan Zhao <zhaoyi...@sjtu.edu.cn>

Add a workqueue implementation for multi-threading support inspired by
xfsprogs.

Signed-off-by: Yifan Zhao <zhaoyi...@sjtu.edu.cn>
Suggested-by: Gao Xiang <hsiang...@linux.alibaba.com>
Signed-off-by: Gao Xiang <hsiang...@linux.alibaba.com>
---
 configure.ac              |  16 ++++
 include/erofs/internal.h  |   3 +
 include/erofs/workqueue.h |  34 ++++++++
 lib/Makefile.am           |   4 +
 lib/workqueue.c           | 163 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 220 insertions(+)
 create mode 100644 include/erofs/workqueue.h
 create mode 100644 lib/workqueue.c

diff --git a/configure.ac b/configure.ac
index 4b59230..3ccd6bb 100644
--- a/configure.ac
+++ b/configure.ac
@@ -96,6 +96,14 @@ AC_DEFUN([EROFS_UTILS_PARSE_DIRECTORY],
 
 AC_ARG_VAR([MAX_BLOCK_SIZE], [The maximum block size which erofs-utils supports])
 
+AC_MSG_CHECKING([whether to enable multi-threading support])
+AC_ARG_ENABLE([multithreading],
+    AS_HELP_STRING([--enable-multithreading],
+                   [enable multi-threading support @<:@default=no@:>@]),
+    [enable_multithreading="$enableval"],
+    [enable_multithreading="no"])
+AC_MSG_RESULT([$enable_multithreading])
+
 AC_ARG_ENABLE([debug],
     [AS_HELP_STRING([--enable-debug],
                     [enable debugging mode @<:@default=no@:>@])],
@@ -280,6 +288,13 @@ AS_IF([test "x$MAX_BLOCK_SIZE" = "x"], [
                              [erofs_cv_max_block_size=4096]))
 ], [erofs_cv_max_block_size=$MAX_BLOCK_SIZE])
 
+# Configure multi-threading support
+AS_IF([test "x$enable_multithreading" != "xno"], [
+    AC_CHECK_HEADERS([pthread.h])
+    AC_CHECK_LIB([pthread], [pthread_mutex_lock], [], AC_MSG_ERROR([libpthread is required for multi-threaded build]))
+    AC_DEFINE(EROFS_MT_ENABLED, 1, [Enable multi-threading support])
+], [])
+
 # Configure debug mode
 AS_IF([test "x$enable_debug" != "xno"], [], [
   dnl Turn off all assert checking.
@@ -471,6 +486,7 @@ AS_IF([test "x$enable_fuzzing" != "xyes"], [], [
 AM_CONDITIONAL([ENABLE_FUZZING], [test "x${enable_fuzzing}" = "xyes"])
 
 # Set up needed symbols, conditionals and compiler/linker flags
+AM_CONDITIONAL([ENABLE_EROFS_MT], [test "x${enable_multithreading}" != "xno"])
 AM_CONDITIONAL([ENABLE_LZ4], [test "x${have_lz4}" = "xyes"])
 AM_CONDITIONAL([ENABLE_LZ4HC], [test "x${have_lz4hc}" = "xyes"])
 AM_CONDITIONAL([ENABLE_FUSE], [test "x${have_fuse}" = "xyes"])
diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index 5e968d6..4cd2059 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -22,6 +22,9 @@ typedef unsigned short umode_t;
 #include <sys/types.h> /* for off_t definition */
 #include <sys/stat.h> /* for S_ISCHR definition */
 #include <stdio.h>
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
 
 #ifndef PATH_MAX
 #define PATH_MAX        4096    /* # chars in a path name including nul */
diff --git a/include/erofs/workqueue.h b/include/erofs/workqueue.h
new file mode 100644
index 0000000..36037c3
--- /dev/null
+++ b/include/erofs/workqueue.h
@@ -0,0 +1,34 @@
+/* SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0 */
+#ifndef __EROFS_WORKQUEUE_H
+#define __EROFS_WORKQUEUE_H
+
+#include "internal.h"
+
+struct erofs_workqueue;
+
+typedef void *(*erofs_wq_func_t)(struct erofs_workqueue *, void *);
+
+struct erofs_work {
+	struct erofs_work *next;
+	void (*fn)(struct erofs_work *work, void *tlsp);
+};
+
+struct erofs_workqueue {
+	struct erofs_work *head, *tail;
+	pthread_mutex_t lock;
+	pthread_cond_t cond_empty;
+	pthread_cond_t cond_full;
+	pthread_t *workers;
+	unsigned int nworker;
+	unsigned int max_jobs;
+	unsigned int job_count;
+	bool shutdown;
+	erofs_wq_func_t on_start, on_exit;
+};
+
+int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker,
+			  unsigned int max_jobs, erofs_wq_func_t on_start,
+			  erofs_wq_func_t on_exit);
+int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work);
+int erofs_destroy_workqueue(struct erofs_workqueue *wq);
+#endif
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 54b9c9c..b3bea74 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -53,3 +53,7 @@ liberofs_la_SOURCES += kite_deflate.c compressor_deflate.c
 if ENABLE_LIBDEFLATE
 liberofs_la_SOURCES += compressor_libdeflate.c
 endif
+if ENABLE_EROFS_MT
+liberofs_la_LDFLAGS = -lpthread
+liberofs_la_SOURCES += workqueue.c
+endif
diff --git a/lib/workqueue.c b/lib/workqueue.c
new file mode 100644
index 0000000..47cec9b
--- /dev/null
+++ b/lib/workqueue.c
@@ -0,0 +1,163 @@
+// SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
+#include <pthread.h>
+#include <stdlib.h>
+#include "erofs/workqueue.h"
+
+/*
+ * Worker main loop: run the optional on_start hook, then keep dequeuing and
+ * executing jobs until the queue is empty and shutdown has been requested,
+ * and finally run the optional on_exit hook.
+ */
+static void *worker_thread(void *arg)
+{
+	struct erofs_workqueue *wq = arg;
+	struct erofs_work *work;
+	void *tlsp = NULL;
+
+	if (wq->on_start)
+		tlsp = (wq->on_start)(wq, NULL);
+
+	while (true) {
+		pthread_mutex_lock(&wq->lock);
+
+		while (wq->job_count == 0 && !wq->shutdown)
+			pthread_cond_wait(&wq->cond_empty, &wq->lock);
+		if (wq->job_count == 0 && wq->shutdown) {
+			pthread_mutex_unlock(&wq->lock);
+			break;
+		}
+
+		work = wq->head;
+		wq->head = work->next;
+		if (!wq->head)
+			wq->tail = NULL;
+		wq->job_count--;
+
+		/* the queue just left the full state: wake blocked producers */
+		if (wq->job_count == wq->max_jobs - 1)
+			pthread_cond_broadcast(&wq->cond_full);
+
+		pthread_mutex_unlock(&wq->lock);
+		work->fn(work, tlsp);
+	}
+
+	if (wq->on_exit)
+		(void)(wq->on_exit)(wq, tlsp);
+	return NULL;
+}
+
+/*
+ * Initialize @wq and start @nworker worker threads; at most @max_jobs jobs
+ * may be pending at any time.  Returns 0 on success or a negative errno, in
+ * which case no thread is left running and @wq holds no resources.
+ */
+int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker,
+			  unsigned int max_jobs, erofs_wq_func_t on_start,
+			  erofs_wq_func_t on_exit)
+{
+	unsigned int i;
+	int ret;
+
+	if (!wq || nworker <= 0 || max_jobs <= 0)
+		return -EINVAL;
+
+	wq->head = wq->tail = NULL;
+	wq->nworker = nworker;
+	wq->max_jobs = max_jobs;
+	wq->job_count = 0;
+	wq->shutdown = false;
+	wq->on_start = on_start;
+	wq->on_exit = on_exit;
+	pthread_mutex_init(&wq->lock, NULL);
+	pthread_cond_init(&wq->cond_empty, NULL);
+	pthread_cond_init(&wq->cond_full, NULL);
+
+	wq->workers = calloc(nworker, sizeof(*wq->workers));
+	if (!wq->workers) {
+		ret = -ENOMEM;
+		goto err_sync;
+	}
+
+	for (i = 0; i < nworker; i++) {
+		ret = pthread_create(&wq->workers[i], NULL, worker_thread, wq);
+		if (ret) {
+			/* pthread_create() returns a positive error number */
+			ret = -ret;
+			goto err_workers;
+		}
+	}
+	return 0;
+
+err_workers:
+	/*
+	 * Stop the workers started so far through the regular shutdown path
+	 * and join them, so that none of them can still use @wq once this
+	 * function has returned.
+	 */
+	pthread_mutex_lock(&wq->lock);
+	wq->shutdown = true;
+	pthread_cond_broadcast(&wq->cond_empty);
+	pthread_mutex_unlock(&wq->lock);
+	while (i)
+		pthread_join(wq->workers[--i], NULL);
+	free(wq->workers);
+	wq->workers = NULL;
+err_sync:
+	pthread_cond_destroy(&wq->cond_full);
+	pthread_cond_destroy(&wq->cond_empty);
+	pthread_mutex_destroy(&wq->lock);
+	return ret;
+}
+
+/*
+ * Append @work to the tail of the queue and wake one worker.  Blocks while
+ * max_jobs jobs are already pending.
+ */
+int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work)
+{
+	if (!wq || !work)
+		return -EINVAL;
+
+	pthread_mutex_lock(&wq->lock);
+
+	while (wq->job_count == wq->max_jobs)
+		pthread_cond_wait(&wq->cond_full, &wq->lock);
+
+	work->next = NULL;
+	if (!wq->head)
+		wq->head = work;
+	else
+		wq->tail->next = work;
+	wq->tail = work;
+	wq->job_count++;
+
+	pthread_cond_signal(&wq->cond_empty);
+	pthread_mutex_unlock(&wq->lock);
+	return 0;
+}
+
+/*
+ * Request shutdown, wait for the workers to finish the pending jobs and
+ * exit, then release the worker array and the synchronization objects.
+ */
+int erofs_destroy_workqueue(struct erofs_workqueue *wq)
+{
+	unsigned int i;
+
+	if (!wq)
+		return -EINVAL;
+
+	pthread_mutex_lock(&wq->lock);
+	wq->shutdown = true;
+	pthread_cond_broadcast(&wq->cond_empty);
+	pthread_mutex_unlock(&wq->lock);
+
+	for (i = 0; i < wq->nworker; i++)
+		pthread_join(wq->workers[i], NULL);
+
+	free(wq->workers);
+	pthread_mutex_destroy(&wq->lock);
+	pthread_cond_destroy(&wq->cond_empty);
+	pthread_cond_destroy(&wq->cond_full);
+	return 0;
+}
-- 
2.39.3