Alexander Mohr added the comment: Perhaps I'm doing something really stupid, but I was able to reproduce the two issues I'm having with the following sample script. If you leave the monkey patch disabled, you get the InvalidStateError, if you enable it, you get the ServerDisconnect errors that I'm currently seeing which I work-around with retries. Ideas?
import asyncio import aiohttp import multiprocessing import aiohttp.server import logging import traceback # Monkey patching import asyncio.selector_events # http://bugs.python.org/issue25593 if False: orig_sock_connect_cb = asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb def _sock_connect_cb(self, fut, sock, address): if fut.done(): return return orig_sock_connect_cb(self, fut, sock, address) asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb = _sock_connect_cb class HttpRequestHandler(aiohttp.server.ServerHttpProtocol): @asyncio.coroutine def handle_request(self, message, payload): response = aiohttp.Response(self.writer, 200, http_version=message.version) response.add_header('Content-Type', 'text/html') response.add_header('Content-Length', '18') response.send_headers() yield from asyncio.sleep(0.5) response.write(b'<h1>It Works!</h1>') yield from response.write_eof() def process_worker(q): loop = asyncio.get_event_loop() #loop.set_debug(True) connector = aiohttp.TCPConnector(force_close=False, keepalive_timeout=8, use_dns_cache=True) session = aiohttp.ClientSession(connector=connector) async_queue = asyncio.Queue(100) @asyncio.coroutine def async_worker(session, async_queue): while True: try: print("blocking on asyncio queue get") url = yield from async_queue.get() print("unblocking on asyncio queue get") print("get aqueue size:", async_queue.qsize()) response = yield from session.request('GET', url) try: data = yield from response.read() print(data) finally: yield from response.wait_for_close() except: traceback.print_exc() def producer(q): print("blocking on multiprocessing queue get") obj2 = q.get() print("unblocking on multiprocessing queue get") print("get qempty:", q.empty()) return obj2 def worker_done(f): try: f.result() print("worker exited") except: traceback.print_exc() workers = [] for i in range(100): t = asyncio.ensure_future(async_worker(session, async_queue)) t.add_done_callback(worker_done) workers.append(t) @asyncio.coroutine def doit(): print("start producer") obj = yield from loop.run_in_executor(None, producer, q) print("finish producer") print("blocking on asyncio queue put") yield from async_queue.put(obj) print("unblocking on asyncio queue put") print("put aqueue size:", async_queue.qsize()) while True: loop.run_until_complete(doit()) def server(): loop = asyncio.get_event_loop() #loop.set_debug(True) f = loop.create_server(lambda: HttpRequestHandler(debug=True, keep_alive=75), '0.0.0.0', '8080') srv = loop.run_until_complete(f) loop.run_forever() if __name__ == '__main__': q = multiprocessing.Queue(100) log_proc = multiprocessing.log_to_stderr() log_proc.setLevel(logging.DEBUG) p = multiprocessing.Process(target=process_worker, args=(q,)) p.start() p2 = multiprocessing.Process(target=server) p2.start() while True: print("blocking on multiprocessing queue put") q.put("http://0.0.0.0:8080") print("unblocking on multiprocessing queue put") print("put qempty:", q.empty()) ---------- _______________________________________ Python tracker <rep...@bugs.python.org> <http://bugs.python.org/issue25593> _______________________________________ _______________________________________________ Python-bugs-list mailing list Unsubscribe: https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com