Hey Dian,

You are right. This approach lets one "queue n messages", call an async function on each of them concurrently, and return the results. That means it will indeed block the Flink operator once it has gathered n messages, until all n calls have returned, which is a downside. Some of this can be mitigated (for example with operator parallelism or watermarking), but at some point it is inherently a blocking operation.
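Here is a minimal sketch of the batching idea in plain Python, so it runs without Flink or aiohttp. `BatchingEnricher`, `fake_enrich` and the batch size are hypothetical names chosen for illustration: `fake_enrich` only sleeps and uppercases the value in place of the real HTTP request. The helper buffers elements, returns nothing until n are queued, then runs the n calls concurrently with `asyncio.gather` and blocks the caller until all of them finish.

```python
import asyncio
from typing import Awaitable, Callable, List


class BatchingEnricher:
    """Hypothetical helper: buffers up to `batch_size` values, then enriches them concurrently."""

    def __init__(self, enrich: Callable[[str], Awaitable[str]], batch_size: int) -> None:
        self.enrich = enrich
        self.batch_size = batch_size
        self.buffer: List[str] = []  # per-instance buffer, not shared between instances

    async def _run_batch(self) -> List[str]:
        # Swap out the buffer, then fire all calls at once; gather keeps input order.
        batch, self.buffer = self.buffer, []
        return await asyncio.gather(*(self.enrich(v) for v in batch))

    def add(self, value: str) -> List[str]:
        # Returns [] until the buffer is full; then blocks until the whole batch is done.
        self.buffer.append(value)
        if len(self.buffer) < self.batch_size:
            return []
        return asyncio.run(self._run_batch())

    def flush(self) -> List[str]:
        # Drain a partially filled buffer, e.g. at end of input or from a timer.
        if not self.buffer:
            return []
        return asyncio.run(self._run_batch())


async def fake_enrich(value: str) -> str:
    # Hypothetical stand-in for the HTTP lookup: sleeps instead of calling a service.
    await asyncio.sleep(0.01)
    return value.upper()


enricher = BatchingEnricher(fake_enrich, batch_size=3)
out = []
for v in ["a", "b", "c", "d"]:
    out.extend(enricher.add(v))
out.extend(enricher.flush())
print(out)  # ['A', 'B', 'C', 'D']
```

With `batch_size=1` (the threshold used in the quoted code) every element waits for its own call to finish before the next one is accepted, which is the synchronous behaviour you describe; larger batches overlap the calls but still stall the operator while a batch is in flight.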
On Fri, 6 May 2022 at 08:29, Dian Fu <dian0511...@gmail.com> wrote:
> Hi Dhavan,
>
> Thanks a lot for sharing. This is very interesting. I just want to add
> that this is somewhat different from the asyncio operator supported in
> Flink: you are waiting for the results of one element before processing
> the next element, so from this point of view it is actually synchronous.
>
> Regards,
> Dian
>
> On Thu, May 5, 2022 at 9:52 PM Dhavan Vaidya <dhavan.vai...@kofluence.com> wrote:
>
>> I have found the following way to make use of aiohttp via asyncio. It
>> works alright, and the flat map can be converted to a process function
>> to add timers.
>>
>> import asyncio
>>
>> import aiohttp
>> from pyflink.common import Types
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.datastream.functions import FlatMapFunction
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>>
>> ds = env.from_collection(
>>     collection=["a", "b"],
>>     type_info=Types.STRING())
>>
>> class MyFlatMapFunction(FlatMapFunction):
>>     queue = []
>>
>>     async def process_queue(self, value):
>>         # Drain the queue and run all pending HTTP calls concurrently.
>>         tasks = []
>>         while len(self.queue) > 0:
>>             tasks.append(self.make_http_call(self.queue.pop()))
>>         return await asyncio.gather(*tasks)
>>
>>     async def make_http_call(self, value):
>>         async with aiohttp.ClientSession() as session:
>>             url = 'http://localhost/'
>>             async with session.get(url) as response:
>>                 return await response.json()
>>
>>     def flat_map(self, value):
>>         # print("Channel ID: {}".format(value))
>>         self.queue.append(value)
>>         # With a threshold of 1 the queue is flushed on every element.
>>         if len(self.queue) == 1:
>>             results = asyncio.run(self.process_queue(value))
>>             for result in results:
>>                 yield result
>>
>>
>> ds = ds.flat_map(MyFlatMapFunction())
>>
>>
>> ds.print()
>> env.execute()
>>
>> On Thu, 5 May 2022 at 08:26, Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi Dhavan,
>>>
>>> The asyncio operator is still not supported in PyFlink.
>>>
>>> Regards,
>>> Dian
>>>
>>> On Tue, May 3, 2022 at 3:48 PM Dhavan Vaidya <dhavan.vai...@kofluence.com> wrote:
>>>
>>>> Hey Francis!
>>>>
>>>> Thanks for the insights! Given your input, I am thinking of using
>>>> Java / Scala for this scenario. Introducing a new language to the team,
>>>> however, is going to be a big ask :-D
>>>>
>>>> Another option that you mentioned is pushing enrichment data instead of
>>>> pulling it. That would be excellent; I will try to model the pipes and
>>>> see if that works.
>>>>
>>>> Thanks again!
>>>>
>>>> On Tue, 3 May 2022 at 05:53, Francis Conroy <francis.con...@switchdin.com> wrote:
>>>>
>>>>> Hi Dhavan,
>>>>>
>>>>> We have looked at using PyFlink for data stream enrichment and found
>>>>> the performance lacking compared to the Java counterpart. One option for
>>>>> you might be to use Stateful Functions (StateFun) for the enrichment
>>>>> stages. We've also changed our model for enrichment: we're pushing the
>>>>> enrichment data into the pipeline instead of pulling it, but this won't
>>>>> work in a lot of situations.
>>>>>
>>>>> Hope that gives you some ideas.
>>>>>
>>>>> On Mon, 2 May 2022 at 22:54, Dhavan Vaidya <dhavan.vai...@kofluence.com> wrote:
>>>>>
>>>>>> Hello!
>>>>>>
>>>>>> I want to make HTTP(S) calls to enrich data streams. The HTTP
>>>>>> services are running in our VPC, so the delay is limited, but sometimes
>>>>>> these services end up calling third-party APIs, and latencies become
>>>>>> high.
>>>>>>
>>>>>> From the documentation (
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/)
>>>>>> it seems PyFlink does not support an "asyncio operator" like Java does (
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/).
>>>>>> Am I missing something? How should this be approached?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> --
>>>>>> Dhavan
>>>>>>
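For the push-based alternative Francis describes, here is a minimal sketch of the idea in plain Python (no Flink APIs): the enrichment source pushes updates into the pipeline, the job keeps the latest value per key, and events are enriched from that local copy instead of triggering an HTTP call. In a PyFlink job this per-key map would presumably live in keyed state, for example inside a `KeyedCoProcessFunction` over the two connected streams; `EnrichmentUpdate`, `Event`, `PushEnricher` and the sample keys below are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Union


@dataclass
class EnrichmentUpdate:
    # Hypothetical record pushed by the service that owns the reference data.
    key: str
    data: dict


@dataclass
class Event:
    # Hypothetical record from the main stream that needs enriching.
    key: str
    payload: str


class PushEnricher:
    """Joins events against the latest pushed enrichment data per key (models keyed state)."""

    def __init__(self) -> None:
        self.latest: Dict[str, dict] = {}

    def process(self, record: Union[EnrichmentUpdate, Event]) -> Optional[dict]:
        if isinstance(record, EnrichmentUpdate):
            # Upsert: a newer push overwrites older reference data for the same key.
            self.latest[record.key] = record.data
            return None
        # Events arriving before any data for their key get an empty enrichment here;
        # a real job might buffer them or route them elsewhere instead.
        return {"key": record.key, "payload": record.payload,
                "enrichment": self.latest.get(record.key, {})}


def run(records: Iterable[Union[EnrichmentUpdate, Event]]) -> List[dict]:
    enricher = PushEnricher()
    out = []
    for r in records:
        result = enricher.process(r)
        if result is not None:
            out.append(result)
    return out


results = run([
    EnrichmentUpdate("ch1", {"tier": "gold"}),
    Event("ch1", "a"),
    Event("ch2", "b"),
    EnrichmentUpdate("ch1", {"tier": "silver"}),
    Event("ch1", "c"),
])
for r in results:
    print(r)
```

Processing an event is then a local lookup with no network round trip; as Francis notes, though, this model won't fit every situation.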