I'm not able to use asyncIO because the file will not fit in memory. I
thought that flatMap would allow me to enrich/process records while
downloading, instead of waiting for the whole file to be downloaded. The
solution works, but it's not scalable because I'm not able to use
AsyncFunction in flatMap.

On Wed, Mar 9, 2022 at 4:52 AM Arvid Heise <ar...@apache.org> wrote:

> You can use flatMap to flatten and have an asyncIO after it.
>
> On Wed, Mar 9, 2022 at 8:08 AM Diwakar Jha <diwakar.n...@gmail.com> wrote:
>
>> Thanks Gen, I will look into a customized Source and SplitEnumerator.
>>
>> On Mon, Mar 7, 2022 at 10:20 PM Gen Luo <luogen...@gmail.com> wrote:
>>
>>> Hi Diwakar,
>>>
>>> An asynchronous flatmap function without the support of the framework
>>> can be problematic. You should not call collector.collect outside the main
>>> thread of the task, i.e. outside the flatMap method.
>>>
>>> I'd suggest using a customized Source instead to process the files,
>>> which uses a SplitEnumerator to discover the files and SourceReaders to
>>> read the files. In this way checkpoints can be triggered between two calls
>>> of pollNext, so you don't have to implement it asynchronously. It would be
>>> better if the readers only read the lines and the records are enriched in
>>> a subsequent map function.
>>>
>>>
>>>
>>> On Tue, Mar 8, 2022 at 5:17 AM Diwakar Jha <diwakar.n...@gmail.com>
>>> wrote:
>>>
>>>> Hello Everyone,
>>>>
>>>> I'm running a streaming application using Flink 1.11 and EMR 6.01. My
>>>> use case is reading files from an S3 bucket, filtering the file contents
>>>> (say, records), enriching each record, and writing the filtered records
>>>> to a sink. I'm reading 6k files per 15 minutes and the total number of
>>>> records is 3 billion per 15 minutes. I'm using a flat map operator to
>>>> convert each file into records and enrich the records in a synchronous
>>>> call.
>>>>
>>>> *Problem*: My application fails to run (checkpoint timeout) if I add
>>>> more filter criteria (operators). I suspect the system is not able to scale
>>>> (CPU utilization is still 20%) because of the synchronous call. I want to
>>>> convert this flat map to an asynchronous call using AsyncFunction. I was
>>>> looking for something like an AsyncCollector.collect
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.html#collect-java.util.Collection->
>>>>
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html
>>>> to complement my current synchronous implementation using flatmap but it
>>>> seems like this is not available in Flink 1.11.
>>>>
>>>> *Question* :
>>>> Could someone please help me with converting this flatmap operation to
>>>> an asynchronous call?
>>>>
>>>> Please let me know if you have any questions.
>>>>
>>>> Best,
>>>>
>>>

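For readers of the archive: the "flatMap to flatten, then asyncIO" shape
suggested above can be illustrated outside Flink with a minimal plain-Java
sketch. It is not the code from this thread and not a Flink job. The class
name FlattenThenAsyncSketch, the process method, and the enrich and sink
parameters are all hypothetical. Records are pulled one at a time from an
iterator (standing in for lines streamed from a file), and at most
`capacity` enrichment calls are in flight, so the whole file never has to
sit in memory. In Flink itself the corresponding pieces would be a flatMap
that emits records, followed by AsyncDataStream.unorderedWait(...) with an
AsyncFunction whose asyncInvoke completes a ResultFuture; the capacity
argument there plays roughly the role of the semaphore here.

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration; none of these names come from Flink.
public class FlattenThenAsyncSketch {

    /**
     * Pulls records one by one (the "flatten" step) and enriches them on a
     * thread pool, keeping at most `capacity` calls in flight.
     * `sink` is invoked from worker threads, so it must be thread-safe.
     */
    public static void process(Iterator<String> records,
                               Function<String, String> enrich,
                               Consumer<String> sink,
                               int capacity) {
        ExecutorService pool = Executors.newFixedThreadPool(capacity);
        Semaphore inFlight = new Semaphore(capacity);
        Queue<Throwable> failures = new ConcurrentLinkedQueue<>();
        try {
            while (records.hasNext()) {
                String record = records.next();
                // Back-pressure: block the reader while `capacity` calls are pending.
                inFlight.acquire();
                CompletableFuture
                    .supplyAsync(() -> enrich.apply(record), pool)
                    .whenComplete((enriched, error) -> {
                        try {
                            if (error != null) {
                                failures.add(error);
                            } else {
                                sink.accept(enriched);
                            }
                        } catch (RuntimeException e) {
                            failures.add(e);
                        } finally {
                            inFlight.release();
                        }
                    });
            }
            // Wait until every outstanding call has released its permit.
            inFlight.acquire(capacity);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing", e);
        } finally {
            pool.shutdown();
        }
        if (!failures.isEmpty()) {
            throw new IllegalStateException("enrichment failed", failures.peek());
        }
    }

    public static void main(String[] args) {
        Queue<String> out = new ConcurrentLinkedQueue<>();
        Iterator<String> lines = java.util.Arrays.asList("a", "b", "c").iterator();
        process(lines, String::toUpperCase, out::add, 2);
        System.out.println(out.size() + " records enriched");
    }
}
```

Results arrive in completion order, not input order, which matches the
unordered variant of async I/O; if ordering matters, an ordered variant
would be needed instead.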