idella4     15/06/09 14:08:48

  Added:                util.py
  Log:
  ebuild written by me, new runtime dep to autobahn-0.10.4
  
  (Portage version: 2.2.20/cvs/Linux x86_64, signed Manifest commit with key 
0xB8072B0D)

Revision  Changes    Path
1.1                  dev-python/txaio/files/util.py

file : 
http://sources.gentoo.org/viewvc.cgi/gentoo-x86/dev-python/txaio/files/util.py?rev=1.1&view=markup
plain: 
http://sources.gentoo.org/viewvc.cgi/gentoo-x86/dev-python/txaio/files/util.py?rev=1.1&content-type=text/plain

Index: util.py
===================================================================
try:
    import asyncio
except ImportError:
    try:
        import trollius as asyncio
    except ImportError:
        # neither asyncio nor trollius is importable; run_once() then
        # degrades to a no-op (see below)
        asyncio = None

try:
    # Probed separately from "import asyncio": a missing test_utils
    # module must not discard an asyncio that imported fine.
    from asyncio.test_utils import run_once as _run_once
except ImportError:
    _run_once = None


def run_once():
    '''
    Run one pass of the current event loop.

    Delegates to asyncio.test_utils.run_once when that helper could be
    imported; otherwise uses a local copy of it (trollius has no
    "test_utils", and not every asyncio version provides it).

    Returns whatever asyncio.test_utils.run_once returns when it is
    used, otherwise None.
    '''
    # in Twisted, this method is a no-op
    if asyncio is None:
        return

    loop = asyncio.get_event_loop()
    if _run_once is not None:
        return _run_once(loop)

    # stop() before run_forever() makes the loop do a single pass over
    # its ready callbacks and then return
    loop.stop()
    loop.run_forever()

    # Task.all_tasks() is not available in every asyncio version; prefer
    # the module-level all_tasks() when present.  The loop is passed
    # explicitly because no loop is running at this point.
    all_tasks = getattr(asyncio, 'all_tasks', None)
    if all_tasks is None:
        all_tasks = asyncio.Task.all_tasks
    tasks = all_tasks(loop)
    if tasks:
        # gathering zero tasks would only create a throw-away future
        asyncio.gather(*tasks)


try:
    # XXX fixme hack better way to detect twisted
    # (has to work on py3 where asyncio exists always, though)
    import twisted  # noqa

    def await_(_):
        '''
        Do nothing; used when Twisted is importable.

        The argument is accepted only so both variants of this helper
        share one signature.
        '''
        return

except ImportError:
    def await_(future):
        '''
        Block until ``future`` has completed on the current event loop.

        The result of the future is not returned.
        '''
        asyncio.get_event_loop().run_until_complete(future)

# "await" is a reserved keyword on newer Python versions, so the helper
# cannot be defined with "def await" there.  It is defined as await_ and
# additionally published under its historical name so that callers on
# interpreters where "await" is still a plain identifier keep working.
globals()['await'] = await_




Reply via email to