This adds functionality to do work in parallel.

The whole life cycle of such a thread pool would look like

    struct task_queue * tq = create_task_queue(32); // no of threads
    for (...)
        add_task(tq, process_one_item_function, item); // non blocking
    ...
    int ret = finish_task_queue(tq); // blocks until all tasks are done
    if (!tq)
        die ("Not all items were be processed");

The caller must take care of handling the output.

Signed-off-by: Stefan Beller <sbel...@google.com>
---

I sent this a while ago to the list, no comments on it :(
The core functionality stayed the same, but I hope to improved naming and
location of the code.

The WIP is only for the NO_PTHREADS case.


 run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
 run-command.h |  30 +++++++++
 2 files changed, 230 insertions(+), 12 deletions(-)

diff --git a/run-command.c b/run-command.c
index 28e1d55..4029011 100644
--- a/run-command.c
+++ b/run-command.c
@@ -4,6 +4,21 @@
 #include "sigchain.h"
 #include "argv-array.h"
 
+#ifdef NO_PTHREADS
+
+#else
+
+#include "thread-utils.h"
+
+#include <pthread.h>
+#include <semaphore.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#endif
+
+#include "git-compat-util.h"
+
 void child_process_init(struct child_process *child)
 {
        memset(child, 0, sizeof(*child));
@@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void))
 
 #endif
 
+void setup_main_thread()
+{
+       if (!main_thread_set) {
+               /*
+                * We assume that the first time that start_async is called
+                * it is from the main thread.
+                */
+               main_thread_set = 1;
+               main_thread = pthread_self();
+               pthread_key_create(&async_key, NULL);
+               pthread_key_create(&async_die_counter, NULL);
+               set_die_routine(die_async);
+               set_die_is_recursing_routine(async_die_is_recursing);
+       }
+}
+
 int start_async(struct async *async)
 {
        int need_in, need_out;
@@ -740,18 +771,7 @@ int start_async(struct async *async)
        else if (async->out)
                close(async->out);
 #else
-       if (!main_thread_set) {
-               /*
-                * We assume that the first time that start_async is called
-                * it is from the main thread.
-                */
-               main_thread_set = 1;
-               main_thread = pthread_self();
-               pthread_key_create(&async_key, NULL);
-               pthread_key_create(&async_die_counter, NULL);
-               set_die_routine(die_async);
-               set_die_is_recursing_routine(async_die_is_recursing);
-       }
+       setup_main_thread();
 
        if (proc_in >= 0)
                set_cloexec(proc_in);
@@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct 
strbuf *buf, size_t hint)
        close(cmd->out);
        return finish_command(cmd);
 }
