[ https://issues.apache.org/jira/browse/FLINK-18518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208836#comment-17208836 ]
Frans King commented on FLINK-18518: ------------------------------------ I think there might be a concurrency bug with the implementation of AsyncRequestReplyHandler - https://issues.apache.org/jira/browse/FLINK-19515 ic.setup(). <- sets ic.context = some value await .... ic.complete() <- sets ic.context = None As a result ic.context can be None depending on how the coros yield/awaken resulting in line 57, in complete self.add_mutations(context, invocation_result) File "/home/pi/.local/lib/python3.7/site-packages/statefun/request_reply.py", line 82, in add_mutations for name, handle in context.states.items(): AttributeError: 'NoneType' object has no attribute 'states' > Add Async RequestReply handler for the Python SDK > ------------------------------------------------- > > Key: FLINK-18518 > URL: https://issues.apache.org/jira/browse/FLINK-18518 > Project: Flink > Issue Type: New Feature > Components: Stateful Functions > Affects Versions: statefun-2.1.0 > Reporter: Igal Shilman > Assignee: Igal Shilman > Priority: Major > Labels: beginner-friendly, pull-request-available > Fix For: statefun-2.2.0 > > > I/O bound stateful functions can benefit from the built-in async/io support > in Python, but the > RequestReply handler is not an async-io compatible. See > [this|https://stackoverflow.com/questions/62640283/flink-stateful-functions-async-calls-with-the-python-sdk] > question on stackoverflow. > > Having an asyncio compatible handler will open the door to the usage of > aiohttp for example: > > {code:java} > import aiohttp > import asyncio > ... > async def fetch(session, url): > async with session.get(url) as response: > return await response.text() > @function.bind("example/hello") > async def hello(context, message): > async with aiohttp.ClientSession() as session: > html = await fetch(session, 'http://python.org') > context.pack_and_reply(SomeProtobufMessage(html)) > from aiohttp import webhandler > handler = AsyncRequestReplyHandler(functions) > async def handle(request): > req = await request.read() > res = await handler(req) > return web.Response(body=res, content_type="application/octet-stream'") > app = web.Application() > app.add_routes([web.post('/statefun', handle)]) > if __name__ == '__main__': > web.run_app(app, port=5000) > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)