commit: 496ff326dc18890889d1ea5d2aec590394635960 Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Mon Aug 10 07:42:51 2015 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Thu Aug 13 19:49:39 2015 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=496ff326
sync repositories in parallel (bug 557426) Repos will now be synced in parallel (including their post-sync hooks), but a given repo will only be synced after its master(s) have synced (in case that matters for hooks). Output of concurrent processes will be mixed (irrelevant with --quiet). Support for FEATURES=metadata-transfer will be handled in the main process, which may be required for some backends (such as sqlite). X-Gentoo-Bug: 557426 X-Gentoo-Bug-url: https://bugs.gentoo.org/show_bug.cgi?id=557426 Acked-by: Brian Dolbec <dolsen <AT> gentoo.org> pym/portage/emaint/modules/sync/sync.py | 129 ++++++++++++++++++++++++++++-- pym/portage/sync/controller.py | 31 +++++-- pym/portage/tests/sync/test_sync_local.py | 6 +- pym/portage/util/_async/AsyncFunction.py | 67 ++++++++++++++++ 4 files changed, 219 insertions(+), 14 deletions(-) diff --git a/pym/portage/emaint/modules/sync/sync.py b/pym/portage/emaint/modules/sync/sync.py index b463073..879d0f0 100644 --- a/pym/portage/emaint/modules/sync/sync.py +++ b/pym/portage/emaint/modules/sync/sync.py @@ -13,6 +13,10 @@ from portage.output import bold, red, create_color_func from portage._global_updates import _global_updates from portage.sync.controller import SyncManager from portage.util import writemsg_level +from portage.util.digraph import digraph +from portage.util._async.AsyncScheduler import AsyncScheduler +from portage.util._eventloop.global_event_loop import global_event_loop +from portage.util._eventloop.EventLoop import EventLoop import _emerge from _emerge.emergelog import emergelog @@ -201,6 +205,7 @@ class SyncRepos(object): k = "--" + k.replace("_", "-") self.emerge_config.opts[k] = v + selected_repos = [repo for repo in selected_repos if repo.sync_type is not None] msgs = [] if not selected_repos: msgs.append("Emaint sync, nothing to sync... returning") @@ -213,13 +218,20 @@ class SyncRepos(object): sync_manager = SyncManager( self.emerge_config.target_config.settings, emergelog) - retvals = [] - for repo in selected_repos: - if repo.sync_type is not None: - returncode, message = sync_manager.sync(self.emerge_config, repo) - retvals.append((repo.name, returncode)) - if message: - msgs.append(message) + + max_jobs = (self.emerge_config.opts.get('--jobs', 1) + if 'parallel-fetch' in self.emerge_config. + target_config.settings.features else 1) + sync_scheduler = SyncScheduler(emerge_config=self.emerge_config, + selected_repos=selected_repos, sync_manager=sync_manager, + max_jobs=max_jobs, + event_loop=global_event_loop() if portage._internal_caller else + EventLoop(main=False)) + + sync_scheduler.start() + sync_scheduler.wait() + retvals = sync_scheduler.retvals + msgs.extend(sync_scheduler.msgs) # Reload the whole config. portage._sync_mode = False @@ -287,3 +299,106 @@ class SyncRepos(object): messages.append("Action: %s for repo: %s, returned code = %s" % (action, rval[0], rval[1])) return messages + + +class SyncScheduler(AsyncScheduler): + ''' + Sync repos in parallel, but don't sync a given repo until all + of its masters have synced. + ''' + def __init__(self, **kwargs): + ''' + @param emerge_config: an emerge_config instance + @param selected_repos: list of RepoConfig instances + @param sync_manager: a SyncManger instance + ''' + self._emerge_config = kwargs.pop('emerge_config') + self._selected_repos = kwargs.pop('selected_repos') + self._sync_manager = kwargs.pop('sync_manager') + AsyncScheduler.__init__(self, **kwargs) + self._init_graph() + self.retvals = [] + self.msgs = [] + + def _init_graph(self): + ''' + Graph relationships between repos and their masters. + ''' + self._sync_graph = digraph() + self._leaf_nodes = [] + self._repo_map = {} + self._running_repos = set() + for repo in self._selected_repos: + self._repo_map[repo.name] = repo + self._sync_graph.add(repo.name, None) + for master in repo.masters: + self._repo_map[master.name] = master + self._sync_graph.add(master.name, repo.name) + self._update_leaf_nodes() + + def _task_exit(self, task): + ''' + Remove the task from the graph, in order to expose + more leaf nodes. + ''' + self._running_tasks.discard(task) + returncode = task.returncode + if task.returncode == os.EX_OK: + returncode, message, updatecache_flg = task.result + if message: + self.msgs.append(message) + repo = task.kwargs['repo'].name + self._running_repos.remove(repo) + self.retvals.append((repo, returncode)) + self._sync_graph.remove(repo) + self._update_leaf_nodes() + super(SyncScheduler, self)._task_exit(self) + + def _update_leaf_nodes(self): + ''' + Populate self._leaf_nodes with current leaves from + self._sync_graph. If a circular master relationship + is discovered, choose a random node to break the cycle. + ''' + if self._sync_graph and not self._leaf_nodes: + self._leaf_nodes = [obj for obj in + self._sync_graph.leaf_nodes() + if obj not in self._running_repos] + + if not (self._leaf_nodes or self._running_repos): + # If there is a circular master relationship, + # choose a random node to break the cycle. + self._leaf_nodes = [next(iter(self._sync_graph))] + + def _next_task(self): + ''' + Return a task for the next available leaf node. + ''' + if not self._sync_graph: + raise StopIteration() + # If self._sync_graph is non-empty, then self._leaf_nodes + # is guaranteed to be non-empty, since otherwise + # _can_add_job would have returned False and prevented + # _next_task from being immediately called. + node = self._leaf_nodes.pop() + self._running_repos.add(node) + self._update_leaf_nodes() + + task = self._sync_manager.async( + self._emerge_config, self._repo_map[node]) + return task + + def _can_add_job(self): + ''' + Returns False if there are no leaf nodes available. + ''' + if not AsyncScheduler._can_add_job(self): + return False + return bool(self._leaf_nodes) and not self._terminated.is_set() + + def _keep_scheduling(self): + ''' + Schedule as long as the graph is non-empty, and we haven't + been terminated. + ''' + return bool(self._sync_graph) and not self._terminated.is_set() diff --git a/pym/portage/sync/controller.py b/pym/portage/sync/controller.py index 307487f..e992cc4 100644 --- a/pym/portage/sync/controller.py +++ b/pym/portage/sync/controller.py @@ -21,6 +21,7 @@ bad = create_color_func("BAD") warn = create_color_func("WARN") from portage.package.ebuild.doebuild import _check_temp_dir from portage.metadata import action_metadata +from portage.util._async.AsyncFunction import AsyncFunction from portage import OrderedDict from portage import _unicode_decode from portage import util @@ -113,12 +114,18 @@ class SyncManager(object): return desc return [] + def async(self, emerge_config=None, repo=None): + proc = AsyncFunction(target=self.sync, + kwargs=dict(emerge_config=emerge_config, repo=repo)) + proc.addExitListener(self._sync_callback) + return proc - def sync(self, emerge_config=None, repo=None, callback=None): + def sync(self, emerge_config=None, repo=None): self.emerge_config = emerge_config - self.callback = callback or self._sync_callback + self.callback = None self.repo = repo self.exitcode = 1 + self.updatecache_flg = False if repo.sync_type in self.module_names: tasks = [self.module_controller.get_class(repo.sync_type)] else: @@ -149,13 +156,14 @@ class SyncManager(object): self.perform_post_sync_hook(repo.name, repo.sync_uri, repo.location) - return self.exitcode, None + return self.exitcode, None, self.updatecache_flg def do_callback(self, result): #print("result:", result, "callback()", self.callback) exitcode, updatecache_flg = result self.exitcode = exitcode + self.updatecache_flg = updatecache_flg if exitcode == 0: msg = "=== Sync completed for %s" % self.repo.name self.logger(self.xterm_titles, msg) @@ -310,17 +318,28 @@ class SyncManager(object): os.umask(0o022) return os.EX_OK + def _sync_callback(self, proc): + """ + This is called in the parent process, serially, for each of the + sync jobs when they complete. Some cache backends such as sqlite + may require that cache access be performed serially in the + parent process like this. + """ + repo = proc.kwargs['repo'] + exitcode = proc.returncode + updatecache_flg = False + if proc.returncode == os.EX_OK: + exitcode, message, updatecache_flg = proc.result - def _sync_callback(self, exitcode, updatecache_flg): if updatecache_flg and "metadata-transfer" not in self.settings.features: updatecache_flg = False if updatecache_flg and \ os.path.exists(os.path.join( - self.repo.location, 'metadata', 'md5-cache')): + repo.location, 'metadata', 'md5-cache')): # Only update cache for repo.location since that's # the only one that's been synced here. action_metadata(self.settings, self.portdb, self.emerge_config.opts, - porttrees=[self.repo.location]) + porttrees=[repo.location]) diff --git a/pym/portage/tests/sync/test_sync_local.py b/pym/portage/tests/sync/test_sync_local.py index f50caba..7753a26 100644 --- a/pym/portage/tests/sync/test_sync_local.py +++ b/pym/portage/tests/sync/test_sync_local.py @@ -55,8 +55,12 @@ class SyncLocalTestCase(TestCase): "dev-libs/A-0": {} } + user_config = { + 'make.conf': ('FEATURES="metadata-transfer"',) + } + playground = ResolverPlayground(ebuilds=ebuilds, - profile=profile, user_config={}, debug=debug) + profile=profile, user_config=user_config, debug=debug) settings = playground.settings eprefix = settings["EPREFIX"] eroot = settings["EROOT"] diff --git a/pym/portage/util/_async/AsyncFunction.py b/pym/portage/util/_async/AsyncFunction.py new file mode 100644 index 0000000..b6142a2 --- /dev/null +++ b/pym/portage/util/_async/AsyncFunction.py @@ -0,0 +1,67 @@ +# Copyright 2015 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import pickle +import traceback + +from portage import os +from portage.util._async.ForkProcess import ForkProcess +from _emerge.PipeReader import PipeReader + +class AsyncFunction(ForkProcess): + """ + Execute a function call in a fork, and retrieve the function + return value via pickling/unpickling, accessible as the + "result" attribute after the forked process has exited. + """ + + __slots__ = ('args', 'kwargs', 'result', 'target', + '_async_func_reader', '_async_func_reader_pw') + + def _start(self): + pr, pw = os.pipe() + self.fd_pipes = {} + self.fd_pipes[pw] = pw + self._async_func_reader_pw = pw + self._async_func_reader = PipeReader( + input_files={"input":pr}, + scheduler=self.scheduler) + self._async_func_reader.addExitListener(self._async_func_reader_exit) + self._async_func_reader.start() + ForkProcess._start(self) + os.close(pw) + + def _run(self): + try: + result = self.target(*(self.args or []), **(self.kwargs or {})) + os.write(self._async_func_reader_pw, pickle.dumps(result)) + except Exception: + traceback.print_exc() + return 1 + + return os.EX_OK + + def _pipe_logger_exit(self, pipe_logger): + # Ignore this event, since we want to ensure that we exit + # only after _async_func_reader_exit has reached EOF. + self._pipe_logger = None + + def _async_func_reader_exit(self, pipe_reader): + try: + self.result = pickle.loads(pipe_reader.getvalue()) + except Exception: + # The child process will have printed a traceback in this case, + # and returned an unsuccessful returncode. + pass + self._async_func_reader = None + self._unregister() + self.wait() + + def _unregister(self): + ForkProcess._unregister(self) + + pipe_reader = self._async_func_reader + if pipe_reader is not None: + self._async_func_reader = None + pipe_reader.removeExitListener(self._async_func_reader_exit) + pipe_reader.cancel()