This replaces the last patch of the "Parallel git submodule fetching"
series. Changes:

* have correct return code in submodule fetching when one submodule fails
* use poll instead of select now
* broke down into more smaller functions instead of one giant.
  (I think it is an improvement, but I wouldn't be surprised if someone objects)
* closed memory leaks
* document the need for stdout_to_stderr

I don't deem it RFC-ish any more but good to go.

Any feedback welcome!
Thanks,
Stefan


Stefan Beller (1):
  fetch: fetch submodules in parallel

 Documentation/fetch-options.txt |   7 +
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 +
 run-command.c                   | 278 ++++++++++++++++++++++++++++++++++++----
 run-command.h                   |  36 ++++++
 strbuf.c                        |  31 +++++
 strbuf.h                        |   1 +
 submodule.c                     | 119 ++++++++++++-----
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  20 +++
 t/t5526-fetch-submodules.sh     |  19 +++
 test-run-command.c              |  24 ++++
 12 files changed, 490 insertions(+), 59 deletions(-)

Interdiff to RFCv1:

diff --git a/Documentation/fetch-options.txt b/Documentation/fetch-options.txt
index d432f98..6b109f6 100644
--- a/Documentation/fetch-options.txt
+++ b/Documentation/fetch-options.txt
@@ -105,7 +105,7 @@ ifndef::git-pull[]
        Number of parallel children to be used for fetching submodules.
        Each will fetch from different submodules, such that fetching many
        submodules will be faster. By default submodules will be fetched
-       one at a time
+       one at a time.
 
 --no-recurse-submodules::
        Disable recursive fetching of submodules (this has the same effect as
diff --git a/builtin/fetch.c b/builtin/fetch.c
index a1520bb..f28eac6 100644
--- a/builtin/fetch.c
+++ b/builtin/fetch.c
@@ -101,7 +101,7 @@ static struct option builtin_fetch_options[] = {
        OPT_SET_INT('n', NULL, &tags,
                    N_("do not fetch all tags (--no-tags)"), TAGS_UNSET),
        OPT_INTEGER('j', "jobs", &max_children,
-                   N_("number of threads used for fetching")),
+                   N_("number of submodules fetched in parallel")),
        OPT_BOOL('p', "prune", &prune,
                 N_("prune remote-tracking branches no longer on remote")),
        { OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"),
diff --git a/builtin/pull.c b/builtin/pull.c
index bc117e9..f0af196 100644
--- a/builtin/pull.c
+++ b/builtin/pull.c
@@ -179,7 +179,7 @@ static struct option pull_options[] = {
                N_("control recursive fetching of submodules"),
                PARSE_OPT_OPTARG),
        OPT_PASSTHRU('j', "jobs", &max_children, N_("n"),
-               N_("number of threads used for fetching submodules"),
+               N_("number of submodules pulled in parallel"),
                PARSE_OPT_OPTARG),
        OPT_BOOL(0, "dry-run", &opt_dry_run,
                N_("dry run")),
diff --git a/run-command.c b/run-command.c
index b8ff67b..6f6f9fb 100644
--- a/run-command.c
+++ b/run-command.c
@@ -232,6 +232,35 @@ static inline void set_cloexec(int fd)
                fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
 }
 
+static int determine_return_value(int wait_status,
+                                 int *result,
+                                 int *error_code,
+                                 const char *argv0)
+{
+       if (WIFSIGNALED(wait_status)) {
+               *result = WTERMSIG(wait_status);
+               if (*result != SIGINT && *result != SIGQUIT)
+                       error("%s died of signal %d", argv0, *result);
+               /*
+                * This return value is chosen so that code & 0xff
+                * mimics the exit code that a POSIX shell would report for
+                * a program that died from this signal.
+                */
+               *result += 128;
+       } else if (WIFEXITED(wait_status)) {
+               *result = WEXITSTATUS(wait_status);
+               /*
+                * Convert special exit code when execvp failed.
+                */
+               if (*result == 127) {
+                       *result = -1;
+                       *error_code = ENOENT;
+               }
+       } else
+               return 1;
+       return 0;
+}
+
 static int wait_or_whine(pid_t pid, const char *argv0)
 {
        int status, code = -1;
@@ -244,29 +273,10 @@ static int wait_or_whine(pid_t pid, const char *argv0)
        if (waiting < 0) {
                failed_errno = errno;
                error("waitpid for %s failed: %s", argv0, strerror(errno));
-       } else if (waiting != pid) {
-               error("waitpid is confused (%s)", argv0);
-       } else if (WIFSIGNALED(status)) {
-               code = WTERMSIG(status);
-               if (code != SIGINT && code != SIGQUIT)
-                       error("%s died of signal %d", argv0, code);
-               /*
-                * This return value is chosen so that code & 0xff
-                * mimics the exit code that a POSIX shell would report for
-                * a program that died from this signal.
-                */
-               code += 128;
-       } else if (WIFEXITED(status)) {
-               code = WEXITSTATUS(status);
-               /*
-                * Convert special exit code when execvp failed.
-                */
-               if (code == 127) {
-                       code = -1;
-                       failed_errno = ENOENT;
-               }
        } else {
-               error("waitpid is confused (%s)", argv0);
+               if (waiting != pid
+                  || (determine_return_value(status, &code, &failed_errno, 
argv0) < 0))
+                       error("waitpid is confused (%s)", argv0);
        }
 
        clear_child_for_cleanup(pid);
@@ -853,146 +863,226 @@ int capture_command(struct child_process *cmd, struct 
strbuf *buf, size_t hint)
        return finish_command(cmd);
 }
 
-int run_processes_async(int n, get_next_task fn, void *data)
+static void unblock_fd(int fd)
 {
-       int i, wait_status;
-       pid_t pid;
+       int flags = fcntl(fd, F_GETFL);
+       if (flags < 0) {
+               warning("Could not get file status flags, "
+                       "output will be degraded");
+               return;
+       }
+       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK)) {
+                       warning("Could not set file status flags, "
+                       "output will be degraded");
+               return;
+       }
+}
 