+
+#ifndef NO_PTHREADS
+struct job_list {
+       int (*fct)(struct task_queue *aq, void *task);
+       void *task;
+       struct job_list *next;
+};
+#endif
+
+struct task_queue {
+#ifndef NO_PTHREADS
+       /*
+        * To avoid deadlocks always aquire the semaphores with lowest priority
+        * first, priorites are in descending order as listed.
+        *
+        * The `mutex` is a general purpose lock for modifying data in the async
+        * queue, such as adding a new task or adding a return value from
+        * an already run task.
+        *
+        * `workingcount` and `freecount` are opposing semaphores, the sum of
+        * their values should equal `max_threads` at any time while the `mutex`
+        * is available.
+        */
+       sem_t mutex;
+       sem_t workingcount;
+       sem_t freecount;
+
+       pthread_t *threads;
+       unsigned max_threads;
+
+       struct job_list *first;
+       struct job_list *last;
+#endif
+       int early_return;
+};
+
+#ifndef NO_PTHREADS
+
+static void get_task(struct task_queue *aq,
+                    int (**fct)(struct task_queue *aq, void *task),
+                    void **task,
+                    int *early_return)
+{
+       struct job_list *job;
+
+       sem_wait(&aq->workingcount);
+       sem_wait(&aq->mutex);
+
+       if (!aq->first)
+               die("BUG: internal error with dequeuing jobs for threads");
+       job = aq->first;
+       *fct = job->fct;
+       *task = job->task;
+       aq->early_return |= *early_return;
+       *early_return = aq->early_return;
+       aq->first = job->next;
+       if (!aq->first)
+               aq->last = NULL;
+
+       sem_post(&aq->freecount);
+       sem_post(&aq->mutex);
+
+       free(job);
+}
+
+static void* dispatcher(void *args)
+{
+       void *task;
+       int (*fct)(struct task_queue *aq, void *data);
+       int early_return = 0;
+       struct task_queue *aq = args;
+
+       get_task(aq, &fct, &task, &early_return);
+       while (fct || early_return != 0) {
+               early_return = fct(aq, task);
+               get_task(aq, &fct, &task, &early_return);
+       }
+
+       pthread_exit(0);
+}
+#endif
+
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+       struct task_queue *aq = xmalloc(sizeof(*aq));
+
+#ifndef NO_PTHREADS
+       int i;
+       if (!max_threads)
+               aq->max_threads = online_cpus();
+       else
+               aq->max_threads = max_threads;
+
+       sem_init(&aq->mutex, 0, 1);
+       sem_init(&aq->workingcount, 0, 0);
+       sem_init(&aq->freecount, 0, aq->max_threads);
+       aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t));
+
+       for (i = 0; i < aq->max_threads; i++)
+               pthread_create(&aq->threads[i], 0, &dispatcher, aq);
+
+       aq->first = NULL;
+       aq->last = NULL;
+
+       setup_main_thread();
+#endif
+       aq->early_return = 0;
+
+       return aq;
+}
+
+void add_task(struct task_queue *aq,
+             int (*fct)(struct task_queue *aq, void *task),
+             void *task)
+{
+#ifndef NO_PTHREADS
+       struct job_list *job_list;
+
+       job_list = xmalloc(sizeof(*job_list));
+       job_list->task = task;
+       job_list->fct = fct;
+       job_list->next = NULL;
+
+       sem_wait(&aq->freecount);
+       sem_wait(&aq->mutex);
+
+       if (!aq->last) {
+               aq->last = job_list;
+               aq->first = aq->last;
+       } else {
+               aq->last->next = job_list;
+               aq->last = aq->last->next;
+       }
+
+       sem_post(&aq->workingcount);
+       sem_post(&aq->mutex);
+#else
+       ALLOC_GROW(aq->ret->ret, aq->ret->count + 1, aq->ret->alloc);
+       aq->ret->ret[aq->ret->count++] = aq->function(job);
+#endif
+}
+
+int finish_task_queue(struct task_queue *aq)
+{
+       int ret;
+#ifndef NO_PTHREADS
+       int i;
+       for (i = 0; i < aq->max_threads; i++)
+               add_task(aq, NULL, NULL);
+
+       for (i = 0; i < aq->max_threads; i++)
+               pthread_join(aq->threads[i], 0);
+
+       sem_destroy(&aq->mutex);
+       sem_destroy(&aq->workingcount);
+       sem_destroy(&aq->freecount);
+
+       if (aq->first)
+               die("BUG: internal error with queuing jobs for threads");
+
+       free(aq->threads);
+#endif
+       ret = aq->early_return;
+
+       free(aq);
+       return ret;
+}
+
diff --git a/run-command.h b/run-command.h
index 5b4425a..c2cfd49 100644
--- a/run-command.h
+++ b/run-command.h
@@ -119,4 +119,34 @@ struct async {
 int start_async(struct async *async);
 int finish_async(struct async *async);
 
+/*
+ * Creates a struct `task_queue`, which holds a list of tasks. Up to
+ * `max_threads` threads are active to process the enqueued tasks
+ * processing the tasks in a first in first out order.
+ *
+ * If `max_threads` is zero the number of cores available will be used.
+ *
+ * Currently this only works in environments with pthreads, in other
+ * environments, the task will be processed sequentially in `add_task`.
+ */
+struct task_queue *create_task_queue(unsigned max_threads);
+
+/*
+ * The function and data are put into the task queue.
+ *
+ * The function `fct` must not be NULL, as that's used internally
+ * in `finish_task_queue` to signal shutdown. If the return code
+ * of `fct` is unequal to 0, the tasks will stop eventually,
+ * the current parallel tasks will be flushed out.
+ */
+void add_task(struct task_queue *aq,
+             int (*fct)(struct task_queue *aq, void *task),
+             void *task);
+
+/*
+ * Waits for all tasks to be done and frees the object. The return code
+ * is zero if all enqueued tasks were processed.
+ */
+int finish_task_queue(struct task_queue *aq);
+
 #endif
-- 
2.5.0.264.g01b5c38.dirty

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to