You can use a flatMap to flatten each file into records and put an Async I/O operator after it.
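To make the flatMap + Async I/O idea concrete without pulling in Flink dependencies, here is a plain-Java sketch (a simplified model, not Flink code). `flatten` plays the role of the flatMap that turns one file into records. `enrichAll` issues the enrichment lookups concurrently while bounding the number of in-flight requests, which is roughly what Flink's `AsyncDataStream.orderedWait`/`unorderedWait` do with their `capacity` argument. The class name and the `enrich` lookup are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;

class FlattenThenAsyncEnrich {

    // Step 1 (the flatMap): split one file's contents into non-empty records.
    static List<String> flatten(String fileContents) {
        List<String> records = new ArrayList<>();
        for (String line : fileContents.split("\n")) {
            if (!line.isEmpty()) {
                records.add(line);
            }
        }
        return records;
    }

    // Hypothetical enrichment lookup, standing in for a remote call.
    static String enrich(String record) {
        return record + "|enriched";
    }

    // Step 2 (the async operator): run lookups concurrently with at most
    // `capacity` requests in flight at any time.
    static List<String> enrichAll(List<String> records, int capacity) {
        ExecutorService pool = Executors.newFixedThreadPool(capacity);
        Semaphore inFlight = new Semaphore(capacity);
        List<CompletableFuture<String>> futures = new ArrayList<>();
        try {
            for (String record : records) {
                try {
                    inFlight.acquire(); // blocks the producer once capacity is reached
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for capacity", e);
                }
                futures.add(CompletableFuture
                        .supplyAsync(() -> enrich(record), pool)
                        .whenComplete((result, error) -> inFlight.release()));
            }
            // Joining in submission order keeps output order equal to input order.
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }
}
```

Joining the futures in submission order mirrors `orderedWait`; `unorderedWait` would instead emit each result as soon as it completes.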

On Wed, Mar 9, 2022 at 8:08 AM Diwakar Jha <diwakar.n...@gmail.com> wrote:

> Thanks Gen, I will look into a customized Source and SplitEnumerator.
>
> On Mon, Mar 7, 2022 at 10:20 PM Gen Luo <luogen...@gmail.com> wrote:
>
>> Hi Diwakar,
>>
>> An asynchronous flatmap function without the support of the framework can
>> be problematic. You should not call collector.collect outside the main
>> thread of the task, i.e. outside the flatMap method.
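A plain-Java sketch of that rule, with a hypothetical `RecordingCollector` standing in for Flink's `Collector`: the work is spread over pool threads, but only the thread that called the flatMap-like method ever calls `collect`. Note that the method still blocks until every result has arrived, so it keeps the collector safe but does not by itself let a checkpoint proceed while the call is in progress; handing results back asynchronously is what Flink's Async I/O operator provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EmitOnTaskThread {

    // Hypothetical stand-in for a Collector: NOT thread-safe, and it records
    // which thread called collect() so the rule can be checked.
    static class RecordingCollector {
        final List<String> emitted = new ArrayList<>();
        final List<String> callerThreads = new ArrayList<>();

        void collect(String value) {
            emitted.add(value);
            callerThreads.add(Thread.currentThread().getName());
        }
    }

    // Work runs on pool threads; the calling ("task") thread waits for each
    // result and is the only thread that touches the collector.
    static void flatMapWithHelperThreads(List<String> inputs, RecordingCollector out) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (String input : inputs) {
                pending.add(CompletableFuture.supplyAsync(() -> input.toUpperCase(), pool));
            }
            for (CompletableFuture<String> f : pending) {
                out.collect(f.join()); // emitted from the calling thread only
            }
        } finally {
            pool.shutdown();
        }
    }
}
```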
>>
>> I'd suggest using a customized Source instead to process the files, one
>> that uses a SplitEnumerator to discover the files and SourceReaders to read
>> them. This way, checkpoints can be triggered between two calls of
>> pollNext, so you don't have to implement it asynchronously. It would be
>> better if the readers just read the lines and the records were enriched in
>> a subsequent map function.
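The reader loop described above can be modeled in plain Java. This is a simplified sketch, not Flink's Source API: `FileSplit` and `LineReader` are hypothetical names, and the in-memory map stands in for S3. In the real API the corresponding hooks are `SourceReader#pollNext` and `SourceReader#snapshotState`, with splits handed out by a `SplitEnumerator`. What the model shows is that each `pollNext()` call emits at most one line and returns, so the checkpointed state is just the in-progress split with its line offset plus any unread splits.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

class SplitReaderModel {

    // Hypothetical split: one file plus the number of lines already emitted.
    static final class FileSplit {
        final String path;
        final int offset;

        FileSplit(String path, int offset) {
            this.path = path;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return path + "@" + offset;
        }
    }

    // Hypothetical reader: each pollNext() emits at most one line and returns,
    // so a checkpoint can be taken between any two calls.
    static final class LineReader {
        private final Map<String, List<String>> files; // stands in for S3
        private final Deque<FileSplit> assigned = new ArrayDeque<>();
        private FileSplit current;

        LineReader(Map<String, List<String>> files) {
            this.files = files;
        }

        // Called when the (modeled) enumerator assigns a discovered file.
        void addSplit(FileSplit split) {
            assigned.add(split);
        }

        // Returns the next line, or null when no assigned split has data left.
        String pollNext() {
            while (true) {
                if (current == null) {
                    current = assigned.poll();
                    if (current == null) {
                        return null;
                    }
                }
                List<String> lines = files.get(current.path);
                if (current.offset < lines.size()) {
                    String line = lines.get(current.offset);
                    current = new FileSplit(current.path, current.offset + 1);
                    return line;
                }
                current = null; // file finished, move on to the next split
            }
        }

        // Checkpoint state: the in-progress split (with offset) plus unread splits.
        List<FileSplit> snapshotState() {
            List<FileSplit> state = new ArrayList<>();
            if (current != null) {
                state.add(current);
            }
            state.addAll(assigned);
            return state;
        }
    }
}
```

Enrichment would then live in a separate map operator after the source, as suggested above.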
>>
>>
>>
>> On Tue, Mar 8, 2022 at 5:17 AM Diwakar Jha <diwakar.n...@gmail.com>
>> wrote:
>>
>>> Hello Everyone,
>>>
>>> I'm running a streaming application using Flink 1.11 and EMR 6.01. My
>>> use case is reading files from an S3 bucket, filtering the file contents
>>> (say, records) and enriching each record, then filtering the records and
>>> writing them to a sink. I'm reading 6k files per 15 minutes, and the total
>>> number of records is 3 billion per 15 minutes. I'm using a flatMap operator
>>> to convert each file into records and enrich the records in a synchronous call.
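A quick sanity check of those numbers in plain Java. The parallelism and per-call latency passed to the last method are hypothetical inputs, not figures from this thread. 3 billion records per 15 minutes is about 3.33 million records per second, and 6k files per 15 minutes is about 6.7 files per second, averaging 500,000 records per file. If every record blocks on one enrichment call, each subtask handles one record at a time, so throughput is capped at roughly parallelism * 1000 / latencyMillis records per second.

```java
class ThroughputEstimate {
    static final long FILES_PER_WINDOW = 6_000L;
    static final long RECORDS_PER_WINDOW = 3_000_000_000L;
    static final long WINDOW_SECONDS = 15 * 60; // 15 minutes = 900 s

    // Integer division: 3,000,000,000 / 900 = 3,333,333 records/s (rounded down).
    static long recordsPerSecond() {
        return RECORDS_PER_WINDOW / WINDOW_SECONDS;
    }

    // Average records per file over one window.
    static long recordsPerFile() {
        return RECORDS_PER_WINDOW / FILES_PER_WINDOW;
    }

    static double filesPerSecond() {
        return (double) FILES_PER_WINDOW / WINDOW_SECONDS;
    }

    // Rough upper bound for blocking enrichment: each of `parallelism` subtasks
    // completes one call every `callLatencyMillis` milliseconds.
    static double maxSyncRecordsPerSecond(int parallelism, double callLatencyMillis) {
        return parallelism * 1000.0 / callLatencyMillis;
    }
}
```

For example, 100 subtasks with a hypothetical 1 ms call sustain about 100,000 records per second, far short of 3.33 million, while the CPU mostly waits on I/O.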
>>>
>>> *Problem*: My application fails to run (checkpoint timeout) if I add
>>> more filter criteria (operators). I suspect the system is not able to scale
>>> (CPU utilization is still only 20%) because of the synchronous call. I want
>>> to convert this flatMap to an asynchronous call using AsyncFunction. I was
>>> looking for something like AsyncCollector.collect
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.html#collect-java.util.Collection->
>>>
>>> https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html
>>> to complement my current synchronous flatMap implementation, but it seems
>>> this is not available in Flink 1.11.
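In Flink 1.11 the counterpart of `AsyncCollector.collect(Collection)` is `ResultFuture.complete(Collection)`: `AsyncFunction#asyncInvoke(input, resultFuture)` starts the work and returns, and the result future is completed once with zero or more outputs, which gives flatMap-like one-to-many behavior. The sketch below mirrors that contract in plain Java so it runs without Flink; `ResultSink`, the `keep` filter and the `|enriched` suffix are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class AsyncFlatMapSketch {

    // Hypothetical mirror of ResultFuture<OUT>: completed exactly once,
    // with zero or more outputs for one input element.
    interface ResultSink<OUT> {
        void complete(Collection<OUT> result);

        void completeExceptionally(Throwable error);
    }

    // Shaped like asyncInvoke: start the work, return immediately, and hand
    // all outputs for this input to the sink in a single call.
    static void asyncInvoke(String fileContents, ResultSink<String> sink, Executor executor) {
        CompletableFuture
                .supplyAsync(() -> {
                    List<String> out = new ArrayList<>();
                    for (String record : fileContents.split("\n")) {
                        if (record.startsWith("keep")) {   // hypothetical filter
                            out.add(record + "|enriched"); // hypothetical enrichment
                        }
                    }
                    return out;
                }, executor)
                .whenComplete((out, error) -> {
                    if (error != null) {
                        sink.completeExceptionally(error);
                    } else {
                        sink.complete(out); // an empty list means "no output for this input"
                    }
                });
    }

    // Test helper: exposes a sink as a CompletableFuture.
    static <OUT> ResultSink<OUT> futureSink(CompletableFuture<Collection<OUT>> target) {
        return new ResultSink<OUT>() {
            @Override
            public void complete(Collection<OUT> result) {
                target.complete(result);
            }

            @Override
            public void completeExceptionally(Throwable error) {
                target.completeExceptionally(error);
            }
        };
    }
}
```

In an actual job, the body of `asyncInvoke` would sit in a `RichAsyncFunction<String, String>` and be wired in with `AsyncDataStream.unorderedWait(stream, function, timeout, TimeUnit.SECONDS, capacity)`; the Async I/O documentation for your Flink version is the authority on the exact signatures.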
>>>
>>> *Question* :
>>> Could someone please help me with converting this flatmap operation to
>>> an asynchronous call?
>>>
>>> Please let me know if you have any questions.
>>>
>>> Best,
>>>
>>
