By treating each object as its own task the workflow is easier to follow
as the function used in the worker threads doesn't need any control logic
any more.

Signed-off-by: Stefan Beller <sbel...@google.com>
---
 builtin/index-pack.c | 71 +++++++++++++++++++++++-----------------------------
 1 file changed, 32 insertions(+), 39 deletions(-)

diff --git a/builtin/index-pack.c b/builtin/index-pack.c
index 3f10840..826bd22 100644
--- a/builtin/index-pack.c
+++ b/builtin/index-pack.c
@@ -11,6 +11,7 @@
 #include "exec_cmd.h"
 #include "streaming.h"
 #include "thread-utils.h"
+#include "run-command.h"
 
 static const char index_pack_usage[] =
 "git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>] [--verify] 
[--strict] (<pack-file> | --stdin [--fix-thin] [<pack-file>])";
@@ -95,7 +96,6 @@ static const char *curr_pack;
 #ifndef NO_PTHREADS
 
 static struct thread_local *thread_data;
-static int nr_dispatched;
 static int threads_active;
 
 static pthread_mutex_t read_mutex;
@@ -106,10 +106,6 @@ static pthread_mutex_t counter_mutex;
 #define counter_lock()         lock_mutex(&counter_mutex)
 #define counter_unlock()       unlock_mutex(&counter_mutex)
 
-static pthread_mutex_t work_mutex;
-#define work_lock()            lock_mutex(&work_mutex)
-#define work_unlock()          unlock_mutex(&work_mutex)
-
 static pthread_mutex_t deepest_delta_mutex;
 #define deepest_delta_lock()   lock_mutex(&deepest_delta_mutex)
 #define deepest_delta_unlock() unlock_mutex(&deepest_delta_mutex)
@@ -140,7 +136,6 @@ static void init_thread(void)
        int i;
        init_recursive_mutex(&read_mutex);
        pthread_mutex_init(&counter_mutex, NULL);
-       pthread_mutex_init(&work_mutex, NULL);
        pthread_mutex_init(&type_cas_mutex, NULL);
        if (show_stat)
                pthread_mutex_init(&deepest_delta_mutex, NULL);
@@ -163,7 +158,6 @@ static void cleanup_thread(void)
        threads_active = 0;
        pthread_mutex_destroy(&read_mutex);
        pthread_mutex_destroy(&counter_mutex);
-       pthread_mutex_destroy(&work_mutex);
        pthread_mutex_destroy(&type_cas_mutex);
        if (show_stat)
                pthread_mutex_destroy(&deepest_delta_mutex);
@@ -181,9 +175,6 @@ static void cleanup_thread(void)
 #define counter_lock()
 #define counter_unlock()
 
-#define work_lock()
-#define work_unlock()
-
 #define deepest_delta_lock()
 #define deepest_delta_unlock()
 
@@ -1075,28 +1066,29 @@ static void resolve_base(struct object_entry *obj)
 }
 
 #ifndef NO_PTHREADS
-static void *threaded_second_pass(void *data)
+static int threaded_second_pass(struct task_queue *tq, void *data)
 {
-       set_thread_data(data);
-       for (;;) {
-               int i;
-               counter_lock();
-               display_progress(progress, nr_resolved_deltas);
-               counter_unlock();
-               work_lock();
-               while (nr_dispatched < nr_objects &&
-                      is_delta_type(objects[nr_dispatched].type))
-                       nr_dispatched++;
-               if (nr_dispatched >= nr_objects) {
-                       work_unlock();
-                       break;
-               }
-               i = nr_dispatched++;
-               work_unlock();
+       if (!get_thread_data()) {
+               struct thread_local *t = xmalloc(sizeof(*t));
+               t->pack_fd = open(curr_pack, O_RDONLY);
+               if (t->pack_fd == -1)
+                       die_errno(_("unable to open %s"), curr_pack);
 
-               resolve_base(&objects[i]);
+               set_thread_data(t);
        }
-       return NULL;
+
+       resolve_base(data);
+
+       counter_lock();
+       display_progress(progress, nr_resolved_deltas);
+       counter_unlock();
+       return 0;
+}
+
+void cleanup_threaded_second_pass(struct task_queue *aq)
+{
+       struct thread_local *t = get_thread_data();
+       free(t);
 }
 #endif
 
@@ -1195,18 +1187,19 @@ static void resolve_deltas(void)
                                          nr_ref_deltas + nr_ofs_deltas);
 
 #ifndef NO_PTHREADS
-       nr_dispatched = 0;
        if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
+               struct task_queue *tq;
+
                init_thread();
-               for (i = 0; i < nr_threads; i++) {
-                       int ret = pthread_create(&thread_data[i].thread, NULL,
-                                                threaded_second_pass, 
thread_data + i);
-                       if (ret)
-                               die(_("unable to create thread: %s"),
-                                   strerror(ret));
-               }
-               for (i = 0; i < nr_threads; i++)
-                       pthread_join(thread_data[i].thread, NULL);
+               tq = create_task_queue(nr_threads);
+
+               for (i = 0; i < nr_objects; i++)
+                       if (!is_delta_type(objects[i].type))
+                               add_task(tq, threaded_second_pass, &objects[i]);
+
+               if (finish_task_queue(tq, cleanup_threaded_second_pass))
+                       die("Not all threads have finished");
+
                cleanup_thread();
                return;
        }
-- 
2.5.0.400.gff86faf

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to