-       /* no more tasks. Also set when aborting early. */
-       int all_tasks_started = 0;
-       int nr_processes = 0;
-       int child_in_foreground = 0;
-       struct timeval timeout;
-       struct child_process *children = xcalloc(n, sizeof(*children));
-       char *slots = xcalloc(n, sizeof(*slots));
-       struct strbuf *err = xcalloc(n, sizeof(*err));
-       fd_set fdset;
-       int maxfd;
-       struct strbuf finished_children = STRBUF_INIT;
-       int flags;
-       for (i = 0; i < n; i++)
-               strbuf_init(&err[i], 0);
-
-       while (!all_tasks_started || nr_processes > 0) {
-               /* Start new processes. */
-               while (!all_tasks_started && nr_processes < n) {
-                       for (i = 0; i < n; i++)
-                               if (!slots[i])
-                                       break; /* found an empty slot */
-                       if (i == n)
-                               die("BUG: bookkeeping is hard");
-
-                       if (fn(data, &children[i], &err[i])) {
-                               all_tasks_started = 1;
-                               break;
-                       }
-                       if (start_command(&children[i]))
-                               die(_("Could not start child process"));
-                       flags = fcntl(children[i].err, F_GETFL);
-                       fcntl(children[i].err, F_SETFL, flags | O_NONBLOCK);
-                       nr_processes++;
-                       slots[i] = 1;
-               }
+struct parallel_processes {
+       int max_number_processes;
+       void *data;
+       get_next_task fn;
+       handle_child_starting_failure fn_err;
+       handle_child_return_value fn_exit;
+
+       int nr_processes;
+       int all_tasks_started;
+       int foreground_child;
+       char *slots;
+       struct child_process *children;
+       struct pollfd *pfd;
+       struct strbuf *err;
+       struct strbuf finished_children;
+};
+
+static void run_processes_parallel_init(struct parallel_processes *pp,
+                                       int n, void *data,
+                                       get_next_task fn,
+                                       handle_child_starting_failure fn_err,
+                                       handle_child_return_value fn_exit)
+{
+       int i;
+
+       pp->max_number_processes = n;
+       pp->data = data;
+       pp->fn = fn;
+       pp->fn_err = fn_err;
+       pp->fn_exit = fn_exit;
+
+       pp->nr_processes = 0;
+       pp->all_tasks_started = 0;
+       pp->foreground_child = 0;
+       pp->slots = xcalloc(n, sizeof(*pp->slots));
+       pp->children = xcalloc(n, sizeof(*pp->children));
+       pp->pfd = xcalloc(n, sizeof(*pp->pfd));
+       pp->err = xcalloc(n, sizeof(*pp->err));
+       strbuf_init(&pp->finished_children, 0);
+
+       for (i = 0; i < n; i++) {
+               strbuf_init(&pp->err[i], 0);
+               pp->pfd[i].events = POLLIN;
+               pp->pfd[i].fd = -1;
+       }
+}
+
+static void run_processes_parallel_cleanup(struct parallel_processes *pp)
+{
+       int i;
+       for (i = 0; i < pp->max_number_processes; i++)
+               strbuf_release(&pp->err[i]);
+
+       free(pp->children);
+       free(pp->slots);
+       free(pp->pfd);
+       free(pp->err);
+       strbuf_release(&pp->finished_children);
+}
 
-               /* prepare data for select call */
-               FD_ZERO(&fdset);
-               maxfd = 0;
-               for (i = 0; i < n; i++) {
-                       if (!slots[i])
-                               continue;
-                       FD_SET(children[i].err, &fdset);
-                       if (children[i].err > maxfd)
-                               maxfd = children[i].err;
+static void run_processes_parallel_start_new(struct parallel_processes *pp)
+{
+       int i;
+       /* Start new processes. */
+       while (!pp->all_tasks_started
+              && pp->nr_processes < pp->max_number_processes) {
+               for (i = 0; i < pp->max_number_processes; i++)
+                       if (!pp->slots[i])
+                               break; /* found an empty slot */
+               if (i == pp->max_number_processes)
+                       die("BUG: bookkeeping is hard");
+
+               if (pp->fn(pp->data, &pp->children[i], &pp->err[i])) {
+                       pp->all_tasks_started = 1;
+                       break;
                }
-               timeout.tv_sec = 0;
-               timeout.tv_usec = 500000;
+               if (start_command(&pp->children[i]))
+                       pp->fn_err(pp->data, &pp->children[i], &pp->err[i]);
 
-               i = select(maxfd + 1, &fdset, NULL, NULL, &timeout);
-               if (i < 0) {
-                       if (errno == EINTR)
-                               /* A signal was caught; try again */
-                               continue;
-                       else if (errno == ENOMEM)
-                               die_errno("BUG: keeping track of fds is hard");
-                       else if (errno == EINVAL)
-                               die_errno("BUG: invalid arguments to select");
-                       else if (errno == EBADF)
-                               die_errno("BUG: keeping track of fds is hard");
-                       else
-                               die_errno("Unknown error with select");
+               unblock_fd(pp->children[i].err);
+
+               pp->nr_processes++;
+               pp->slots[i] = 1;
+               pp->pfd[i].fd = pp->children[i].err;
+       }
+}
+
+static int run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
+{
+       int i;
+       i = poll(pp->pfd, pp->max_number_processes, 100);
+       if (i < 0) {
+               if (errno == EINTR)
+                       /* A signal was caught; try again */
+                       return -1;
+               else {
+                       run_processes_parallel_cleanup(pp);
+                       die_errno("poll");
                }
+       }
 
-               /* Buffer output from all pipes. */
-               for (i = 0; i < n; i++) {
-                       if (!slots[i])
-                               continue;
-                       if (FD_ISSET(children[i].err, &fdset))
-                               strbuf_read_noblock(&err[i], children[i].err, 
0);
-                       if (child_in_foreground == i) {
-                               fputs(err[i].buf, stderr);
-                               strbuf_reset(&err[i]);
-                               fflush(stderr);
-                       }
+       /* Buffer output from all pipes. */
+       for (i = 0; i < pp->max_number_processes; i++) {
+               if (!pp->slots[i])
+                       continue;
+               if (pp->pfd[i].revents & POLLIN)
+                       strbuf_read_noblock(&pp->err[i], pp->children[i].err, 
0);
+               if (pp->foreground_child == i) {
+                       fputs(pp->err[i].buf, stderr);
+                       strbuf_reset(&pp->err[i]);
                }
+       }
+       return 0;
+}
 
-               /* Collect finished child processes. */
-               while (nr_processes > 0) {
-                       pid = waitpid(-1, &wait_status, WNOHANG);
-                       if (pid == 0)
-                               /* no child finished */
-                               break;
-
-                       if (pid < 0) {
-                               if (errno == EINTR)
-                                       break; /* just try again  next time */
-                               if (errno == EINVAL || errno == ECHILD)
-                                       die_errno("wait");
-                       } else {
-                               /* Find the finished child. */
-                               for (i = 0; i < n; i++)
-                                       if (slots[i] && pid == children[i].pid)
-                                               break;
-                               if (i == n)
-                                       /* waitpid returned another process id 
which
-                                        * we are not waiting on, so ignore it*/
+
+static void run_processes_parallel_collect_finished(struct parallel_processes 
*pp)
+{
+       int i = 0;
+       pid_t pid;
+       int wait_status, code;
+       int n = pp->max_number_processes;
+       /* Collect finished child processes. */
+       while (pp->nr_processes > 0) {
+               pid = waitpid(-1, &wait_status, WNOHANG);
+               if (pid == 0)
+                       return; /* no child finished */
+
+               if (pid < 0) {
+                       if (errno == EINTR)
+                               return; /* just try again  next time */
+                       if (errno == EINVAL || errno == ECHILD)
+                               die_errno("wait");
+               } else {
+                       /* Find the finished child. */
+                       for (i = 0; i < pp->max_number_processes; i++)
+                               if (pp->slots[i] && pid == pp->children[i].pid)
                                        break;
-                       }
+                       if (i == pp->max_number_processes)
+                               /*
+                                * waitpid returned another process id
+                                * which we are not waiting for.
+                                */
+                               return;
+               }
+               strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0);
 
-                       strbuf_read_noblock(&err[i], children[i].err, 0);
-                       argv_array_clear(&children[i].args);
-                       argv_array_clear(&children[i].env_array);
+               if (determine_return_value(wait_status, &code, &errno,
+                                          pp->children[i].argv[0]) < 0)
+                       error("waitpid is confused (%s)",
+                             pp->children[i].argv[0]);
 
-                       slots[i] = 0;
-                       nr_processes--;
+               pp->fn_exit(pp->data, &pp->children[i], code);
 
-                       if (i != child_in_foreground) {
-                               strbuf_addbuf(&finished_children, &err[i]);
-                               strbuf_reset(&err[i]);
-                       } else {
-                               fputs(err[i].buf, stderr);
-                               strbuf_reset(&err[i]);
+               argv_array_clear(&pp->children[i].args);
+               argv_array_clear(&pp->children[i].env_array);
 
-                               /* Output all other finished child processes */
-                               fputs(finished_children.buf, stderr);
-                               strbuf_reset(&finished_children);
+               pp->nr_processes--;
+               pp->slots[i] = 0;
+               pp->pfd[i].fd = -1;
 
-                               /*
-                                * Pick next process to output live.
-                                * There can be no active process if n==1
-                                * NEEDSWORK:
-                                * For now we pick it randomly by doing a round
-                                * robin. Later we may want to pick the one with
-                                * the most output or the longest or shortest
-                                * running process time.
-                                */
-                               for (i = 0; i < n; i++)
-                                       if (slots[(child_in_foreground + i) % 
n])
-                                               break;
-                               child_in_foreground = (child_in_foreground + i) 
% n;
-                               fputs(err[child_in_foreground].buf, stderr);
-                               strbuf_reset(&err[child_in_foreground]);
-                       }
+               if (i != pp->foreground_child) {
+                       strbuf_addbuf(&pp->finished_children, &pp->err[i]);
+                       strbuf_reset(&pp->err[i]);
+               } else {
+                       fputs(pp->err[i].buf, stderr);
+                       strbuf_reset(&pp->err[i]);
+
+                       /* Output all other finished child processes */
+                       fputs(pp->finished_children.buf, stderr);
+                       strbuf_reset(&pp->finished_children);
+
+                       /*
+                        * Pick next process to output live.
+                        * NEEDSWORK:
+                        * For now we pick it randomly by doing a round
+                        * robin. Later we may want to pick the one with
+                        * the most output or the longest or shortest
+                        * running process time.
+                        */
+                       for (i = 0; i < n; i++)
+                               if (pp->slots[(pp->foreground_child + i) % n])
+                                       break;
+                       pp->foreground_child = (pp->foreground_child + i) % n;
+                       fputs(pp->err[pp->foreground_child].buf, stderr);
+                       strbuf_reset(&pp->err[pp->foreground_child]);
                }
        }
+}
+
+int run_processes_parallel(int n, void *data,
+                          get_next_task fn,
+                          handle_child_starting_failure fn_err,
+                          handle_child_return_value fn_exit)
+{
+       struct parallel_processes pp;
+       run_processes_parallel_init(&pp, n, data, fn, fn_err, fn_exit);
+
+       while (!pp.all_tasks_started || pp.nr_processes > 0) {
+               run_processes_parallel_start_new(&pp);
+               if (run_processes_parallel_buffer_stderr(&pp))
+                       continue;
+               run_processes_parallel_collect_finished(&pp);
+       }
+       run_processes_parallel_cleanup(&pp);
+
        return 0;
 }
diff --git a/run-command.h b/run-command.h
index 8f53ad6..0487f71 100644
--- a/run-command.h
+++ b/run-command.h
@@ -120,32 +120,39 @@ int start_async(struct async *async);
 int finish_async(struct async *async);
 
 /**
- * Return 0 if the next child is ready to run.
- * This callback takes care to initialize the child process and preload the
- * out and error channel. The preloading of these outpout channels is useful
- * if you want to have a message printed directly before the output of the
- * child process.
+ * This callback should initialize the child process and preload the
+ * error channel. The preloading of is useful if you want to have a message
+ * printed directly before the output of the child process.
+ * You MUST set stdout_to_stderr.
  *
+ * Return 0 if the next child is ready to run.
  * Return != 0 if there are no more tasks to be processed.
  */
 typedef int (*get_next_task)(void *data,
                             struct child_process *cp,
                             struct strbuf *err);
 
+typedef void (*handle_child_starting_failure)(void *data,
+                                             struct child_process *cp,
+                                             struct strbuf *err);
+
+typedef void (*handle_child_return_value)(void *data,
+                                         struct child_process *cp,
+                                         int result);
+
 /**
- * Runs up to n processes at the same time. Whenever a process can
- * be started, the callback `get_next_task` is called to obtain the
- * data fed to the child process.
+ * Runs up to n processes at the same time. Whenever a process can be
+ * started, the callback `get_next_task` is called to obtain the data
+ * fed to the child process.
  *
  * The children started via this function run in parallel and their output
- * to both stdout and stderr is buffered, while one of the children will
- * directly output to stdout/stderr.
- *
- * This leads to a problem with output from processes which put out to
- * stdout/err alternatingly as the buffering will not be able to replay
- * the
+ * to stderr is buffered, while one of the children will directly output
+ * to stderr.
  */
 
-int run_processes_async(int n, get_next_task fn, void *data);
+int run_processes_parallel(int n, void *data,
+                          get_next_task fn,
+                          handle_child_starting_failure,
+                          handle_child_return_value);
 
 #endif
diff --git a/submodule.c b/submodule.c
index 6d757c6..a0e06e8 100644
--- a/submodule.c
+++ b/submodule.c
@@ -623,17 +623,32 @@ struct submodule_parallel_fetch {
        const char *prefix;
        int command_line_option;
        int quiet;
+       int result;
 };
-#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL}
+#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL, NULL, 0, 0, 0}
 
 int get_next_submodule(void *data, struct child_process *cp,
                       struct strbuf *err);
 
+void handle_submodule_fetch_start_err(void *data, struct child_process *cp, 
struct strbuf *err)
+{
+       struct submodule_parallel_fetch *spf = data;
+       spf->result = 1;
+}
+
+void handle_submodule_fetch_finish( void *data, struct child_process *cp, int 
retvalue)
+{
+       struct submodule_parallel_fetch *spf = data;
+
+       if (retvalue)
+               spf->result = 1;
+}
+
 int fetch_populated_submodules(const struct argv_array *options,
                               const char *prefix, int command_line_option,
                               int quiet, int max_parallel_jobs)
 {
-       int i, result = 0;
+       int i;
        struct submodule_parallel_fetch spf = SPF_INIT;
        spf.work_tree = get_git_work_tree();
        spf.command_line_option = command_line_option;
@@ -652,12 +667,15 @@ int fetch_populated_submodules(const struct argv_array 
*options,
        /* default value, "--submodule-prefix" and its value are added later */
 
        calculate_changed_submodule_paths();
-       run_processes_async(max_parallel_jobs, get_next_submodule, &spf);
+       run_processes_parallel(max_parallel_jobs, &spf,
+                              get_next_submodule,
+                              handle_submodule_fetch_start_err,
+                              handle_submodule_fetch_finish);
 
        argv_array_clear(&spf.args);
 out:
        string_list_clear(&changed_submodule_paths, 1);
-       return result;
+       return spf.result;
 }
 
 int get_next_submodule(void *data, struct child_process *cp,
diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh
index 0970fb0..37c89b9 100755
--- a/t/t0061-run-command.sh
+++ b/t/t0061-run-command.sh
@@ -48,18 +48,22 @@ test_expect_success POSIXPERM,SANITY 'unreadable directory 
in PATH' '
 '
 
 cat >expect <<-EOF
-Now running instance 0
-Hello World
-Now running instance 1
-Hello World
-Now running instance 2
-Hello World
-Now running instance 3
-Hello World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
+preloaded output of a child
+Hello
+World
 EOF
 
 test_expect_success 'run_command runs in parallel' '
-       test-run-command run-command-async sh -c "echo Hello World >&2;" 
2>actual &&
+       test-run-command run-command-async sh -c "printf \"%s\n%s\n\" Hello 
World" 2>actual &&
        test_cmp expect actual
 '
 
diff --git a/test-run-command.c b/test-run-command.c
index 4817f6e..71fd3ca 100644
--- a/test-run-command.c
+++ b/test-run-command.c
@@ -16,9 +16,9 @@
 #include <errno.h>
 
 static int number_callbacks;
-int run_processes_async_next(void *data,
-                            struct child_process *cp,
-                            struct strbuf *err)
+int parallel_next(void *data,
+                 struct child_process *cp,
+                 struct strbuf *err)
 {
        struct child_process *d = data;
        if (number_callbacks >= 4)
@@ -28,7 +28,7 @@ int run_processes_async_next(void *data,
        cp->stdout_to_stderr = 1;
        cp->no_stdin = 1;
        cp->err = -1;
-       strbuf_addf(err, "Now running instance %d\n", number_callbacks);
+       strbuf_addf(err, "preloaded output of a child\n");
        number_callbacks++;
        return 0;
 }
@@ -51,7 +51,8 @@ int main(int argc, char **argv)
                exit(run_command(&proc));
 
        if (!strcmp(argv[1], "run-command-async"))
-               exit(run_processes_async(4, run_processes_async_next, &proc));
+               exit(run_processes_parallel(4, &proc, parallel_next,
+                                        NULL, NULL));
 
        fprintf(stderr, "check usage\n");
        return 1;

-- 
2.6.0.rc0.131.gf624c3d

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to