Hey Dian,

You are right. What this approach does is allow one to queue n messages,
call some async function on each of them, and return the results. That
means it will indeed block the Flink job once it has gathered n messages,
which is a downside. Some optimisations with parallelism, watermarking,
etc. can be applied, but at some point it remains an inherently blocking
operation.
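For what it's worth, the batching idea can be sketched outside Flink like
this. This is a minimal illustration, not the actual operator: `enrich` is a
hypothetical stand-in for the aiohttp call, and `BATCH_SIZE` is an assumed
tuning knob (larger batches amortise the event-loop start-up cost but block
the operator for longer while the whole batch is awaited):

```python
import asyncio


async def enrich(value):
    # Hypothetical stand-in for the aiohttp request; in a real job this
    # would call the enrichment service.
    await asyncio.sleep(0)  # simulate I/O latency
    return value.upper()


class BatchingEnricher:
    """Buffers elements and enriches a whole batch in one asyncio run."""

    BATCH_SIZE = 2  # assumed tuning knob

    def __init__(self):
        self.queue = []

    async def _drain(self):
        # Fire all requests concurrently and wait for all of them.
        tasks = [enrich(v) for v in self.queue]
        self.queue.clear()
        return await asyncio.gather(*tasks)

    def flat_map(self, value):
        self.queue.append(value)
        if len(self.queue) >= self.BATCH_SIZE:
            # Blocking point: the operator waits here until the whole
            # batch has been enriched before emitting anything.
            yield from asyncio.run(self._drain())
```

Note that `flat_map` is a generator, so nothing happens until the caller
iterates it; elements that have not yet filled a batch stay buffered in
`self.queue`, which is exactly the blocking/buffering trade-off described
above.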

On Fri, 6 May 2022 at 08:29, Dian Fu <dian0511...@gmail.com> wrote:

> Hi Dhavan,
>
> Thanks a lot for sharing. This is very interesting. Just want to add
> that this is somewhat different from the asyncio operator supported in
> Flink, e.g. you are waiting for the results of one element before
> processing the next, so it's actually synchronous from this point of view.
>
> Regards,
> Dian
>
> On Thu, May 5, 2022 at 9:52 PM Dhavan Vaidya <dhavan.vai...@kofluence.com>
> wrote:
>
>> I have found the following way to make use of aiohttp via asyncio. It
>> works all right, and the FlatMapFunction can be converted to a
>> ProcessFunction to add timers.
>>
>> import asyncio
>>
>> import aiohttp
>> from pyflink.common.typeinfo import Types
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.datastream.functions import FlatMapFunction
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> ds = env.from_collection(
>>     collection=["a", "b"],
>>     type_info=Types.STRING())
>>
>> class MyFlatMapFunction(FlatMapFunction):
>>     def open(self, runtime_context):
>>         # Per-instance buffer; a class-level list would be shared
>>         # across all instances of the function.
>>         self.queue = []
>>
>>     async def process_queue(self):
>>         tasks = []
>>         while len(self.queue) > 0:
>>             tasks.append(self.make_http_call(self.queue.pop()))
>>         return await asyncio.gather(*tasks)
>>
>>     async def make_http_call(self, value):
>>         async with aiohttp.ClientSession() as session:
>>             url = 'http://localhost/'
>>             async with session.get(url) as response:
>>                 return await response.json()
>>
>>     def flat_map(self, value):
>>         self.queue.append(value)
>>         if len(self.queue) == 1:
>>             results = asyncio.run(self.process_queue())
>>             for result in results:
>>                 yield result
>>
>>
>> ds = ds.flat_map(MyFlatMapFunction())
>>
>>
>> ds.print()
>> env.execute()
>>
>> On Thu, 5 May 2022 at 08:26, Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi Dhavan,
>>>
>>> Asyncio operator is still not supported in PyFlink.
>>>
>>> Regards,
>>> Dian
>>>
>>> On Tue, May 3, 2022 at 3:48 PM Dhavan Vaidya <
>>> dhavan.vai...@kofluence.com> wrote:
>>>
>>>> Hey Francis!
>>>>
>>>> Thanks for the insights! I am thinking of using Java / Scala for this
>>>> scenario given your input. Introducing a new language to the team, however,
>>>> is going to be a big ask :-D
>>>>
>>>> Another option that you mentioned is pushing enrichment data instead of
>>>> pulling. That would be excellent, I will try to model the pipes and see if
>>>> that works.
>>>>
>>>> Thanks again!
>>>>
>>>> On Tue, 3 May 2022 at 05:53, Francis Conroy <
>>>> francis.con...@switchdin.com> wrote:
>>>>
>>>>> Hi Dhavan,
>>>>>
>>>>> We have looked at using PyFlink for data stream enrichment and found
>>>>> the performance lacking compared to the Java counterpart. One option for
>>>>> you might be to use StateFun for the enrichment stages. We've also
>>>>> changed our enrichment model: we're pushing the enrichment data into the
>>>>> pipeline instead of pulling it, but this won't work in a lot of
>>>>> situations.
>>>>>
>>>>> Hope that gives you some ideas.
>>>>>
>>>>> On Mon, 2 May 2022 at 22:54, Dhavan Vaidya <
>>>>> dhavan.vai...@kofluence.com> wrote:
>>>>>
>>>>>> Hello!
>>>>>>
>>>>>> I want to make HTTP(S) calls to enrich data streams. The HTTP
>>>>>> services are running on our VPC, so the delay is limited, but sometimes
>>>>>> these services end up calling third party APIs, and latencies become 
>>>>>> high.
>>>>>>
>>>>>> From the documentation (
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/)
>>>>>> it seems PyFlink does not support an "asyncio operator" like Java does (
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/).
>>>>>> Am I missing something? How should this be approached?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> --
>>>>>> Dhavan
>>>>>>
>>>>>
>>>>