Hello,
Thanks all.
I tried this simple test case (before checking the URL above), but it did not
work out:

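import asyncio

import apache_beam as beam


async def process_data(element):
    # Asynchronous processing logic
    await asyncio.sleep(5)  # Simulate an asynchronous operation
    return element


class AsyncProcess(beam.DoFn):

    def __init__(self):
        super().__init__()
        # The loop is created at construction time, so it becomes part of the
        # DoFn state that Beam pickles when the pipeline is built.
        self.loop = asyncio.new_event_loop()

    def process(self, element):
        async def process_element_async():
            return await process_data(element)

        # yield, not return: a returned string is emitted character by character
        yield self.loop.run_until_complete(process_element_async())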


When I put this in a pipeline (debugSink is simply beam.Map(logging.info)):

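import logging

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline

with TestPipeline(options=PipelineOptions()) as p:
    _ = (p | 'Start' >> beam.Create(['AAPL'])
           | 'Run Loader' >> beam.ParDo(AsyncProcess())
           | 'Debug Sink' >> beam.Map(logging.info))  # self.debugSink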

I end up getting this error, which I was somewhat expecting:

self = <socket.socket fd=2136, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 51510), raddr=('127.0.0.1', 51511)>

    def __getstate__(self):
>       raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
E       TypeError: cannot pickle 'socket' object


My use case is that I have to call an API which, under the hood, uses an
async API:

data = await fetcher.fetch_data(params, credentials)
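The simplest way to call this from ordinary (synchronous) Beam code is a small wrapper that drives the coroutine to completion with `asyncio.run()`. A minimal sketch; the `Fetcher` class here is a hypothetical stand-in, and only the `fetch_data(params, credentials)` shape comes from the call above:

```python
import asyncio


class Fetcher:
    # Hypothetical stand-in for the real async client.
    async def fetch_data(self, params, credentials):
        await asyncio.sleep(0.1)  # simulate network I/O
        return {"params": params}


def fetch_data_sync(fetcher, params, credentials):
    # asyncio.run() creates a new event loop, runs the coroutine to
    # completion and closes the loop again, so no loop is kept on any object.
    return asyncio.run(fetcher.fetch_data(params, credentials))
```

In a pipeline this could be used as, for example, `beam.Map(lambda params: fetch_data_sync(fetcher, params, credentials))`, provided `fetcher` itself is picklable; if it holds an open connection, it would be better created in a DoFn's `setup()`.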



So on my side, I am not sure which options I have (bear in mind I am also an
asyncio noob).
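Calling the API one element at a time loses most of the benefit of async I/O. One possible option, sketched here under the assumption that the requests are independent, is to process a whole batch of inputs concurrently with `asyncio.gather()`, capped by a semaphore; `fetch_data` and the ticker inputs are hypothetical placeholders for the real call:

```python
import asyncio
from typing import Iterable, List


async def fetch_data(ticker: str) -> dict:
    # Hypothetical stand-in for fetcher.fetch_data(params, credentials).
    await asyncio.sleep(0.1)
    return {"ticker": ticker}


async def _fetch_all(tickers: Iterable[str], limit: int) -> List[dict]:
    # Cap the number of in-flight requests so the API is not flooded.
    semaphore = asyncio.Semaphore(limit)

    async def fetch_one(ticker: str) -> dict:
        async with semaphore:
            return await fetch_data(ticker)

    # gather() runs the coroutines concurrently and preserves input order.
    return await asyncio.gather(*(fetch_one(t) for t in tickers))


def fetch_batch(tickers: List[str], limit: int = 10) -> List[dict]:
    # Synchronous entry point that can be called from Beam transforms.
    return asyncio.run(_fetch_all(tickers, limit))
```

In a pipeline, elements could be grouped with `beam.BatchElements(...)` and each batch expanded with `beam.FlatMap(fetch_batch)`, so up to `limit` requests are in flight at once instead of one.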

Thanks and regards
Marco








On Sun, Aug 25, 2024 at 11:24 PM Jaehyeon Kim <[email protected]> wrote:

> I guess there are multiple options.
>
> The easiest one would be converting the async method to a sync method or
> creating a wrapper method for doing so.
>
> Also, the following Stack Overflow post introduces two other options that you
> may find useful.
>
>    - asynchronous API calls in apache beam -
>    https://stackoverflow.com/questions/72842846/asynchronous-api-calls-in-apache-beam
>
> On Mon, 26 Aug 2024 at 01:40, Sofia’s World <[email protected]> wrote:
>
>> Not sure... I think I'll put together a sample and post it on the list.
>> Thanks
>>  Marco
>>
>> On Sun, 25 Aug 2024, 14:46 XQ Hu via user, <[email protected]> wrote:
>>
>>>
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Wait.html
>>>
>>> Is this something you are looking for?
>>>
>>> On Sun, Aug 25, 2024 at 6:21 AM Sofia’s World <[email protected]>
>>> wrote:
>>>
>>>> Hi all,
>>>> I want to write a pipeline where, as part of one of the steps, I will
>>>> need to use an await call, such as this one:
>>>>
>>>> await fetcher.fetch_data(params, self.credentials)
>>>>
>>>> Is Beam equipped for that?
>>>>
>>>> Does anyone have a sample I could use for reference?
>>>>
>>>> Kind regards
>>>>  Marco
>>>>
>>>>
>>>>
