Avoid pthread_cond_broadcast, which wakes up all workers. Make each worker use its own distinct mutex/cond.
Benchmark using afir with threads=4 and a 4096-tap FIR:

channels=1:
old:
2128210 decicycles in afir_execute, 2 runs, 0 skips
1382927 decicycles in afir_execute, 1024 runs, 0 skips
1367985 decicycles in afir_execute, 16374 runs, 10 skips
new:
1011270 decicycles in afir_execute, 2 runs, 0 skips
939891 decicycles in afir_execute, 1024 runs, 0 skips
955812 decicycles in afir_execute, 16383 runs, 1 skips

channels=2:
old:
2801720 decicycles in afir_execute, 2 runs, 0 skips
1624556 decicycles in afir_execute, 1024 runs, 0 skips
1722584 decicycles in afir_execute, 16380 runs, 4 skips
new:
1864780 decicycles in afir_execute, 2 runs, 0 skips
1307955 decicycles in afir_execute, 1024 runs, 0 skips
1110917 decicycles in afir_execute, 16384 runs, 0 skips

channels=3:
old:
3031255 decicycles in afir_execute, 2 runs, 0 skips
2545295 decicycles in afir_execute, 1024 runs, 0 skips
2498368 decicycles in afir_execute, 16384 runs, 0 skips
new:
2213540 decicycles in afir_execute, 2 runs, 0 skips
2305479 decicycles in afir_execute, 1024 runs, 0 skips
2001942 decicycles in afir_execute, 16382 runs, 2 skips

channels=4:
old:
4642510 decicycles in afir_execute, 2 runs, 0 skips
3356856 decicycles in afir_execute, 1024 runs, 0 skips
2994766 decicycles in afir_execute, 16382 runs, 2 skips
new:
3590650 decicycles in afir_execute, 2 runs, 0 skips
2456035 decicycles in afir_execute, 1024 runs, 0 skips
2332966 decicycles in afir_execute, 16384 runs, 0 skips

channels=6:
old:
5057785 decicycles in afir_execute, 2 runs, 0 skips
4279000 decicycles in afir_execute, 1023 runs, 1 skips
4102256 decicycles in afir_execute, 16383 runs, 1 skips
new:
4244160 decicycles in afir_execute, 2 runs, 0 skips
3851306 decicycles in afir_execute, 1024 runs, 0 skips
3343221 decicycles in afir_execute, 16384 runs, 0 skips

