I have found the following way to use aiohttp with asyncio. It works all right, and the flat map function can be converted to a process function to add timers.

import asyncio

import aiohttp
from pyflink.common.typeinfo import Types
from pyflink.datastream import FlatMapFunction, StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

ds = env.from_collection(
    collection=["a", "b"],
    type_info=Types.STRING())


class MyFlatMapFunction(FlatMapFunction):

    def __init__(self):
        # Per-instance buffer; a class-level list would be shared by all instances.
        self.queue = []

    async def process_queue(self, value):
        # Drain the queue and run all pending HTTP calls concurrently.
        tasks = []
        while len(self.queue) > 0:
            tasks.append(self.make_http_call(self.queue.pop()))
        return await asyncio.gather(*tasks)

    async def make_http_call(self, value):
        async with aiohttp.ClientSession() as session:
            url = 'http://localhost/'
            async with session.get(url) as response:
                return await response.json()

    def flat_map(self, value):
        # print("Channel ID: {}".format(value))
        self.queue.append(value)
        # Flush threshold; with 1, every element is processed immediately.
        if len(self.queue) == 1:
            results = asyncio.run(self.process_queue(value))
            for result in results:
                yield result


ds = ds.flat_map(MyFlatMapFunction())
ds.print()
env.execute()

On Thu, 5 May 2022 at 08:26, Dian Fu <dian0511...@gmail.com> wrote:

> Hi Dhavan,
>
> Asyncio operator is still not supported in PyFlink.
>
> Regards,
> Dian
>
> On Tue, May 3, 2022 at 3:48 PM Dhavan Vaidya <dhavan.vai...@kofluence.com>
> wrote:
>
>> Hey Francis!
>>
>> Thanks for the insights! I am thinking of using Java / Scala for this
>> scenario given your input. Introducing a new language to the team, however,
>> is going to be a big ask :-D
>>
>> Another option that you mentioned is pushing enrichment data instead of
>> pulling. That would be excellent, I will try to model the pipes and see if
>> that works.
>>
>> Thanks again!
>>
>> On Tue, 3 May 2022 at 05:53, Francis Conroy <francis.con...@switchdin.com>
>> wrote:
>>
>>> Hi Dhavan,
>>>
>>> We have looked at using pyflink for data stream enrichment and found the
>>> performance lacking compared to the java counterpart. One option for you
>>> might be to use statefun for the enrichment stages. We've also changed our
>>> model for enrichment, we're pushing the enrichment data into the pipeline
>>> instead of pulling it, but this won't work in a lot of situations.
>>>
>>> Hope that gives you some ideas.
>>>
>>> On Mon, 2 May 2022 at 22:54, Dhavan Vaidya <dhavan.vai...@kofluence.com>
>>> wrote:
>>>
>>>> Hello!
>>>>
>>>> I want to make HTTP(S) calls to enrich data streams. The HTTP services
>>>> are running on our VPC, so the delay is limited, but sometimes these
>>>> services end up calling third party APIs, and latencies become high.
>>>>
>>>> From documentation (
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/)
>>>> it seems PyFlink does not support "asyncio operator" like Java does (
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/).
>>>> Am I missing something? How should this be approached?
>>>>
>>>> Thanks!
>>>>
>>>> --
>>>> Dhavan
>>>>
>>>
>>> This email and any attachments are proprietary and confidential and are
>>> intended solely for the use of the individual to whom it is addressed. Any
>>> views or opinions expressed are solely those of the author and do not
>>> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
>>> received this email in error, please let us know immediately by reply email
>>> and delete it from your system. You may not use, disseminate, distribute or
>>> copy this message nor disclose its contents to anyone.
>>> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
>>> Australia
>>>
>>
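
A possible refinement of the flat_map snippet at the top of this message, offered only as a sketch: if the flush threshold is raised above 1, it may help to cap how many requests are in flight at once. The example below does that with asyncio.Semaphore. The names gather_bounded, fake_fetch and enrich_batch are hypothetical, and fake_fetch merely stands in for the real aiohttp call so that the example runs without a network or a Flink cluster.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List


async def gather_bounded(
    values: Iterable[str],
    fetch: Callable[[str], Awaitable[dict]],
    max_concurrency: int = 10,
) -> List[dict]:
    """Await fetch(value) for every value, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(value: str) -> dict:
        # Each call waits here until one of the concurrency slots is free.
        async with semaphore:
            return await fetch(value)

    # asyncio.gather returns results in the same order as its inputs.
    return await asyncio.gather(*(guarded(v) for v in values))


async def fake_fetch(value: str) -> dict:
    # Hypothetical stand-in for an HTTP request (assumption, not a real service).
    await asyncio.sleep(0.01)
    return {"id": value, "enriched": True}


def enrich_batch(batch: List[str]) -> List[dict]:
    # One event loop run per batch, mirroring the asyncio.run call in flat_map.
    return asyncio.run(gather_bounded(batch, fake_fetch, max_concurrency=2))


if __name__ == "__main__":
    for row in enrich_batch(["a", "b", "c"]):
        print(row)
```

In a real job, fake_fetch would presumably be replaced by a coroutine that issues session.get(url) on a single aiohttp.ClientSession opened once per batch, rather than one session per request.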