commit: dc7919541712d846574e6b7d672a3bed0ca7ef1a Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Tue Aug 18 06:31:54 2020 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Wed Aug 19 04:01:46 2020 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=dc791954
coroutine: use explicit loop parameter (bug 737698) In order to support local event loops within API functions like doebuild, use an explicit loop parameter when calling a coroutine. Internal code will now raise an AssertionError if the loop parameter is omitted for a coroutine, but API consumers may omit it. Bug: https://bugs.gentoo.org/737698 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> lib/_emerge/Binpkg.py | 8 ++-- lib/_emerge/EbuildPhase.py | 16 ++++---- lib/_emerge/Scheduler.py | 4 +- lib/_emerge/SequentialTaskQueue.py | 4 +- lib/_emerge/SpawnProcess.py | 5 ++- lib/portage/dbapi/bintree.py | 12 +++--- lib/portage/dbapi/vartree.py | 8 ++-- .../repository/storage/hardlink_quarantine.py | 26 ++++++------- lib/portage/repository/storage/hardlink_rcu.py | 34 ++++++++-------- lib/portage/repository/storage/inplace.py | 10 ++--- lib/portage/repository/storage/interface.py | 10 ++--- lib/portage/sync/syncbase.py | 2 +- lib/portage/tests/dbapi/test_auxdb.py | 9 +++-- lib/portage/tests/emerge/test_simple.py | 6 +-- lib/portage/tests/process/test_AsyncFunction.py | 4 +- lib/portage/tests/process/test_PipeLogger.py | 2 +- .../util/futures/asyncio/test_child_watcher.py | 4 +- .../tests/util/futures/test_compat_coroutine.py | 45 ++++++++++++---------- lib/portage/tests/util/test_socks5.py | 2 +- lib/portage/util/_async/BuildLogger.py | 4 +- lib/portage/util/_async/ForkProcess.py | 6 +-- lib/portage/util/_async/PipeLogger.py | 4 +- lib/portage/util/_async/SchedulerInterface.py | 4 +- lib/portage/util/futures/_asyncio/process.py | 16 ++++---- lib/portage/util/futures/_sync_decorator.py | 3 +- lib/portage/util/futures/compat_coroutine.py | 7 +++- lib/portage/util/socks5.py | 4 +- repoman/lib/repoman/modules/scan/depend/profile.py | 4 +- 28 files changed, 138 insertions(+), 125 deletions(-) diff --git a/lib/_emerge/Binpkg.py b/lib/_emerge/Binpkg.py index b5a69f8e7..9d2909d42 100644 --- a/lib/_emerge/Binpkg.py +++ b/lib/_emerge/Binpkg.py @@ -250,11 +250,11 @@ class Binpkg(CompositeTask): return self._start_task( - AsyncTaskFuture(future=self._unpack_metadata()), + AsyncTaskFuture(future=self._unpack_metadata(loop=self.scheduler)), self._unpack_metadata_exit) @coroutine - def _unpack_metadata(self): + def _unpack_metadata(self, loop=None): dir_path = self.settings['PORTAGE_BUILDDIR'] @@ -271,7 +271,7 @@ class Binpkg(CompositeTask): portage.prepare_build_dirs(self.settings["ROOT"], self.settings, 1) self._writemsg_level(">>> Extracting info\n") - yield self._bintree.dbapi.unpack_metadata(self.settings, infloc) + yield self._bintree.dbapi.unpack_metadata(self.settings, infloc, loop=self.scheduler) check_missing_metadata = ("CATEGORY", "PF") for k, v in zip(check_missing_metadata, self._bintree.dbapi.aux_get(self.pkg.cpv, check_missing_metadata)): @@ -333,7 +333,7 @@ class Binpkg(CompositeTask): self._start_task( AsyncTaskFuture(future=self._bintree.dbapi.unpack_contents( self.settings, - self._image_dir)), + self._image_dir, loop=self.scheduler)), self._unpack_contents_exit) def _unpack_contents_exit(self, unpack_contents): diff --git a/lib/_emerge/EbuildPhase.py b/lib/_emerge/EbuildPhase.py index 4bc2749bd..e4c0428a6 100644 --- a/lib/_emerge/EbuildPhase.py +++ b/lib/_emerge/EbuildPhase.py @@ -70,11 +70,11 @@ class EbuildPhase(CompositeTask): _locked_phases = ("setup", "preinst", "postinst", "prerm", "postrm") def _start(self): - future = asyncio.ensure_future(self._async_start(), loop=self.scheduler) + future = asyncio.ensure_future(self._async_start(loop=self.scheduler), loop=self.scheduler) self._start_task(AsyncTaskFuture(future=future), self._async_start_exit) @coroutine - def _async_start(self): + def _async_start(self, loop=None): need_builddir = self.phase not in EbuildProcess._phases_without_builddir @@ -132,7 +132,7 @@ class EbuildPhase(CompositeTask): # Force background=True for this header since it's intended # for the log and it doesn't necessarily need to be visible # elsewhere. - yield self._elog('einfo', msg, background=True) + yield self._elog('einfo', msg, background=True, loop=self.scheduler) if self.phase == 'package': if 'PORTAGE_BINPKG_TMPFILE' not in self.settings: @@ -403,7 +403,7 @@ class EbuildPhase(CompositeTask): self.wait() @coroutine - def _elog(self, elog_funcname, lines, background=None): + def _elog(self, elog_funcname, lines, background=None, loop=None): if background is None: background = self.background out = io.StringIO() @@ -435,7 +435,7 @@ class EbuildPhase(CompositeTask): log_file = build_logger.stdin yield self.scheduler.async_output(msg, log_file=log_file, - background=background) + background=background, loop=self.scheduler) if build_logger is not None: build_logger.stdin.close() @@ -487,7 +487,7 @@ class _PostPhaseCommands(CompositeTask): if 'qa-unresolved-soname-deps' in self.settings.features: # This operates on REQUIRES metadata generated by the above function call. - future = self._soname_deps_qa() + future = asyncio.ensure_future(self._soname_deps_qa(loop=self.scheduler), loop=self.scheduler) # If an unexpected exception occurs, then this will raise it. future.add_done_callback(lambda future: future.cancelled() or future.result()) self._start_task(AsyncTaskFuture(future=future), self._default_final_exit) @@ -497,7 +497,7 @@ class _PostPhaseCommands(CompositeTask): self._default_final_exit(task) @coroutine - def _soname_deps_qa(self): + def _soname_deps_qa(self, loop=None): vardb = QueryCommand.get_db()[self.settings['EROOT']]['vartree'].dbapi @@ -512,4 +512,4 @@ class _PostPhaseCommands(CompositeTask): qa_msg.extend("\t%s: %s" % (filename, " ".join(sorted(soname_deps))) for filename, soname_deps in unresolved) qa_msg.append("") - yield self.elog("eqawarn", qa_msg) + yield self.elog("eqawarn", qa_msg, loop=self.scheduler) diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py index 2427d953c..a69421288 100644 --- a/lib/_emerge/Scheduler.py +++ b/lib/_emerge/Scheduler.py @@ -871,7 +871,7 @@ class Scheduler(PollScheduler): infloc = os.path.join(build_dir_path, "build-info") ensure_dirs(infloc) self._sched_iface.run_until_complete( - bintree.dbapi.unpack_metadata(settings, infloc)) + bintree.dbapi.unpack_metadata(settings, infloc, loop=self._sched_iface)) ebuild_path = os.path.join(infloc, x.pf + ".ebuild") settings.configdict["pkg"]["EMERGE_FROM"] = "binary" settings.configdict["pkg"]["MERGE_TYPE"] = "binary" @@ -1621,7 +1621,7 @@ class Scheduler(PollScheduler): if (self._task_queues.merge and (self._schedule_merge_wakeup_task is None or self._schedule_merge_wakeup_task.done())): self._schedule_merge_wakeup_task = asyncio.ensure_future( - self._task_queues.merge.wait(), loop=self._event_loop) + self._task_queues.merge.wait(loop=self._event_loop), loop=self._event_loop) self._schedule_merge_wakeup_task.add_done_callback( self._schedule_merge_wakeup) diff --git a/lib/_emerge/SequentialTaskQueue.py b/lib/_emerge/SequentialTaskQueue.py index 40590b76c..02fe19912 100644 --- a/lib/_emerge/SequentialTaskQueue.py +++ b/lib/_emerge/SequentialTaskQueue.py @@ -69,7 +69,7 @@ class SequentialTaskQueue(SlotObject): task.cancel() @coroutine - def wait(self): + def wait(self, loop=None): """ Wait for the queue to become empty. This method is a coroutine. """ @@ -77,7 +77,7 @@ class SequentialTaskQueue(SlotObject): task = next(iter(self.running_tasks), None) if task is None: # Wait for self.running_tasks to populate. - yield asyncio.sleep(0) + yield asyncio.sleep(0, loop=loop) else: yield task.async_wait() diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py index cea16df27..c43d60d3f 100644 --- a/lib/_emerge/SpawnProcess.py +++ b/lib/_emerge/SpawnProcess.py @@ -140,11 +140,12 @@ class SpawnProcess(SubProcess): self._registered = True self._main_task_cancel = functools.partial(self._main_cancel, build_logger, pipe_logger) - self._main_task = asyncio.ensure_future(self._main(build_logger, pipe_logger), loop=self.scheduler) + self._main_task = asyncio.ensure_future( + self._main(build_logger, pipe_logger, loop=self.scheduler), loop=self.scheduler) self._main_task.add_done_callback(self._main_exit) @coroutine - def _main(self, build_logger, pipe_logger): + def _main(self, build_logger, pipe_logger, loop=None): try: if pipe_logger.poll() is None: yield pipe_logger.async_wait() diff --git a/lib/portage/dbapi/bintree.py b/lib/portage/dbapi/bintree.py index 59c265688..620865a79 100644 --- a/lib/portage/dbapi/bintree.py +++ b/lib/portage/dbapi/bintree.py @@ -217,7 +217,7 @@ class bindbapi(fakedbapi): @coroutine - def unpack_metadata(self, pkg, dest_dir): + def unpack_metadata(self, pkg, dest_dir, loop=None): """ Unpack package metadata to a directory. This method is a coroutine. @@ -226,7 +226,7 @@ class bindbapi(fakedbapi): @param dest_dir: destination directory @type dest_dir: str """ - loop = asyncio._wrap_loop() + loop = asyncio._wrap_loop(loop) if isinstance(pkg, _pkg_str): cpv = pkg else: @@ -234,14 +234,14 @@ class bindbapi(fakedbapi): key = self._instance_key(cpv) add_pkg = self.bintree._additional_pkgs.get(key) if add_pkg is not None: - yield add_pkg._db.unpack_metadata(pkg, dest_dir) + yield add_pkg._db.unpack_metadata(pkg, dest_dir, loop=loop) else: tbz2_file = self.bintree.getname(cpv) yield loop.run_in_executor(ForkExecutor(loop=loop), portage.xpak.tbz2(tbz2_file).unpackinfo, dest_dir) @coroutine - def unpack_contents(self, pkg, dest_dir): + def unpack_contents(self, pkg, dest_dir, loop=None): """ Unpack package contents to a directory. This method is a coroutine. @@ -250,7 +250,7 @@ class bindbapi(fakedbapi): @param dest_dir: destination directory @type dest_dir: str """ - loop = asyncio._wrap_loop() + loop = asyncio._wrap_loop(loop) if isinstance(pkg, _pkg_str): settings = self.settings cpv = pkg @@ -280,7 +280,7 @@ class bindbapi(fakedbapi): add_pkg = self.bintree._additional_pkgs.get(instance_key) if add_pkg is None: raise portage.exception.PackageNotFound(cpv) - yield add_pkg._db.unpack_contents(pkg, dest_dir) + yield add_pkg._db.unpack_contents(pkg, dest_dir, loop=loop) def cp_list(self, *pargs, **kwargs): if not self.bintree.populated: diff --git a/lib/portage/dbapi/vartree.py b/lib/portage/dbapi/vartree.py index 3eee025ad..1547d2f6d 100644 --- a/lib/portage/dbapi/vartree.py +++ b/lib/portage/dbapi/vartree.py @@ -931,7 +931,7 @@ class vardbapi(dbapi): self._bump_mtime(cpv) @coroutine - def unpack_metadata(self, pkg, dest_dir): + def unpack_metadata(self, pkg, dest_dir, loop=None): """ Unpack package metadata to a directory. This method is a coroutine. @@ -940,7 +940,7 @@ class vardbapi(dbapi): @param dest_dir: destination directory @type dest_dir: str """ - loop = asyncio._wrap_loop() + loop = asyncio._wrap_loop(loop) if not isinstance(pkg, portage.config): cpv = pkg else: @@ -956,7 +956,7 @@ class vardbapi(dbapi): @coroutine def unpack_contents(self, pkg, dest_dir, - include_config=None, include_unmodified_config=None): + include_config=None, include_unmodified_config=None, loop=None): """ Unpack package contents to a directory. This method is a coroutine. @@ -982,7 +982,7 @@ class vardbapi(dbapi): by QUICKPKG_DEFAULT_OPTS). @type include_unmodified_config: bool """ - loop = asyncio._wrap_loop() + loop = asyncio._wrap_loop(loop) if not isinstance(pkg, portage.config): settings = self.settings cpv = pkg diff --git a/lib/portage/repository/storage/hardlink_quarantine.py b/lib/portage/repository/storage/hardlink_quarantine.py index 3594cb1c9..165ab8324 100644 --- a/lib/portage/repository/storage/hardlink_quarantine.py +++ b/lib/portage/repository/storage/hardlink_quarantine.py @@ -39,59 +39,59 @@ class HardlinkQuarantineRepoStorage(RepoStorageInterface): self._current_update = None @coroutine - def _check_call(self, cmd): + def _check_call(self, cmd, loop=None): """ Run cmd and raise RepoStorageException on failure. @param cmd: command to executre @type cmd: list """ - p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), **self._spawn_kwargs) + p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(loop), **self._spawn_kwargs) p.start() if (yield p.async_wait()) != os.EX_OK: raise RepoStorageException('command exited with status {}: {}'.\ format(p.returncode, ' '.join(cmd))) @coroutine - def init_update(self): + def init_update(self, loop=None): update_location = os.path.join(self._user_location, '.tmp-unverified-download-quarantine') - yield self._check_call(['rm', '-rf', update_location]) + yield self._check_call(['rm', '-rf', update_location], loop=loop) # Use rsync --link-dest to hardlink a files into self._update_location, # since cp -l is not portable. yield self._check_call(['rsync', '-a', '--link-dest', self._user_location, '--exclude=/distfiles', '--exclude=/local', '--exclude=/lost+found', '--exclude=/packages', '--exclude', '/{}'.format(os.path.basename(update_location)), - self._user_location + '/', update_location + '/']) + self._user_location + '/', update_location + '/'], loop=loop) self._update_location = update_location coroutine_return(self._update_location) @property - def current_update(self): + def current_update(self, loop=None): if self._update_location is None: raise RepoStorageException('current update does not exist') return self._update_location @coroutine - def commit_update(self): + def commit_update(self, loop=None): update_location = self.current_update self._update_location = None yield self._check_call(['rsync', '-a', '--delete', '--exclude=/distfiles', '--exclude=/local', '--exclude=/lost+found', '--exclude=/packages', '--exclude', '/{}'.format(os.path.basename(update_location)), - update_location + '/', self._user_location + '/']) + update_location + '/', self._user_location + '/'], loop=loop) - yield self._check_call(['rm', '-rf', update_location]) + yield self._check_call(['rm', '-rf', update_location], loop=loop) @coroutine - def abort_update(self): + def abort_update(self, loop=None): if self._update_location is not None: update_location = self._update_location self._update_location = None - yield self._check_call(['rm', '-rf', update_location]) + yield self._check_call(['rm', '-rf', update_location], loop=loop) @coroutine - def garbage_collection(self): - yield self.abort_update() + def garbage_collection(self, loop=None): + yield self.abort_update(loop=loop) diff --git a/lib/portage/repository/storage/hardlink_rcu.py b/lib/portage/repository/storage/hardlink_rcu.py index bb2c8496b..68081494c 100644 --- a/lib/portage/repository/storage/hardlink_rcu.py +++ b/lib/portage/repository/storage/hardlink_rcu.py @@ -105,7 +105,7 @@ class HardlinkRcuRepoStorage(RepoStorageInterface): self._snapshots_dir = os.path.join(self._storage_location, 'snapshots') @coroutine - def _check_call(self, cmd, privileged=False): + def _check_call(self, cmd, privileged=False, loop=None): """ Run cmd and raise RepoStorageException on failure. @@ -118,16 +118,16 @@ class HardlinkRcuRepoStorage(RepoStorageInterface): kwargs = dict(fd_pipes=self._spawn_kwargs.get('fd_pipes')) else: kwargs = self._spawn_kwargs - p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), **kwargs) + p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(loop), **kwargs) p.start() if (yield p.async_wait()) != os.EX_OK: raise RepoStorageException('command exited with status {}: {}'.\ format(p.returncode, ' '.join(cmd))) @coroutine - def init_update(self): + def init_update(self, loop=None): update_location = os.path.join(self._storage_location, 'update') - yield self._check_call(['rm', '-rf', update_location]) + yield self._check_call(['rm', '-rf', update_location], loop=loop) # This assumes normal umask permissions if it doesn't exist yet. portage.util.ensure_dirs(self._storage_location) @@ -139,18 +139,18 @@ class HardlinkRcuRepoStorage(RepoStorageInterface): # Use rsync --link-dest to hardlink a files into update_location, # since cp -l is not portable. yield self._check_call(['rsync', '-a', '--link-dest', self._latest_canonical, - self._latest_canonical + '/', update_location + '/']) + self._latest_canonical + '/', update_location + '/'], loop=loop) elif not os.path.islink(self._user_location): - yield self._migrate(update_location) - update_location = (yield self.init_update()) + yield self._migrate(update_location, loop=loop) + update_location = (yield self.init_update(loop=loop)) self._update_location = update_location coroutine_return(self._update_location) @coroutine - def _migrate(self, update_location): + def _migrate(self, update_location, loop=None): """ When repo.user_location is a normal directory, migrate it to storage so that it can be replaced with a symlink. After migration, @@ -164,26 +164,26 @@ class HardlinkRcuRepoStorage(RepoStorageInterface): os.stat(self._user_location)) # It's probably on a different device, so copy it. yield self._check_call(['rsync', '-a', - self._user_location + '/', update_location + '/']) + self._user_location + '/', update_location + '/'], loop=loop) # Remove the old copy so that symlink can be created. Run with # maximum privileges, since removal requires write access to # the parent directory. - yield self._check_call(['rm', '-rf', user_location], privileged=True) + yield self._check_call(['rm', '-rf', user_location], privileged=True, loop=loop) self._update_location = update_location # Make this copy the latest snapshot - yield self.commit_update() + yield self.commit_update(loop=loop) @property - def current_update(self): + def current_update(self, loop=None): if self._update_location is None: raise RepoStorageException('current update does not exist') return self._update_location @coroutine - def commit_update(self): + def commit_update(self, loop=None): update_location = self.current_update self._update_location = None try: @@ -235,14 +235,14 @@ class HardlinkRcuRepoStorage(RepoStorageInterface): yield None @coroutine - def abort_update(self): + def abort_update(self, loop=None): if self._update_location is not None: update_location = self._update_location self._update_location = None - yield self._check_call(['rm', '-rf', update_location]) + yield self._check_call(['rm', '-rf', update_location], loop=loop) @coroutine - def garbage_collection(self): + def garbage_collection(self, loop=None): snap_ttl = datetime.timedelta(days=self._ttl_days) snapshots = sorted(int(name) for name in os.listdir(self._snapshots_dir)) # always preserve the latest snapshot @@ -259,4 +259,4 @@ class HardlinkRcuRepoStorage(RepoStorageInterface): snap_timestamp = datetime.datetime.utcfromtimestamp(st.st_mtime) if (datetime.datetime.utcnow() - snap_timestamp) < snap_ttl: continue - yield self._check_call(['rm', '-rf', snap_path]) + yield self._check_call(['rm', '-rf', snap_path], loop=loop) diff --git a/lib/portage/repository/storage/inplace.py b/lib/portage/repository/storage/inplace.py index f1117ad03..3dbcbd7ad 100644 --- a/lib/portage/repository/storage/inplace.py +++ b/lib/portage/repository/storage/inplace.py @@ -19,31 +19,31 @@ class InplaceRepoStorage(RepoStorageInterface): self._update_location = None @coroutine - def init_update(self): + def init_update(self, loop=None): self._update_location = self._user_location coroutine_return(self._update_location) yield None @property - def current_update(self): + def current_update(self, loop=None): if self._update_location is None: raise RepoStorageException('current update does not exist') return self._update_location @coroutine - def commit_update(self): + def commit_update(self, loop=None): self.current_update self._update_location = None coroutine_return() yield None @coroutine - def abort_update(self): + def abort_update(self, loop=None): self._update_location = None coroutine_return() yield None @coroutine - def garbage_collection(self): + def garbage_collection(self, loop=None): coroutine_return() yield None diff --git a/lib/portage/repository/storage/interface.py b/lib/portage/repository/storage/interface.py index ce8a2a170..4f5be6dbc 100644 --- a/lib/portage/repository/storage/interface.py +++ b/lib/portage/repository/storage/interface.py @@ -33,7 +33,7 @@ class RepoStorageInterface: raise NotImplementedError @coroutine - def init_update(self): + def init_update(self, loop=None): """ Create an update directory as a destination to sync updates to. The directory will be populated with files from the previous @@ -50,7 +50,7 @@ class RepoStorageInterface: raise NotImplementedError @property - def current_update(self): + def current_update(self, loop=None): """ Get the current update directory which would have been returned from the most recent call to the init_update method. This raises @@ -63,7 +63,7 @@ class RepoStorageInterface: raise NotImplementedError @coroutine - def commit_update(self): + def commit_update(self, loop=None): """ Commit the current update directory, so that is becomes the latest immutable snapshot. @@ -71,7 +71,7 @@ class RepoStorageInterface: raise NotImplementedError @coroutine - def abort_update(self): + def abort_update(self, loop=None): """ Delete the current update directory. If there was not an update in progress, or it has already been committed, then this has @@ -80,7 +80,7 @@ class RepoStorageInterface: raise NotImplementedError @coroutine - def garbage_collection(self): + def garbage_collection(self, loop=None): """ Remove expired snapshots. """ diff --git a/lib/portage/sync/syncbase.py b/lib/portage/sync/syncbase.py index 5f18e5ba3..8e83b94fb 100644 --- a/lib/portage/sync/syncbase.py +++ b/lib/portage/sync/syncbase.py @@ -108,7 +108,7 @@ class SyncBase: """ if self._repo_storage is None: storage_cls = portage.load_mod(self._select_storage_module()) - self._repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs)) + self._repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs), loop=global_event_loop()) return self._repo_storage @property diff --git a/lib/portage/tests/dbapi/test_auxdb.py b/lib/portage/tests/dbapi/test_auxdb.py index 907c289fb..1029de70d 100644 --- a/lib/portage/tests/dbapi/test_auxdb.py +++ b/lib/portage/tests/dbapi/test_auxdb.py @@ -63,8 +63,9 @@ class AuxdbTestCase(TestCase): portdb = playground.trees[playground.eroot]["porttree"].dbapi def test_func(): - return asyncio._wrap_loop().run_until_complete(self._test_mod_async( - ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb)) + loop = asyncio._wrap_loop() + return loop.run_until_complete(self._test_mod_async( + ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb, loop=loop)) self.assertTrue(test_func()) @@ -91,10 +92,10 @@ class AuxdbTestCase(TestCase): self.assertEqual(auxdb[cpv]['RESTRICT'], 'test') @coroutine - def _test_mod_async(self, ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb): + def _test_mod_async(self, ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb, loop=None): for cpv, metadata in ebuilds.items(): - defined_phases, depend, eapi, inherited = yield portdb.async_aux_get(cpv, ['DEFINED_PHASES', 'DEPEND', 'EAPI', 'INHERITED']) + defined_phases, depend, eapi, inherited = yield portdb.async_aux_get(cpv, ['DEFINED_PHASES', 'DEPEND', 'EAPI', 'INHERITED'], loop=loop) self.assertEqual(defined_phases, eclass_defined_phases) self.assertEqual(depend, eclass_depend) self.assertEqual(eapi, metadata['EAPI']) diff --git a/lib/portage/tests/emerge/test_simple.py b/lib/portage/tests/emerge/test_simple.py index 94b3076c1..c24f5c603 100644 --- a/lib/portage/tests/emerge/test_simple.py +++ b/lib/portage/tests/emerge/test_simple.py @@ -225,10 +225,10 @@ call_has_and_best_version() { loop = asyncio._wrap_loop() loop.run_until_complete(asyncio.ensure_future( - self._async_test_simple(loop, playground, metadata_xml_files), loop=loop)) + self._async_test_simple(playground, metadata_xml_files, loop=loop), loop=loop)) @coroutine - def _async_test_simple(self, loop, playground, metadata_xml_files): + def _async_test_simple(self, playground, metadata_xml_files, loop=None): debug = playground.debug settings = playground.settings @@ -540,7 +540,7 @@ move dev-util/git dev-vcs/git local_env = env proc = yield asyncio.create_subprocess_exec(*args, - env=local_env, stderr=None, stdout=stdout) + env=local_env, stderr=None, stdout=stdout, loop=loop) if debug: yield proc.wait() diff --git a/lib/portage/tests/process/test_AsyncFunction.py b/lib/portage/tests/process/test_AsyncFunction.py index 3b360e02f..b3f80b8ac 100644 --- a/lib/portage/tests/process/test_AsyncFunction.py +++ b/lib/portage/tests/process/test_AsyncFunction.py @@ -21,7 +21,7 @@ class AsyncFunctionTestCase(TestCase): return ''.join(sys.stdin) @coroutine - def _testAsyncFunctionStdin(self, loop): + def _testAsyncFunctionStdin(self, loop=None): test_string = '1\n2\n3\n' pr, pw = os.pipe() fd_pipes = {0:pr} @@ -36,7 +36,7 @@ class AsyncFunctionTestCase(TestCase): def testAsyncFunctionStdin(self): loop = asyncio._wrap_loop() - loop.run_until_complete(self._testAsyncFunctionStdin(loop)) + loop.run_until_complete(self._testAsyncFunctionStdin(loop=loop)) def _test_getpid_fork(self): """ diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py index 2bd94cf39..acc3b8af9 100644 --- a/lib/portage/tests/process/test_PipeLogger.py +++ b/lib/portage/tests/process/test_PipeLogger.py @@ -37,7 +37,7 @@ class PipeLoggerTestCase(TestCase): # Before starting the reader, wait here for a moment, in order # to exercise PipeLogger's handling of EAGAIN during write. - yield asyncio.wait([writer], timeout=0.01) + yield asyncio.wait([writer], timeout=0.01, loop=loop) reader = _reader(pr, loop=loop) yield writer diff --git a/lib/portage/tests/util/futures/asyncio/test_child_watcher.py b/lib/portage/tests/util/futures/asyncio/test_child_watcher.py index 8a8fb3d4f..cd547f008 100644 --- a/lib/portage/tests/util/futures/asyncio/test_child_watcher.py +++ b/lib/portage/tests/util/futures/asyncio/test_child_watcher.py @@ -38,7 +38,7 @@ class ChildWatcherTestCase(TestCase): future.set_result((pid, returncode, args)) @coroutine - def watch_pid(): + def watch_pid(loop=None): with asyncio.get_child_watcher() as watcher: pids = spawn([true_binary], returnpid=True) @@ -47,7 +47,7 @@ class ChildWatcherTestCase(TestCase): (yield future), (pids[0], os.EX_OK, args_tuple)) - loop.run_until_complete(watch_pid()) + loop.run_until_complete(watch_pid(loop=loop)) finally: asyncio.set_event_loop_policy(initial_policy) if loop not in (None, global_event_loop()): diff --git a/lib/portage/tests/util/futures/test_compat_coroutine.py b/lib/portage/tests/util/futures/test_compat_coroutine.py index 5a8230432..0fd459cbf 100644 --- a/lib/portage/tests/util/futures/test_compat_coroutine.py +++ b/lib/portage/tests/util/futures/test_compat_coroutine.py @@ -14,12 +14,13 @@ class CompatCoroutineTestCase(TestCase): def test_returning_coroutine(self): @coroutine - def returning_coroutine(): - yield asyncio.sleep(0) + def returning_coroutine(loop=None): + yield asyncio.sleep(0, loop=loop) coroutine_return('success') + loop = asyncio.get_event_loop() self.assertEqual('success', - asyncio.get_event_loop().run_until_complete(returning_coroutine())) + asyncio.get_event_loop().run_until_complete(returning_coroutine(loop=loop))) def test_raising_coroutine(self): @@ -27,12 +28,13 @@ class CompatCoroutineTestCase(TestCase): pass @coroutine - def raising_coroutine(): - yield asyncio.sleep(0) + def raising_coroutine(loop=None): + yield asyncio.sleep(0, loop=loop) raise TestException('exception') + loop = asyncio.get_event_loop() self.assertRaises(TestException, - asyncio.get_event_loop().run_until_complete, raising_coroutine()) + loop.run_until_complete, raising_coroutine(loop=loop)) def test_catching_coroutine(self): @@ -109,17 +111,18 @@ class CompatCoroutineTestCase(TestCase): yield future loop = asyncio.get_event_loop() - future = loop.run_until_complete(asyncio.wait([cancelled_future_coroutine()]))[0].pop() + future = loop.run_until_complete(asyncio.wait([cancelled_future_coroutine(loop=loop)], loop=loop))[0].pop() self.assertTrue(future.cancelled()) def test_yield_expression_result(self): @coroutine - def yield_expression_coroutine(): + def yield_expression_coroutine(loop=None): for i in range(3): - x = yield asyncio.sleep(0, result=i) + x = yield asyncio.sleep(0, result=i, loop=loop) self.assertEqual(x, i) - asyncio.get_event_loop().run_until_complete(yield_expression_coroutine()) + loop = asyncio.get_event_loop() + loop.run_until_complete(yield_expression_coroutine(loop=loop)) def test_method_coroutine(self): @@ -144,7 +147,7 @@ class CompatCoroutineTestCase(TestCase): return waiter @coroutine - def read(self): + def read(self, loop=None): while self._value is self._empty: yield self._wait() @@ -154,7 +157,7 @@ class CompatCoroutineTestCase(TestCase): coroutine_return(value) @coroutine - def write(self, value): + def write(self, value, loop=None): while self._value is not self._empty: yield self._wait() @@ -162,16 +165,16 @@ class CompatCoroutineTestCase(TestCase): self._notify() @coroutine - def writer_coroutine(cubby, values, sentinel): + def writer_coroutine(cubby, values, sentinel, loop=None): for value in values: - yield cubby.write(value) - yield cubby.write(sentinel) + yield cubby.write(value, loop=loop) + yield cubby.write(sentinel, loop=loop) @coroutine - def reader_coroutine(cubby, sentinel): + def reader_coroutine(cubby, sentinel, loop=None): results = [] while True: - result = yield cubby.read() + result = yield cubby.read(loop=loop) if result == sentinel: break results.append(result) @@ -180,9 +183,9 @@ class CompatCoroutineTestCase(TestCase): loop = asyncio.get_event_loop() cubby = Cubby(loop) values = list(range(3)) - writer = asyncio.ensure_future(writer_coroutine(cubby, values, None), loop=loop) - reader = asyncio.ensure_future(reader_coroutine(cubby, None), loop=loop) - loop.run_until_complete(asyncio.wait([writer, reader])) + writer = asyncio.ensure_future(writer_coroutine(cubby, values, None, loop=loop), loop=loop) + reader = asyncio.ensure_future(reader_coroutine(cubby, None, loop=loop), loop=loop) + loop.run_until_complete(asyncio.wait([writer, reader], loop=loop)) self.assertEqual(reader.result(), values) @@ -191,7 +194,7 @@ class CompatCoroutineTestCase(TestCase): # blend with synchronous code. sync_cubby = _sync_methods(cubby, loop=loop) sync_reader = _sync_decorator(reader_coroutine, loop=loop) - writer = asyncio.ensure_future(writer_coroutine(cubby, values, None), loop=loop) + writer = asyncio.ensure_future(writer_coroutine(cubby, values, None, loop=loop), loop=loop) self.assertEqual(sync_reader(cubby, None), values) self.assertTrue(writer.done()) diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py index ca32651a7..44d522013 100644 --- a/lib/portage/tests/util/test_socks5.py +++ b/lib/portage/tests/util/test_socks5.py @@ -185,7 +185,7 @@ class Socks5ServerTestCase(TestCase): } proxy = socks5.get_socks5_proxy(settings) - loop.run_until_complete(socks5.proxy.ready()) + loop.run_until_complete(socks5.proxy.ready(loop=loop)) result = loop.run_until_complete(loop.run_in_executor(None, self._fetch_via_proxy, proxy, host, server.server_port, path)) diff --git a/lib/portage/util/_async/BuildLogger.py b/lib/portage/util/_async/BuildLogger.py index f25f70d5b..5a9c076b6 100644 --- a/lib/portage/util/_async/BuildLogger.py +++ b/lib/portage/util/_async/BuildLogger.py @@ -78,11 +78,11 @@ class BuildLogger(AsynchronousTask): pipe_logger.start() self._main_task_cancel = functools.partial(self._main_cancel, filter_proc, pipe_logger) - self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger), loop=self.scheduler) + self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger, loop=self.scheduler), loop=self.scheduler) self._main_task.add_done_callback(self._main_exit) @coroutine - def _main(self, filter_proc, pipe_logger): + def _main(self, filter_proc, pipe_logger, loop=None): try: if pipe_logger.poll() is None: yield pipe_logger.async_wait() diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py index eb01a6232..3c9c6e22b 100644 --- a/lib/portage/util/_async/ForkProcess.py +++ b/lib/portage/util/_async/ForkProcess.py @@ -47,7 +47,7 @@ class ForkProcess(SpawnProcess): os.close(stdin_dup) self._proc_join_task = asyncio.ensure_future( - self._proc_join(self._proc)) + self._proc_join(self._proc, loop=self.scheduler), loop=self.scheduler) self._proc_join_task.add_done_callback( functools.partial(self._proc_join_done, self._proc)) @@ -68,7 +68,7 @@ class ForkProcess(SpawnProcess): super(ForkProcess, self)._async_waitpid() @coroutine - def _proc_join(self, proc): + def _proc_join(self, proc, loop=None): sentinel_reader = self.scheduler.create_future() self.scheduler.add_reader(proc.sentinel, lambda: sentinel_reader.done() or sentinel_reader.set_result(None)) @@ -93,7 +93,7 @@ class ForkProcess(SpawnProcess): proc.join(0) if proc.exitcode is not None: break - yield asyncio.sleep(self._proc_join_interval) + yield asyncio.sleep(self._proc_join_interval, loop=loop) def _proc_join_done(self, proc, future): future.cancelled() or future.result() diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py index 2bbdd3ddb..e8203268c 100644 --- a/lib/portage/util/_async/PipeLogger.py +++ b/lib/portage/util/_async/PipeLogger.py @@ -53,7 +53,7 @@ class PipeLogger(AbstractPollTask): fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK) - self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler) + self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd, loop=self.scheduler), loop=self.scheduler) self._io_loop_task.add_done_callback(self._io_loop_done) self._registered = True @@ -63,7 +63,7 @@ class PipeLogger(AbstractPollTask): self.returncode = self._cancelled_returncode @coroutine - def _io_loop(self, input_file): + def _io_loop(self, input_file, loop=None): background = self.background stdout_fd = self.stdout_fd log_file = self._log_file diff --git a/lib/portage/util/_async/SchedulerInterface.py b/lib/portage/util/_async/SchedulerInterface.py index 3ff250d1d..2865266eb 100644 --- a/lib/portage/util/_async/SchedulerInterface.py +++ b/lib/portage/util/_async/SchedulerInterface.py @@ -57,7 +57,7 @@ class SchedulerInterface(SlotObject): @coroutine def async_output(self, msg, log_file=None, background=None, - level=0, noiselevel=-1): + level=0, noiselevel=-1, loop=None): """ Output a msg to stdio (if not in background) and to a log file if provided. @@ -81,7 +81,7 @@ class SchedulerInterface(SlotObject): writemsg_level(msg, level=level, noiselevel=noiselevel) if log_file is not None: - yield _writer(log_file, _unicode_encode(msg)) + yield _writer(log_file, _unicode_encode(msg), loop=loop) def output(self, msg, log_path=None, background=None, level=0, noiselevel=-1): diff --git a/lib/portage/util/futures/_asyncio/process.py b/lib/portage/util/futures/_asyncio/process.py index 6ff156c9d..275c9031a 100644 --- a/lib/portage/util/futures/_asyncio/process.py +++ b/lib/portage/util/futures/_asyncio/process.py @@ -39,7 +39,7 @@ class _Process: return self._proc.returncode @coroutine - def communicate(self, input=None): # pylint: disable=redefined-builtin + def communicate(self, input=None, loop=None): # pylint: disable=redefined-builtin """ Read data from stdout and stderr, until end-of-file is reached. Wait for process to terminate. @@ -49,13 +49,14 @@ class _Process: @return: tuple (stdout_data, stderr_data) @rtype: asyncio.Future (or compatible) """ + loop = asyncio._wrap_loop(loop or self._loop) futures = [] for input_file in (self._proc.stdout, self._proc.stderr): if input_file is None: - future = self._loop.create_future() + future = loop.create_future() future.set_result(None) else: - future = _reader(input_file, loop=self._loop) + future = _reader(input_file, loop=loop) futures.append(future) writer = None @@ -65,11 +66,11 @@ class _Process: stdin = self._proc.stdin stdin = os.fdopen(stdin, 'wb', 0) if isinstance(stdin, int) else stdin _set_nonblocking(stdin.fileno()) - writer = asyncio.ensure_future(_writer(stdin, input, loop=self._loop), loop=self._loop) + writer = asyncio.ensure_future(_writer(stdin, input, loop=loop), loop=loop) writer.add_done_callback(lambda writer: stdin.close()) try: - yield asyncio.wait(futures + [self.wait()], loop=self._loop) + yield asyncio.wait(futures + [self.wait(loop=loop)], loop=loop) finally: if writer is not None: if writer.done(): @@ -84,14 +85,15 @@ class _Process: coroutine_return(tuple(future.result() for future in futures)) - def wait(self): + def wait(self, loop=None): """ Wait for child process to terminate. Set and return returncode attribute. @return: returncode @rtype: asyncio.Future (or compatible) """ - waiter = self._loop.create_future() + loop = asyncio._wrap_loop(loop or self._loop) + waiter = loop.create_future() if self.returncode is None: self._waiters.append(waiter) waiter.add_done_callback(self._waiter_cancel) diff --git a/lib/portage/util/futures/_sync_decorator.py b/lib/portage/util/futures/_sync_decorator.py index 02a0963a7..3da065789 100644 --- a/lib/portage/util/futures/_sync_decorator.py +++ b/lib/portage/util/futures/_sync_decorator.py @@ -15,9 +15,10 @@ def _sync_decorator(func, loop=None): function that returns a Future) with a wrapper that runs the function synchronously. """ - loop = asyncio._wrap_loop(loop) @functools.wraps(func) def wrapper(*args, **kwargs): + nonlocal loop + loop = kwargs['loop'] = asyncio._wrap_loop(kwargs.get('loop') or loop) return loop.run_until_complete(func(*args, **kwargs)) return wrapper diff --git a/lib/portage/util/futures/compat_coroutine.py b/lib/portage/util/futures/compat_coroutine.py index 79bd0da68..9a0c5c1c8 100644 --- a/lib/portage/util/futures/compat_coroutine.py +++ b/lib/portage/util/futures/compat_coroutine.py @@ -67,7 +67,12 @@ def _generator_future(generator_func, *args, **kwargs): keyword argument named 'loop' is given, then it is used instead of the default event loop. """ - loop = asyncio._wrap_loop(kwargs.get('loop')) + loop = kwargs.get('loop') + if loop is None and portage._internal_caller: + # Require an explicit loop parameter, in order to support + # local event loops (bug 737698). + raise AssertionError("Missing required argument 'loop'") + loop = asyncio._wrap_loop(loop) result = loop.create_future() _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop) return result diff --git a/lib/portage/util/socks5.py b/lib/portage/util/socks5.py index 65d2400e8..9f22c1dbe 100644 --- a/lib/portage/util/socks5.py +++ b/lib/portage/util/socks5.py @@ -76,7 +76,7 @@ class ProxyManager: @coroutine - def ready(self): + def ready(self, loop=None): """ Wait for the proxy socket to become ready. This method is a coroutine. """ @@ -98,7 +98,7 @@ class ProxyManager: except EnvironmentError as e: if e.errno != errno.ENOENT: raise - yield asyncio.sleep(0.2) + yield asyncio.sleep(0.2, loop=loop) else: break finally: diff --git a/repoman/lib/repoman/modules/scan/depend/profile.py b/repoman/lib/repoman/modules/scan/depend/profile.py index 1eb69422a..468bc55e2 100644 --- a/repoman/lib/repoman/modules/scan/depend/profile.py +++ b/repoman/lib/repoman/modules/scan/depend/profile.py @@ -146,7 +146,7 @@ class ProfileDependsChecks(ScanBase): % (ebuild.relative_path, mytype, ", ".join(sorted(atoms)))) @coroutine - def _task(self, task): + def _task(self, task, loop=None): yield task.future coroutine_return((task, task.future.result())) @@ -222,7 +222,7 @@ class ProfileDependsChecks(ScanBase): yield (task, target()) else: task.future = asyncio.ensure_future(loop.run_in_executor(executor, target), loop=loop) - yield self._task(task) + yield self._task(task, loop=loop) def _task_subprocess(self, task, pkg, dep_settings):
