Pretty sure it will, as I am calling very similar code. Let me try that out
and report back.
Thanks!!

On Mon, 26 Aug 2024, 08:44 Jaehyeon Kim, <[email protected]> wrote:

> Would something like this work for you? I just played with a third-party
> package, so you need to install it (pip install yahoo-finance-async).
>
> import asyncio
>
> import apache_beam as beam
> from yahoo_finance_async import OHLC
>
>
> class AsyncProcess(beam.DoFn):
>     async def fetch_data(self, element: str):
>         result = await OHLC.fetch(symbol=element)
>         return [d["open"] for d in result["candles"]]
>
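>     def process(self, element: str):
>         # asyncio.run creates a new event loop for this call and closes it
>         # afterwards, so no loop is stored on the DoFn (nothing to pickle).
>         return asyncio.run(self.fetch_data(element))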
>
>
> def run():
>     with beam.Pipeline() as p:
>         (
>             p
>             | beam.Create(["AAPL"])
>             | beam.ParDo(AsyncProcess())
>             | beam.Map(print)
>         )
>
>
> if __name__ == "__main__":
>     run()
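To try this pattern offline, here is a minimal sketch that swaps OHLC.fetch for a hypothetical fake_fetch coroutine and uses a plain class instead of beam.DoFn, so only the standard library is needed. Because asyncio.run creates and closes its own loop on every call, the object holds no loop and survives a pickle round trip.

```python
import asyncio
import pickle


async def fake_fetch(symbol: str) -> dict:
    # Hypothetical stand-in for OHLC.fetch: no network, fixed candles.
    await asyncio.sleep(0.01)
    return {"symbol": symbol, "candles": [{"open": 1.0}, {"open": 2.0}]}


class AsyncProcess:  # in a pipeline: class AsyncProcess(beam.DoFn)
    async def fetch_data(self, element: str) -> list:
        result = await fake_fetch(element)
        return [d["open"] for d in result["candles"]]

    def process(self, element: str) -> list:
        # A new loop per call, closed by asyncio.run before returning.
        return asyncio.run(self.fetch_data(element))


if __name__ == "__main__":
    # Round-trip through pickle, roughly as Beam does when shipping a DoFn.
    fn = pickle.loads(pickle.dumps(AsyncProcess()))
    print(fn.process("AAPL"))  # prints [1.0, 2.0]
```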
>
> On Mon, 26 Aug 2024 at 16:48, Sofia’s World <[email protected]> wrote:
>
>> Hello,
>>  thanks all.
>>  I tried this simple test case (before checking the URL above), but it did
>> not work out.
>>
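>> import asyncio
>>
>> import apache_beam as beam
>>
>>
>> async def process_data(element):
>>     # Asynchronous processing logic
>>     await asyncio.sleep(5)  # Simulate an asynchronous operation
>>     return element
>>
>>
>> class AsyncProcess(beam.DoFn):
>>
>>     def __init__(self):
>>         # The loop is created here, in __init__, so it becomes part of the
>>         # DoFn's state and is pickled along with it.
>>         self.loop = asyncio.new_event_loop()
>>
>>     def process(self, element):
>>         async def process_element_async():
>>             return await process_data(element)
>>
>>         # yield (not return): a returned string would be emitted char by char
>>         yield self.loop.run_until_complete(process_element_async())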
>>
>>
>> When I put this in a pipeline (debugSink is simply beam.Map(logging.info)):
>>
>> with TestPipeline(options=PipelineOptions()) as p:
>>     input = (p | 'Start' >> beam.Create(['AAPL'])
>>              | 'Run Loader' >> beam.ParDo(AsyncProcess())
>>              | self.debugSink
>>              )
>>
>> I end up getting this, which I was somewhat expecting:
>>
>> self = <socket.socket fd=2136, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 51510), raddr=('127.0.0.1', 51511)>
>>
>>     def __getstate__(self):
>> >       raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
>> E       TypeError: cannot pickle 'socket' object
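For comparison, a sketch that keeps the structure above but moves the event loop out of __init__ into setup(), so the loop is created on the worker instead of being pickled with the DoFn. It assumes the error comes from the loop's internal socket (the loopback laddr/raddr pair in the trace looks like the socket pair an event loop keeps for itself), and it uses a plain class in place of beam.DoFn so it runs with the standard library alone. In a real pipeline you would subclass beam.DoFn, and Beam would call setup() and teardown() itself.

```python
import asyncio
import pickle


async def process_data(element):
    await asyncio.sleep(0.01)  # simulate an asynchronous operation
    return element


class AsyncProcess:  # in a pipeline: class AsyncProcess(beam.DoFn)
    def __init__(self):
        # Only picklable state here: this is what gets serialized.
        self.loop = None

    def setup(self):
        # Called on the worker after deserialization, so the loop
        # (and the sockets it owns) never has to be pickled.
        self.loop = asyncio.new_event_loop()

    def process(self, element):
        yield self.loop.run_until_complete(process_data(element))

    def teardown(self):
        if self.loop is not None:
            self.loop.close()
            self.loop = None


if __name__ == "__main__":
    # Simulate what Beam does: serialize, deserialize, setup, process.
    restored = pickle.loads(pickle.dumps(AsyncProcess()))  # no loop yet
    restored.setup()
    print(list(restored.process("AAPL")))  # prints ['AAPL']
    restored.teardown()
```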
>>
>>
>> My use case is that I have to call an API which, under the hood, uses an
>> async API:
>>
>> data = await fetcher.fetch_data(params, credentials)
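One way to apply the same idea to the call above, as a sketch only: Fetcher is a hypothetical stand-in for the real client (its fetch_data signature just mirrors the line above), and FetchFn is a plain class standing in for a beam.DoFn. Only the picklable credentials are stored on the object; the client is built inside the coroutine, so anything it opens belongs to the loop that asyncio.run creates and closes.

```python
import asyncio


class Fetcher:
    """Hypothetical stand-in for the real async client (not a real library)."""

    async def fetch_data(self, params, credentials):
        await asyncio.sleep(0.01)  # simulate an async HTTP call
        return {"params": params, "user": credentials["user"]}


class FetchFn:  # in a pipeline: class FetchFn(beam.DoFn)
    def __init__(self, credentials):
        # A plain dict of credentials pickles fine; the client does not need to.
        self.credentials = credentials

    async def _fetch(self, params):
        # Build the client inside the coroutine so anything it opens
        # (sessions, sockets) lives on the loop that asyncio.run creates.
        fetcher = Fetcher()
        return await fetcher.fetch_data(params, self.credentials)

    def process(self, params):
        # Blocks until the coroutine finishes; asyncio.run then closes the loop.
        yield asyncio.run(self._fetch(params))


if __name__ == "__main__":
    fn = FetchFn({"user": "marco"})
    print(list(fn.process({"symbol": "AAPL"})))
    # prints [{'params': {'symbol': 'AAPL'}, 'user': 'marco'}]
```

Creating a loop and a client per element adds overhead; if that matters, building them once in DoFn.setup() and closing them in DoFn.teardown() is the usual alternative.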
>>
>>
>>
>> So on my side, I am not sure which options I have (bear in mind I am also an
>> asyncio noob).
>>
>> Thanks and regards
>> Marco
>>
>> On Sun, Aug 25, 2024 at 11:24 PM Jaehyeon Kim <[email protected]> wrote:
>>
>>> I guess there are multiple options.
>>>
>>> The easiest one would be to convert the async method to a sync method, or
>>> to create a sync wrapper method around it.
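A wrapper of that kind could look like the following sketch. run_sync and fetch_price are hypothetical names used only for illustration; fetch_price stands in for whatever async API is being called.

```python
import asyncio
import functools


def run_sync(async_fn):
    """Wrap a coroutine function so it can be called like a normal function."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # Runs the coroutine on a fresh event loop and returns its result.
        return asyncio.run(async_fn(*args, **kwargs))

    return wrapper


async def fetch_price(symbol):
    """Hypothetical async API call."""
    await asyncio.sleep(0.01)
    return {"symbol": symbol, "price": 100.0}


fetch_price_sync = run_sync(fetch_price)

if __name__ == "__main__":
    print(fetch_price_sync("AAPL"))  # prints {'symbol': 'AAPL', 'price': 100.0}
```

The wrapped function can then be used like any synchronous callable, e.g. beam.Map(fetch_price_sync). Note that asyncio.run raises RuntimeError when called from a thread that already has a running event loop, so this only works from ordinary synchronous code.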
>>>
>>> Also, the following Stack Overflow post introduces two other options that
>>> you may find useful.
>>>
>>>    - asynchronous API calls in apache beam -
>>>      https://stackoverflow.com/questions/72842846/asynchronous-api-calls-in-apache-beam
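Another option, not necessarily one of the two in the linked post: when per-element calls are slow, elements can be grouped upstream (for example with beam.BatchElements) and each batch awaited concurrently with asyncio.gather. A minimal standard-library sketch, with a hypothetical fetch_one coroutine standing in for the real API call:

```python
import asyncio


async def fetch_one(symbol):
    """Hypothetical async API call."""
    await asyncio.sleep(0.1)  # simulate latency
    return symbol.lower()


async def fetch_batch(symbols):
    # All requests in the batch are awaited concurrently.
    return await asyncio.gather(*(fetch_one(s) for s in symbols))


def process_batch(symbols):
    """Sync entry point; in a pipeline this could be a DoFn's process()
    applied after beam.BatchElements, yielding one output per input."""
    yield from asyncio.run(fetch_batch(symbols))


if __name__ == "__main__":
    print(list(process_batch(["AAPL", "MSFT", "GOOG"])))
    # prints ['aapl', 'msft', 'goog']
```

asyncio.gather returns results in the order of its inputs, so the outputs line up with the batch.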
>>>
>>> On Mon, 26 Aug 2024 at 01:40, Sofia’s World <[email protected]> wrote:
>>>
>>>> Not sure... I think I'll put together a sample and post it on the list.
>>>> Thanks
>>>>  Marco
>>>>
>>>> On Sun, 25 Aug 2024, 14:46 XQ Hu via user, <[email protected]>
>>>> wrote:
>>>>
>>>>>
>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Wait.html
>>>>>
>>>>> Is this something you are looking for?
>>>>>
>>>>> On Sun, Aug 25, 2024 at 6:21 AM Sofia’s World <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>   I want to write a pipeline where, as part of one of the steps, I
>>>>>> will need to use an await call, such as this one:
>>>>>>
>>>>>> await fetcher.fetch_data(params, self.credentials)
>>>>>>
>>>>>> Is Beam equipped for that?
>>>>>>
>>>>>> Does anyone have a sample to pass me for reference?
>>>>>>
>>>>>> Kind regards
>>>>>>  Marco
>>>>>>