On Thu, Jan 3, 2019 at 3:01 PM Ian Romanick <i...@freedesktop.org> wrote:
> On 11/28/18 6:59 PM, Marek Olšák wrote: > > From: Marek Olšák <marek.ol...@amd.com> > > > > for ARB_parallel_shader_compile > > --- > > src/util/u_queue.c | 49 +++++++++++++++++++++++++++++----------------- > > src/util/u_queue.h | 5 ++--- > > 2 files changed, 33 insertions(+), 21 deletions(-) > > > > diff --git a/src/util/u_queue.c b/src/util/u_queue.c > > index 48c5c79552d..5aaf60ae78e 100644 > > --- a/src/util/u_queue.c > > +++ b/src/util/u_queue.c > > @@ -26,42 +26,43 @@ > > > > #include "u_queue.h" > > > > #include <time.h> > > > > #include "util/os_time.h" > > #include "util/u_string.h" > > #include "util/u_thread.h" > > #include "u_process.h" > > > > -static void util_queue_killall_and_wait(struct util_queue *queue); > > +static void > > +util_queue_kill_threads(struct util_queue *queue, unsigned > keep_num_threads); > > > > > /**************************************************************************** > > * Wait for all queues to assert idle when exit() is called. > > * > > * Otherwise, C++ static variable destructors can be called while > threads > > * are using the static variables. > > */ > > > > static once_flag atexit_once_flag = ONCE_FLAG_INIT; > > static struct list_head queue_list; > > static mtx_t exit_mutex = _MTX_INITIALIZER_NP; > > > > static void > > atexit_handler(void) > > { > > struct util_queue *iter; > > > > mtx_lock(&exit_mutex); > > /* Wait for all queues to assert idle. 
*/ > > LIST_FOR_EACH_ENTRY(iter, &queue_list, head) { > > - util_queue_killall_and_wait(iter); > > + util_queue_kill_threads(iter, 0); > > } > > mtx_unlock(&exit_mutex); > > } > > > > static void > > global_init(void) > > { > > LIST_INITHEAD(&queue_list); > > atexit(atexit_handler); > > } > > @@ -259,55 +260,58 @@ util_queue_thread_func(void *input) > > u_thread_setname(name); > > } > > > > while (1) { > > struct util_queue_job job; > > > > mtx_lock(&queue->lock); > > assert(queue->num_queued >= 0 && queue->num_queued <= > queue->max_jobs); > > > > /* wait if the queue is empty */ > > - while (!queue->kill_threads && queue->num_queued == 0) > > + while (thread_index < queue->num_threads && queue->num_queued == > 0) > > cnd_wait(&queue->has_queued_cond, &queue->lock); > > > > - if (queue->kill_threads) { > > + /* only kill threads that are above "num_threads" */ > > + if (thread_index >= queue->num_threads) { > > mtx_unlock(&queue->lock); > > break; > > } > > > > job = queue->jobs[queue->read_idx]; > > memset(&queue->jobs[queue->read_idx], 0, sizeof(struct > util_queue_job)); > > queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; > > > > queue->num_queued--; > > cnd_signal(&queue->has_space_cond); > > mtx_unlock(&queue->lock); > > > > if (job.job) { > > job.execute(job.job, thread_index); > > util_queue_fence_signal(job.fence); > > if (job.cleanup) > > job.cleanup(job.job, thread_index); > > } > > } > > > > - /* signal remaining jobs before terminating */ > > + /* signal remaining jobs if all threads are being terminated */ > > mtx_lock(&queue->lock); > > - for (unsigned i = queue->read_idx; i != queue->write_idx; > > - i = (i + 1) % queue->max_jobs) { > > - if (queue->jobs[i].job) { > > - util_queue_fence_signal(queue->jobs[i].fence); > > - queue->jobs[i].job = NULL; > > + if (queue->num_threads == 0) { > > + for (unsigned i = queue->read_idx; i != queue->write_idx; > > + i = (i + 1) % queue->max_jobs) { > > + if (queue->jobs[i].job) { > > + 
util_queue_fence_signal(queue->jobs[i].fence); > > + queue->jobs[i].job = NULL; > > + } > > } > > + queue->read_idx = queue->write_idx; > > + queue->num_queued = 0; > > } > > - queue->read_idx = queue->write_idx; > > - queue->num_queued = 0; > > mtx_unlock(&queue->lock); > > return 0; > > } > > > > static bool > > util_queue_create_thread(struct util_queue *queue, unsigned index) > > { > > struct thread_input *input = > > (struct thread_input *) malloc(sizeof(struct thread_input)); > > input->queue = queue; > > @@ -418,60 +422,69 @@ fail: > > cnd_destroy(&queue->has_queued_cond); > > mtx_destroy(&queue->lock); > > free(queue->jobs); > > } > > /* also util_queue_is_initialized can be used to check for success */ > > memset(queue, 0, sizeof(*queue)); > > return false; > > } > > > > static void > > -util_queue_killall_and_wait(struct util_queue *queue) > > +util_queue_kill_threads(struct util_queue *queue, unsigned > keep_num_threads) > > { > > unsigned i; > > > > /* Signal all threads to terminate. */ > > + mtx_lock(&queue->finish_lock); > > + > > + if (keep_num_threads >= queue->num_threads) { > > + mtx_unlock(&queue->finish_lock); > > + return; > > + } > > + > > mtx_lock(&queue->lock); > > - queue->kill_threads = 1; > > + unsigned old_num_threads = queue->num_threads; > > + queue->num_threads = keep_num_threads; > > Shouldn't this still be set below, after the threads are joined? > The trick is that lowering num_threads is itself what terminates the threads, so it has to happen before the joins. cnd_broadcast then wakes up all the threads. Each thread whose thread_index >= num_threads runs to the end of its loop and exits. If num_threads were only updated after thrd_join, the threads being joined would never see the new value, and the join would block forever. I've added this explanation as a code comment locally.
Marek > > cnd_broadcast(&queue->has_queued_cond); > > mtx_unlock(&queue->lock); > > > > - for (i = 0; i < queue->num_threads; i++) > > + for (i = keep_num_threads; i < old_num_threads; i++) > > thrd_join(queue->threads[i], NULL); > > - queue->num_threads = 0; > > + > > + mtx_unlock(&queue->finish_lock); > > } > > > > void > > util_queue_destroy(struct util_queue *queue) > > { > > - util_queue_killall_and_wait(queue); > > + util_queue_kill_threads(queue, 0); > > remove_from_atexit_list(queue); > > > > cnd_destroy(&queue->has_space_cond); > > cnd_destroy(&queue->has_queued_cond); > > mtx_destroy(&queue->finish_lock); > > mtx_destroy(&queue->lock); > > free(queue->jobs); > > free(queue->threads); > > } > > > > void > > util_queue_add_job(struct util_queue *queue, > > void *job, > > struct util_queue_fence *fence, > > util_queue_execute_func execute, > > util_queue_execute_func cleanup) > > { > > struct util_queue_job *ptr; > > > > mtx_lock(&queue->lock); > > - if (queue->kill_threads) { > > + if (queue->num_threads == 0) { > > mtx_unlock(&queue->lock); > > /* well no good option here, but any leaks will be > > * short-lived as things are shutting down.. > > */ > > return; > > } > > > > util_queue_fence_reset(fence); > > > > assert(queue->num_queued >= 0 && queue->num_queued <= > queue->max_jobs); > > diff --git a/src/util/u_queue.h b/src/util/u_queue.h > > index 4e63a76aab2..756fa53e1bf 100644 > > --- a/src/util/u_queue.h > > +++ b/src/util/u_queue.h > > @@ -194,29 +194,28 @@ typedef void (*util_queue_execute_func)(void *job, > int thread_index); > > struct util_queue_job { > > void *job; > > struct util_queue_fence *fence; > > util_queue_execute_func execute; > > util_queue_execute_func cleanup; > > }; > > > > /* Put this into your context. 
*/ > > struct util_queue { > > char name[14]; /* 13 characters = the thread name without the index > */ > > - mtx_t finish_lock; /* only for util_queue_finish */ > > + mtx_t finish_lock; /* for util_queue_finish and protects > threads/num_threads */ > > mtx_t lock; > > cnd_t has_queued_cond; > > cnd_t has_space_cond; > > thrd_t *threads; > > unsigned flags; > > int num_queued; > > - unsigned num_threads; > > - int kill_threads; > > + unsigned num_threads; /* decreasing this number will terminate > threads */ > > int max_jobs; > > int write_idx, read_idx; /* ring buffer pointers */ > > struct util_queue_job *jobs; > > > > /* for cleanup at exit(), protected by exit_mutex */ > > struct list_head head; > > }; > > > > bool util_queue_init(struct util_queue *queue, > > const char *name, > > > >
_______________________________________________ mesa-dev mailing list mesa-dev@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/mesa-dev