[ https://issues.apache.org/jira/browse/FLINK-19515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Frans King updated FLINK-19515:
-------------------------------
Description:
The async RequestReply handler implemented in https://issues.apache.org/jira/browse/FLINK-18518 has a concurrency problem.

See lines 151 to 152 of [https://github.com/apache/flink-statefun/blob/master/statefun-python-sdk/statefun/request_reply.py]. The coroutine awaits there and may yield to the event loop. Another coroutine that was suspended can then resume and call ic.complete(), which sets ic.context to None. When the first coroutine resumes and calls ic.complete() itself, the context is already None and add_mutations() fails with the AttributeError below.

In short, this sequence needs to happen atomically:

{code:python}
ic.setup(request_bytes)
await self.handle_invocation(ic)  # may yield to another coroutine here
return ic.complete()
{code}

I worked around this by creating an AsyncRequestReplyHandler for each request.

It should be possible to reproduce this by putting an await asyncio.sleep(5) in the greeter example and then running it in gunicorn with a single asyncio thread/event loop (-w 1). The resulting traceback:

{code}
    response_data = await handler(request_data)
  File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 152, in __call__
    return ic.complete()
  File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 57, in complete
    self.add_mutations(context, invocation_result)
  File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 82, in add_mutations
    for name, handle in context.states.items():
AttributeError: 'NoneType' object has no attribute 'states'
{code}

was:
The async RequestReply handler implemented in https://issues.apache.org/jira/browse/FLINK-18518 has a concurrency problem.

See lines 151 to 152 of [https://github.com/apache/flink-statefun/blob/master/statefun-python-sdk/statefun/request_reply.py]. The coroutine awaits there and may yield to the event loop. Another coroutine that was suspended can then resume and call ic.complete(), which sets ic.context to None. When the first coroutine resumes and calls ic.complete() itself, the context is already None and add_mutations() fails with the AttributeError below.

In short, this sequence needs to happen atomically:

{code:python}
ic.setup(request_bytes)
await self.handle_invocation(ic)  # may yield to another coroutine here
return ic.complete()
{code}

I worked around this by creating an AsyncRequestReplyHandler for each request.
It should be possible to reproduce this by putting an await asyncio.sleep(5) in the greeter example and then running it in gunicorn with a single asyncio thread/event loop. The resulting traceback:

{code}
    response_data = await handler(request_data)
  File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 152, in __call__
    return ic.complete()
  File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 57, in complete
    self.add_mutations(context, invocation_result)
  File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 82, in add_mutations
    for name, handle in context.states.items():
AttributeError: 'NoneType' object has no attribute 'states'
{code}

> Async RequestReply handler concurrency bug
> ------------------------------------------
>
> Key: FLINK-19515
> URL: https://issues.apache.org/jira/browse/FLINK-19515
> Project: Flink
> Issue Type: Bug
> Affects Versions: statefun-2.2.0
> Reporter: Frans King
> Priority: Minor
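The race can be modelled without the SDK. The sketch below is a minimal, hypothetical model, not the statefun code: `Context`, `InvocationContext`, `SharedContextHandler`, `PerRequestContextHandler`, `LockedHandler` and `run_concurrently` are illustrative names. Only the setup/await/complete sequence and the cleared `context` mirror the issue. It assumes the invocation context is shared by all concurrent calls to one handler, which is what the per-request-handler workaround suggests.

```python
import asyncio


class Context:
    """Hypothetical per-request data: the request bytes plus state handles."""

    def __init__(self, request_bytes):
        self.request = request_bytes
        self.states = {}


class InvocationContext:
    """Simplified, hypothetical stand-in for the SDK's `ic` object.

    Only models setup() and complete(); the real SDK parses and builds
    protobuf messages at these points.
    """

    def __init__(self):
        self.context = None

    def setup(self, request_bytes):
        self.context = Context(request_bytes)

    def complete(self):
        context = self.context
        self.context = None  # complete() clears ic.context, as in the issue
        # Reads .states the way add_mutations() does in the traceback; if a
        # concurrent call already completed, this raises
        # AttributeError: 'NoneType' object has no attribute 'states'.
        _mutations = list(context.states.items())
        return context.request


class SharedContextHandler:
    """Buggy pattern (assumed): one InvocationContext for every call."""

    def __init__(self, delay=0.01):
        self.delay = delay
        self.ic = InvocationContext()

    async def handle_invocation(self, ic):
        # Stands in for a user function that awaits, e.g. asyncio.sleep(5).
        await asyncio.sleep(self.delay)

    async def __call__(self, request_bytes):
        ic = self.ic
        ic.setup(request_bytes)
        await self.handle_invocation(ic)  # other calls can run setup() here
        return ic.complete()


class PerRequestContextHandler(SharedContextHandler):
    """Fix sketch: each call builds its own InvocationContext."""

    async def __call__(self, request_bytes):
        ic = InvocationContext()  # local to this coroutine, never shared
        ic.setup(request_bytes)
        await self.handle_invocation(ic)
        return ic.complete()


class LockedHandler(SharedContextHandler):
    """Alternative: make setup/await/complete atomic with an asyncio.Lock."""

    def __init__(self, delay=0.01):
        super().__init__(delay)
        self.lock = asyncio.Lock()

    async def __call__(self, request_bytes):
        async with self.lock:  # serializes every call through this handler
            return await super().__call__(request_bytes)


async def run_concurrently(handler_cls, requests):
    # Build the handler inside the running loop: before Python 3.10,
    # asyncio.Lock() binds to the event loop current at construction.
    handler = handler_cls()
    return await asyncio.gather(
        *(handler(r) for r in requests), return_exceptions=True
    )


if __name__ == "__main__":
    for cls in (SharedContextHandler, PerRequestContextHandler, LockedHandler):
        print(cls.__name__, asyncio.run(run_concurrently(cls, [b"a", b"b"])))
```

Run as a script, the shared handler should hand the first caller the second request's bytes and fail the second call with the same AttributeError as the traceback, while the other two return [b'a', b'b']. The per-request context keeps invocations concurrent and gives the same isolation as the workaround of one AsyncRequestReplyHandler per request. The lock does make the sequence atomic, but it serializes every invocation through the handler, so one function sleeping for 5 seconds would delay all other requests.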
--
This message was sent by Atlassian Jira
(v8.3.4#803005)