Pretty sure it will, as I am calling very similar code. Let me try that out and report back. Thanks!!
On Mon, 26 Aug 2024, 08:44 Jaehyeon Kim, <[email protected]> wrote:

> Something like this work for you? I just played with a 3rd party package
> so you need to install it (pip install yahoo-finance-async)
>
> import asyncio
>
> import apache_beam as beam
> from yahoo_finance_async import OHLC
>
>
> class AsyncProcess(beam.DoFn):
>     async def fetch_data(self, element: str):
>         result = await OHLC.fetch(symbol=element)
>         return [d["open"] for d in result["candles"]]
>
>     def process(self, element: str):
>         return asyncio.run(self.fetch_data(element))
>
>
> def run():
>     with beam.Pipeline() as p:
>         (p | beam.Create(["AAPL"]) | beam.ParDo(AsyncProcess()) | beam.Map(print))
>
>
> if __name__ == "__main__":
>     run()
>
> On Mon, 26 Aug 2024 at 16:48, Sofia’s World <[email protected]> wrote:
>
>> Hello
>> thanks all
>> have tried this simple test case - before checking URL above - but did
>> not work out
>>
>> async def process_data(element):
>>     # Asynchronous processing logic
>>     await asyncio.sleep(5)  # Simulate an asynchronous operation
>>     return element
>>
>>
>> class AsyncProcess(beam.DoFn):
>>
>>     def __init__(self):
>>         self.loop = asyncio.new_event_loop()
>>
>>     def process(self, element):
>>         async def process_element_async():
>>             return await process_data(element)
>>
>>         return self.loop.run_until_complete(process_element_async())
>>
>>
>> when i put this in a pipeline (debugSink is simply beam.Map(logging.info)
>>
>> with TestPipeline(options=PipelineOptions()) as p:
>>     input = (p | 'Start' >> beam.Create(['AAPL'])
>>              | 'Run Loader' >> beam.ParDo(AsyncProcess())
>>              | self.debugSink
>>              )
>>
>> i end up getting this.. which somehow i was expecting
>>
>> self = <socket.socket fd=2136, family=AddressFamily.AF_INET,
>> type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 51510),
>> raddr=('127.0.0.1', 51511)>
>>
>>     def __getstate__(self):
>> >       raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
>> E       TypeError: cannot pickle 'socket' object
>>
>>
>> The usecase i have is that i have to call an API which under the hood
>> uses async api
>>
>> data = await fetcher.fetch_data(params, credentials)
>>
>> so on my side, not sure which options i have (bear in mind i am also an
>> asyncio noob)
>>
>> Thanks and regards
>> Marco
>>
>> On Sun, Aug 25, 2024 at 11:24 PM Jaehyeon Kim <[email protected]> wrote:
>>
>>> I guess there are multiple options.
>>>
>>> The easiest one would be converting the async method to a sync method or
>>> creating a wrapper method for doing so.
>>>
>>> Also, the following stackoverflow post introduces another two options
>>> you may find useful.
>>>
>>> - asynchronous API calls in apache beam -
>>> https://stackoverflow.com/questions/72842846/asynchronous-api-calls-in-apache-beam
>>>
>>> On Mon, 26 Aug 2024 at 01:40, Sofia’s World <[email protected]> wrote:
>>>
>>>> Not sure...I think I'll do a sample and post it on the list ..
>>>> Thanks
>>>> Marco
>>>>
>>>> On Sun, 25 Aug 2024, 14:46 XQ Hu via user, <[email protected]>
>>>> wrote:
>>>>
>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Wait.html
>>>>>
>>>>> Is this something you are looking for?
>>>>>
>>>>> On Sun, Aug 25, 2024 at 6:21 AM Sofia’s World <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> HI all
>>>>>> i want to write a pipeline where, as part of one of the steps, i
>>>>>> will need to use
>>>>>> an await call ,such as this one
>>>>>>
>>>>>> await fetcher.fetch_data(params, self.credentials)
>>>>>>
>>>>>> is Beam equipped for that?
>>>>>>
>>>>>> Does anyone have a sample to pass me for reference?
>>>>>>
>>>>>> Kind regards
>>>>>> Marco
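
A note on the "cannot pickle 'socket' object" error quoted above: Beam serializes the DoFn instance before handing it to the workers, and an event loop created in __init__ holds sockets that cannot be serialized. One possible workaround, sketched here, is to leave the loop unset in __init__ and create it in setup(), which Beam calls on the worker after the DoFn has been deserialized. This is only a sketch under some assumptions: it uses a plain class so that it runs without Beam installed (in a pipeline the class would subclass beam.DoFn), and process_data is the placeholder coroutine from the quoted message with a shorter sleep. It also returns a list from process(), since ParDo iterates over whatever process() returns and a bare string would be split into characters.

```python
import asyncio


async def process_data(element):
    # Placeholder for the real asynchronous API call.
    await asyncio.sleep(0.01)
    return element


class AsyncProcess:
    """Mirrors the DoFn lifecycle; in a pipeline this would subclass beam.DoFn."""

    def __init__(self):
        # Keep only serializable state here: no event loop, no sockets.
        self.loop = None

    def setup(self):
        # Called once per DoFn instance on the worker, after deserialization.
        self.loop = asyncio.new_event_loop()

    def process(self, element):
        # Block until the coroutine finishes and wrap the result in a list,
        # because the return value is treated as an iterable of outputs.
        return [self.loop.run_until_complete(process_data(element))]

    def teardown(self):
        # Release the loop and its sockets when the instance is discarded.
        if self.loop is not None:
            self.loop.close()
            self.loop = None


if __name__ == "__main__":
    fn = AsyncProcess()
    fn.setup()
    print(fn.process("AAPL"))
    fn.teardown()
```

The asyncio.run() version quoted earlier avoids the problem in a different way, by creating a fresh loop on every call; reusing one loop per DoFn instance mainly saves that per-element setup cost.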