channels=8:
old:
4871740 decicycles in afir_execute, 2 runs, 0 skips
4807337 decicycles in afir_execute, 1023 runs, 1 skips
4454018 decicycles in afir_execute, 16374 runs, 10 skips
new:
5055460 decicycles 
in afir_execute, 2 runs, 0 skips 4554674 decicycles in afir_execute, 1023 runs, 1 skips 4433398 decicycles in afir_execute, 16382 runs, 2 skips Signed-off-by: Muhammad Faiz <mfc...@gmail.com> --- libavfilter/pthread.c | 189 +++++++++++++++++++++++++++++++------------------- 1 file changed, 116 insertions(+), 73 deletions(-) diff --git a/libavfilter/pthread.c b/libavfilter/pthread.c index c7a0021..8fb3409 100644 --- a/libavfilter/pthread.c +++ b/libavfilter/pthread.c @@ -21,6 +21,7 @@ * Libavfilter multithreading support */ +#include <stdatomic.h> #include "config.h" #include "libavutil/common.h" @@ -32,61 +33,75 @@ #include "internal.h" #include "thread.h" +typedef struct WorkerContext WorkerContext; + typedef struct ThreadContext { AVFilterGraph *graph; - int nb_threads; - pthread_t *workers; + int nb_workers; + WorkerContext *workers; avfilter_action_func *func; /* per-execute parameters */ AVFilterContext *ctx; void *arg; int *rets; - int nb_jobs; + unsigned nb_jobs; - pthread_cond_t last_job_cond; - pthread_cond_t current_job_cond; - pthread_mutex_t current_job_lock; - int current_job; - unsigned int current_execute; + pthread_mutex_t mutex_done; + pthread_cond_t cond_done; + atomic_uint current_job; + atomic_uint nb_finished_jobs; int done; } ThreadContext; -static void* attribute_align_arg worker(void *v) +struct WorkerContext { + ThreadContext *ctx; + pthread_t thread; + pthread_mutex_t mutex; + pthread_cond_t cond; + int done; +}; + +static unsigned run_jobs(ThreadContext *c) { - ThreadContext *c = v; - int our_job = c->nb_jobs; - int nb_threads = c->nb_threads; - unsigned int last_execute = 0; - int ret, self_id; - - pthread_mutex_lock(&c->current_job_lock); - self_id = c->current_job++; - - for (;;) { - while (our_job >= c->nb_jobs) { - if (c->current_job == nb_threads + c->nb_jobs) - pthread_cond_signal(&c->last_job_cond); - - while (last_execute == c->current_execute && !c->done) - pthread_cond_wait(&c->current_job_cond, &c->current_job_lock); - 
last_execute = c->current_execute; - our_job = self_id; - - if (c->done) { - pthread_mutex_unlock(&c->current_job_lock); - return NULL; - } - } - pthread_mutex_unlock(&c->current_job_lock); + unsigned current_job, nb_finished_jobs = 0; - ret = c->func(c->ctx, c->arg, our_job, c->nb_jobs); + while (nb_finished_jobs != c->nb_jobs && + (current_job = atomic_fetch_add_explicit(&c->current_job, 1, memory_order_acq_rel)) < c->nb_jobs) { + int ret = c->func(c->ctx, c->arg, current_job, c->nb_jobs); if (c->rets) - c->rets[our_job % c->nb_jobs] = ret; + c->rets[current_job] = ret; + nb_finished_jobs = atomic_fetch_add_explicit(&c->nb_finished_jobs, 1, memory_order_acq_rel) + 1; + } + + return nb_finished_jobs; +} + +static void* attribute_align_arg worker(void *v) +{ + WorkerContext *w = v; + ThreadContext *c = w->ctx; + + pthread_mutex_lock(&w->mutex); + pthread_cond_signal(&w->cond); + + while (1) { + w->done = 1; + while (w->done) + pthread_cond_wait(&w->cond, &w->mutex); + + if (c->done) { + pthread_mutex_unlock(&w->mutex); + return NULL; + } - pthread_mutex_lock(&c->current_job_lock); - our_job = c->current_job++; + if (run_jobs(c) == c->nb_jobs) { + pthread_mutex_lock(&c->mutex_done); + c->done = 1; + pthread_cond_signal(&c->cond_done); + pthread_mutex_unlock(&c->mutex_done); + } } } @@ -94,48 +109,66 @@ static void slice_thread_uninit(ThreadContext *c) { int i; - pthread_mutex_lock(&c->current_job_lock); c->done = 1; - pthread_cond_broadcast(&c->current_job_cond); - pthread_mutex_unlock(&c->current_job_lock); + for (i = 0; i < c->nb_workers; i++) { + WorkerContext *w = &c->workers[i]; - for (i = 0; i < c->nb_threads; i++) - pthread_join(c->workers[i], NULL); + pthread_mutex_lock(&w->mutex); + w->done = 0; + pthread_cond_signal(&w->cond); + pthread_mutex_unlock(&w->mutex); - pthread_mutex_destroy(&c->current_job_lock); - pthread_cond_destroy(&c->current_job_cond); - pthread_cond_destroy(&c->last_job_cond); - av_freep(&c->workers); -} + pthread_join(w->thread, NULL); + 
pthread_cond_destroy(&w->cond); + pthread_mutex_destroy(&w->mutex); + } -static void slice_thread_park_workers(ThreadContext *c) -{ - while (c->current_job != c->nb_threads + c->nb_jobs) - pthread_cond_wait(&c->last_job_cond, &c->current_job_lock); - pthread_mutex_unlock(&c->current_job_lock); + pthread_cond_destroy(&c->cond_done); + pthread_mutex_destroy(&c->mutex_done); + av_freep(&c->workers); } static int thread_execute(AVFilterContext *ctx, avfilter_action_func *func, - void *arg, int *ret, int nb_jobs) + void *arg, int *rets, int nb_jobs) { ThreadContext *c = ctx->graph->internal->thread; + int nb_workers, i; if (nb_jobs <= 0) return 0; - pthread_mutex_lock(&c->current_job_lock); + if (nb_jobs == 1) { + int ret = func(ctx, arg, 0, 1); + if (rets) + rets[0] = ret; + return 0; + } - c->current_job = c->nb_threads; + nb_workers = FFMIN(c->nb_workers, nb_jobs - 1); + atomic_store_explicit(&c->current_job, 0, memory_order_relaxed); + atomic_store_explicit(&c->nb_finished_jobs, 0, memory_order_relaxed); c->nb_jobs = nb_jobs; c->ctx = ctx; c->arg = arg; c->func = func; - c->rets = ret; - c->current_execute++; + c->rets = rets; + + for (i = 0; i < nb_workers; i++) { + WorkerContext *w = &c->workers[i]; - pthread_cond_broadcast(&c->current_job_cond); + pthread_mutex_lock(&w->mutex); + w->done = 0; + pthread_cond_signal(&w->cond); + pthread_mutex_unlock(&w->mutex); + } - slice_thread_park_workers(c); + if (run_jobs(c) != c->nb_jobs) { + pthread_mutex_lock(&c->mutex_done); + while (!c->done) + pthread_cond_wait(&c->cond_done, &c->mutex_done); + c->done = 0; + pthread_mutex_unlock(&c->mutex_done); + } return 0; } @@ -156,33 +189,43 @@ static int thread_init_internal(ThreadContext *c, int nb_threads) if (nb_threads <= 1) return 1; - c->nb_threads = nb_threads; - c->workers = av_mallocz_array(sizeof(*c->workers), nb_threads); + c->nb_workers = nb_threads - 1; + c->workers = av_mallocz_array(sizeof(*c->workers), c->nb_workers); if (!c->workers) return AVERROR(ENOMEM); - 
c->current_job = 0; + pthread_mutex_init(&c->mutex_done, NULL); + pthread_cond_init(&c->cond_done, NULL); + atomic_init(&c->current_job, 0); + atomic_init(&c->nb_finished_jobs, 0); c->nb_jobs = 0; c->done = 0; - pthread_cond_init(&c->current_job_cond, NULL); - pthread_cond_init(&c->last_job_cond, NULL); + for (i = 0; i < c->nb_workers; i++) { + WorkerContext *w = &c->workers[i]; - pthread_mutex_init(&c->current_job_lock, NULL); - pthread_mutex_lock(&c->current_job_lock); - for (i = 0; i < nb_threads; i++) { - ret = pthread_create(&c->workers[i], NULL, worker, c); + w->ctx = c; + pthread_mutex_init(&w->mutex, NULL); + pthread_cond_init(&w->cond, NULL); + pthread_mutex_lock(&w->mutex); + w->done = 0; + ret = pthread_create(&w->thread, NULL, worker, w); if (ret) { - pthread_mutex_unlock(&c->current_job_lock); - c->nb_threads = i; + c->nb_workers = i; + pthread_mutex_unlock(&w->mutex); + pthread_cond_destroy(&w->cond); + pthread_mutex_destroy(&w->mutex); slice_thread_uninit(c); return AVERROR(ret); } + + while (!w->done) + pthread_cond_wait(&w->cond, &w->mutex); + pthread_mutex_unlock(&w->mutex); } - slice_thread_park_workers(c); - return c->nb_threads; + return c->nb_workers + 1; } int ff_graph_thread_init(AVFilterGraph *graph) -- 2.9.3 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org http://ffmpeg.org/mailman/listinfo/ffmpeg-devel