Hello,
thanks all.
I tried this simple test case before checking the URL above, but it did not
work:
async def process_data(element):
    # Asynchronous processing logic
    await asyncio.sleep(5)  # Simulate an asynchronous operation
    return element


class AsyncProcess(beam.DoFn):
    def __init__(self):
        self.loop = asyncio.new_event_loop()

    def process(self, element):
        async def process_element_async():
            return await process_data(element)

        return self.loop.run_until_complete(process_element_async())
When I put this in a pipeline (debugSink is simply beam.Map(logging.info)):
with TestPipeline(options=PipelineOptions()) as p:
    input = (p | 'Start' >> beam.Create(['AAPL'])
               | 'Run Loader' >> beam.ParDo(AsyncProcess())
               | self.debugSink
             )
I end up getting this, which I was somewhat expecting:
self = <socket.socket fd=2136, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 51510), raddr=('127.0.0.1', 51511)>

    def __getstate__(self):
>       raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
E       TypeError: cannot pickle 'socket' object
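
My reading of the traceback, as a rough sketch rather than a confirmed diagnosis: the DoFn instance gets pickled before it runs, and the event loop stored on it in __init__ owns sockets that cannot be pickled. The snippet below tries to reproduce that with the standard library only. LoopInInit and LoopInSetup are hypothetical stand-ins and not Beam classes, plain pickle stands in for whatever pickler Beam actually uses, and the setup()/teardown() names assume the runner calls those hooks on the worker after unpickling.

```python
import asyncio
import pickle


class LoopInInit:
    """Hypothetical stand-in for a DoFn that builds its loop in __init__."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()


class LoopInSetup:
    """Hypothetical stand-in that defers loop creation to setup()."""

    def __init__(self):
        self.loop = None  # nothing unpicklable is stored at construction time

    def setup(self):
        # Assumption: the runner calls this on the worker, after unpickling.
        self.loop = asyncio.new_event_loop()

    def process(self, element):
        async def echo():
            await asyncio.sleep(0)  # placeholder for the real awaitable
            return element

        # Return a list so one input element gives one output element.
        return [self.loop.run_until_complete(echo())]

    def teardown(self):
        self.loop.close()


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


eager = LoopInInit()
eager_ok = can_pickle(eager)  # the loop attribute blocks pickling
eager.loop.close()

lazy = pickle.loads(pickle.dumps(LoopInSetup()))  # round trip before setup()
lazy.setup()
result = lazy.process("AAPL")
lazy.teardown()
print(eager_ok, result)
```

If this reading is right, moving the new_event_loop() call out of __init__ might be enough to get past the pickling step, though I have not tried it in a real pipeline.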
The use case I have is that I need to call an API which uses an async API under
the hood:
    data = await fetcher.fetch_data(params, credentials)
So on my side I am not sure which options I have (bear in mind I am also an
asyncio noob).
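
For reference, this is how I understand the sync-wrapper idea from Jaehyeon's reply quoted below, as a minimal sketch only. The fetch_data coroutine here is a hypothetical stand-in for the real fetcher.fetch_data (whose signature and return type I am guessing at), and fetch_data_sync and fetch_many_sync are names I made up for illustration.

```python
import asyncio


async def fetch_data(params, credentials):
    """Hypothetical stand-in for the real fetcher.fetch_data coroutine."""
    await asyncio.sleep(0)  # placeholder for the real network call
    return {"ticker": params, "user": credentials}


def fetch_data_sync(params, credentials):
    """Blocking wrapper: run the coroutine to completion on a fresh loop."""
    return asyncio.run(fetch_data(params, credentials))


def fetch_many_sync(tickers, credentials):
    """Fetch several elements concurrently inside one asyncio.run call."""

    async def gather_all():
        # gather returns results in the same order as the inputs
        return await asyncio.gather(
            *(fetch_data(t, credentials) for t in tickers)
        )

    return asyncio.run(gather_all())


single = fetch_data_sync("AAPL", "demo")
batch = fetch_many_sync(["AAPL", "MSFT"], "demo")
print(single)
print(batch)
```

Since the wrapper is a plain function that holds no loop or socket, it should be usable from something like beam.Map without the pickling problem above. One caveat: asyncio.run raises a RuntimeError if an event loop is already running in the same thread, so this only fits code that is otherwise synchronous.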
Thanks and regards
Marco
On Sun, Aug 25, 2024 at 11:24 PM Jaehyeon Kim <[email protected]> wrote:
> I guess there are multiple options.
>
> The easiest one would be converting the async method to a sync method or
> creating a wrapper method for doing so.
>
> Also, the following stackoverflow post introduces another two options you
> may find useful.
>
> - asynchronous API calls in apache beam:
>   https://stackoverflow.com/questions/72842846/asynchronous-api-calls-in-apache-beam
>
> On Mon, 26 Aug 2024 at 01:40, Sofia’s World <[email protected]> wrote:
>
>> Not sure...I think I'll do a sample and post it on the list ..
>> Thanks
>> Marco
>>
>> On Sun, 25 Aug 2024, 14:46 XQ Hu via user, <[email protected]> wrote:
>>
>>>
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Wait.html
>>>
>>> Is this something you are looking for?
>>>
>>> On Sun, Aug 25, 2024 at 6:21 AM Sofia’s World <[email protected]>
>>> wrote:
>>>
>>>> Hi all,
>>>> I want to write a pipeline where, as part of one of the steps, I will
>>>> need to use an await call, such as this one:
>>>>
>>>> await fetcher.fetch_data(params, self.credentials)
>>>>
>>>> Is Beam equipped for that?
>>>>
>>>> Does anyone have a sample to pass me for reference?
>>>>
>>>> Kind regards
>>>> Marco
>>>>
>>>>
>>>>