Would something like this work for you? I just tried a third-party package, so
you'll need to install it first (pip install yahoo-finance-async).

import asyncio

import apache_beam as beam
from yahoo_finance_async import OHLC


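class AsyncProcess(beam.DoFn):
    async def fetch_data(self, element: str):
        # Await the async client and keep only the opening prices.
        result = await OHLC.fetch(symbol=element)
        return [d["open"] for d in result["candles"]]

    def process(self, element: str):
        # asyncio.run creates a fresh event loop, runs the coroutine to
        # completion and closes the loop; ParDo then emits each price in
        # the returned list as a separate element.
        return asyncio.run(self.fetch_data(element))


def run():
    with beam.Pipeline() as p:
        (p | beam.Create(["AAPL"]) | beam.ParDo(AsyncProcess()) | beam.Map(print))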


if __name__ == "__main__":
    run()

On Mon, 26 Aug 2024 at 16:48, Sofia’s World wrote:

> Hello,
> Thanks, all. I tried this simple test case (before checking the URL above),
> but it did not work out:
>
> import asyncio
>
> import apache_beam as beam
>
>
> async def process_data(element):
>     # Asynchronous processing logic
>     await asyncio.sleep(5)  # Simulate an asynchronous operation
>     return element
>
>
> class AsyncProcess(beam.DoFn):
>
>     def __init__(self):
>         self.loop = asyncio.new_event_loop()
>
>     def process(self, element):
>         async def process_element_async():
>             return await process_data(element)
>
>         return self.loop.run_until_complete(process_element_async())
>
>
> When I put this in a pipeline (debugSink is simply beam.Map(logging.info)):
>
> with TestPipeline(options=PipelineOptions()) as p:
>     input = (p | 'Start' >> beam.Create(['AAPL'])
>              | 'Run Loader' >> beam.ParDo(AsyncProcess())
>              | self.debugSink
>              )
>
> I end up getting this error, which I was somewhat expecting:
>
> self = <socket.socket fd=2136, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 51510), raddr=('127.0.0.1', 51511)>
>
>     def __getstate__(self):
> >       raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
> E       TypeError: cannot pickle 'socket' object
>
>
> My use case is that I have to call an API that uses an async API under the
> hood:
>
> data = await fetcher.fetch_data(params, credentials)
>
>
>
> So I'm not sure which options I have on my side (bear in mind I am also an
> asyncio noob).
>
> Thanks and regards
> Marco
>
> On Sun, Aug 25, 2024 at 11:24 PM Jaehyeon Kim wrote:
>
>> I guess there are multiple options.
>>
>> The easiest one would be to convert the async method to a sync method, or to
>> create a wrapper method that does so.
>>
>> Also, the following Stack Overflow post introduces two other options you
>> may find useful.
>>
>>    - asynchronous API calls in Apache Beam -
>>      https://stackoverflow.com/questions/72842846/asynchronous-api-calls-in-apache-beam
>>
>> On Mon, 26 Aug 2024 at 01:40, Sofia’s World wrote:
>>
>>> Not sure... I think I'll put together a sample and post it on the list.
>>> Thanks
>>>  Marco
>>>
>>> On Sun, 25 Aug 2024, 14:46 XQ Hu via user wrote:
>>>
>>>>
>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Wait.html
>>>>
>>>> Is this something you are looking for?
>>>>
>>>> On Sun, Aug 25, 2024 at 6:21 AM Sofia’s World wrote:
>>>>
>>>>> Hi all,
>>>>> I want to write a pipeline where one of the steps needs to make an
>>>>> await call, such as this one:
>>>>>
>>>>> await fetcher.fetch_data(params, self.credentials)
>>>>>
>>>>> Is Beam equipped for that?
>>>>>
>>>>> Does anyone have a sample I could use for reference?
>>>>>
>>>>> Kind regards
>>>>>  Marco
