I found the following way to use aiohttp via asyncio inside a PyFlink
FlatMapFunction. It works all right, and the flat map can be converted to a
process function if timers are needed.

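import asyncio

import aiohttp
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import FlatMapFunction

env = StreamExecutionEnvironment.get_execution_environment()

ds = env.from_collection(
    collection=["a", "b"],
    type_info=Types.STRING())


class MyFlatMapFunction(FlatMapFunction):

    def __init__(self):
        # Per-instance buffer; a class-level list would be shared by every
        # instance of the class in the same Python process.
        self.queue = []

    async def process_queue(self):
        # Drain the buffer and run all pending HTTP calls concurrently.
        tasks = []
        while len(self.queue) > 0:
            tasks.append(self.make_http_call(self.queue.pop()))
        return await asyncio.gather(*tasks)

    async def make_http_call(self, value):
        # value is not used in the request yet; the URL is a placeholder.
        async with aiohttp.ClientSession() as session:
            url = 'http://localhost/'
            async with session.get(url) as response:
                return await response.json()

    def flat_map(self, value):
        # print("Channel ID: {}".format(value))
        self.queue.append(value)
        # The buffer is drained on every call, so this condition is always
        # true and each element triggers exactly one request.
        if len(self.queue) == 1:
            results = asyncio.run(self.process_queue())
            for result in results:
                yield result


ds = ds.flat_map(MyFlatMapFunction())

ds.print()
env.execute()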
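
With the threshold at 1, asyncio.gather only ever receives a single task, so
the calls do not actually overlap. Below is a minimal sketch of the batching
idea on its own, using only the standard library. The names fake_http_call and
BatchingBuffer and the batch size of 3 are made up for illustration;
fake_http_call stands in for the aiohttp request, and none of this is PyFlink
API.

```python
import asyncio


async def fake_http_call(value):
    # Hypothetical stand-in for an aiohttp request; sleeps to mimic latency.
    await asyncio.sleep(0.05)
    return {"input": value, "enriched": value.upper()}


class BatchingBuffer:
    """Collects values and resolves them concurrently once batch_size is reached."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.queue = []

    async def _process_queue(self, values):
        # gather() returns results in the same order as its arguments.
        return await asyncio.gather(*(fake_http_call(v) for v in values))

    def add(self, value):
        # Returns the batch results when a flush happens, else an empty list.
        self.queue.append(value)
        if len(self.queue) < self.batch_size:
            return []
        return self.flush()

    def flush(self):
        # Must also be called at end of input so a partial batch is not lost.
        if not self.queue:
            return []
        values, self.queue = self.queue, []
        return asyncio.run(self._process_queue(values))


buffer = BatchingBuffer(batch_size=3)
results = []
for item in ["a", "b", "c", "d"]:
    results.extend(buffer.add(item))
results.extend(buffer.flush())  # picks up the leftover "d"
```

In a real job the leftover partial batch needs some trigger for the final
flush. A timer in a process function, as mentioned above, is one candidate,
but I have not tested that.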

On Thu, 5 May 2022 at 08:26, Dian Fu <dian0511...@gmail.com> wrote:

> Hi Dhavan,
>
> Asyncio operator is still not supported in PyFlink.
>
> Regards,
> Dian
>
> On Tue, May 3, 2022 at 3:48 PM Dhavan Vaidya <dhavan.vai...@kofluence.com>
> wrote:
>
>> Hey Francis!
>>
>> Thanks for the insights! I am thinking of using Java / Scala for this
>> scenario given your input. Introducing a new language to the team, however,
>> is going to be a big ask :-D
>>
>> Another option that you mentioned is pushing enrichment data instead of
>> pulling. That would be excellent, I will try to model the pipes and see if
>> that works.
>>
>> Thanks again!
>>
>> On Tue, 3 May 2022 at 05:53, Francis Conroy <francis.con...@switchdin.com>
>> wrote:
>>
>>> Hi Dhavan,
>>>
>>> We have looked at using PyFlink for data stream enrichment and found the
>>> performance lacking compared to the Java counterpart. One option for you
>>> might be to use StateFun for the enrichment stages. We've also changed our
>>> model for enrichment: we're pushing the enrichment data into the pipeline
>>> instead of pulling it, but this won't work in a lot of situations.
>>>
>>> Hope that gives you some ideas.
>>>
>>> On Mon, 2 May 2022 at 22:54, Dhavan Vaidya <dhavan.vai...@kofluence.com>
>>> wrote:
>>>
>>>> Hello!
>>>>
>>>> I want to make HTTP(S) calls to enrich data streams. The HTTP services
>>>> are running on our VPC, so the delay is limited, but sometimes these
>>>> services end up calling third party APIs, and latencies become high.
>>>>
>>>> From documentation (
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/overview/)
>>>> it seems PyFlink does not support "asyncio operator" like Java does (
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/).
>>>> Am I missing something? How should this be approached?
>>>>
>>>> Thanks!
>>>>
>>>> --
>>>> Dhavan
>>>>
>>>
>>
