You can use a flatMap to flatten each file into records and then apply Async I/O after it.

On Wed, Mar 9, 2022 at 8:08 AM Diwakar Jha <diwakar.n...@gmail.com> wrote:
> Thanks Gen, I will look into a customized Source and SplitEnumerator.
>
> On Mon, Mar 7, 2022 at 10:20 PM Gen Luo <luogen...@gmail.com> wrote:
>
>> Hi Diwakar,
>>
>> An asynchronous flatMap function without support from the framework can
>> be problematic. You should not call collector.collect outside the main
>> thread of the task, i.e. outside the flatMap method.
>>
>> I'd suggest using a customized Source instead to process the files, which
>> uses a SplitEnumerator to discover the files and SourceReaders to read the
>> files. In this way, checkpoints can be triggered between two calls of
>> pollNext, so you don't have to implement it asynchronously. It would be
>> better if the readers read the lines and the records were enriched in a
>> subsequent map function.
>>
>> On Tue, Mar 8, 2022 at 5:17 AM Diwakar Jha <diwakar.n...@gmail.com> wrote:
>>
>>> Hello Everyone,
>>>
>>> I'm running a streaming application using Flink 1.11 and EMR 6.01. My
>>> use case is reading files from an S3 bucket, filtering the file contents
>>> (say, records), and enriching each record, then filtering the records and
>>> outputting them to a sink. I'm reading 6k files every 15 minutes, and the
>>> total number of records is 3 billion per 15 minutes. I'm using a flatMap
>>> operator to convert each file into records and enrich the records in a
>>> synchronous call.
>>>
>>> *Problem*: My application fails to run (checkpoint timeout) if I add
>>> more filter criteria (operators). I suspect the system is not able to
>>> scale (CPU utilization is still 20%) because of the synchronous call. I
>>> want to convert this flatMap to an asynchronous call using AsyncFunction.
>>> I was looking for something like AsyncCollector.collect
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.html#collect-java.util.Collection->
>>> (see also
>>> https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html)
>>> to complement my current synchronous implementation using flatMap, but it
>>> seems that this is not available in Flink 1.11.
>>>
>>> *Question*:
>>> Could someone please help me convert this flatMap operation to an
>>> asynchronous call?
>>>
>>> Please let me know if you have any questions.
>>>
>>> Best,
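The top reply's suggestion, a flatMap first and Async I/O after it, can be modeled in plain Java. This is a sketch under stated assumptions, not Flink code: the class and method names (FlatMapThenAsync, flatMapFile, enrichAsync) are hypothetical, and `enrich` stands in for whatever blocking lookup does the enrichment. It shows the two properties behind Gen's warning: at most `capacity` requests are in flight at once, and results are handed downstream only from the calling thread, never from a worker thread. In Flink 1.11 itself, the corresponding pieces would be a FlatMapFunction followed by AsyncDataStream.orderedWait (or unorderedWait) with a RichAsyncFunction, whose asyncInvoke method receives a ResultFuture; ResultFuture.complete(...) replaced the AsyncCollector.collect linked from the 1.3 docs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Plain-Java model (not Flink code) of "flatMap, then an async enrichment stage". */
public class FlatMapThenAsync {

    /** Stage 1, synchronous: one file's contents in, its non-empty lines (records) out. */
    public static List<String> flatMapFile(String fileContents) {
        List<String> records = new ArrayList<>();
        for (String line : fileContents.split("\n")) {
            if (!line.isEmpty()) {
                records.add(line);
            }
        }
        return records;
    }

    /**
     * Stage 2, asynchronous: runs the hypothetical `enrich` lookup on a worker pool
     * with at most `capacity` (>= 1) requests in flight, and emits results in input
     * order, always from the calling thread.
     */
    public static List<String> enrichAsync(List<String> records,
                                           Function<String, String> enrich,
                                           int capacity) {
        ExecutorService pool = Executors.newFixedThreadPool(capacity);
        List<String> out = new ArrayList<>(); // only ever touched by the calling thread
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        try {
            for (String record : records) {
                if (inFlight.size() == capacity) {
                    // Buffer full: wait for the oldest request and emit its result here.
                    out.add(inFlight.remove(0).join());
                }
                inFlight.add(CompletableFuture.supplyAsync(() -> enrich.apply(record), pool));
            }
            // Drain the remaining requests, still in input order.
            for (CompletableFuture<String> future : inFlight) {
                out.add(future.join());
            }
        } finally {
            pool.shutdown();
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (String file : List.of("a\nb", "c\n\nd")) {
            records.addAll(flatMapFile(file));
        }
        System.out.println(enrichAsync(records, r -> r.toUpperCase(), 2)); // [A, B, C, D]
    }
}
```

Emitting in input order mirrors orderedWait; emitting whichever request finishes first would correspond to unorderedWait, which trades ordering for lower latency and overhead.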
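Gen's alternative, a SplitEnumerator handing files to SourceReaders, relies on the reader being pull-based: each pollNext() call returns a small piece of work, so the task can take a checkpoint between two calls rather than only after a flatMap has finished processing a whole file. The plain-Java model below illustrates that idea under simplifying assumptions. It is not Flink's Source API: all class names are hypothetical, a Map stands in for the S3 bucket, and a "checkpoint" is simulated every `interval` records by recording the reader's current file and line offset.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink's Source API) of split-based, poll-driven file reading. */
public class SplitReadingModel {

    /** Hypothetical enumerator: holds discovered files and assigns one per request. */
    static class Enumerator {
        private final Deque<String> unassigned;

        Enumerator(List<String> discoveredPaths) {
            this.unassigned = new ArrayDeque<>(discoveredPaths);
        }

        /** Returns the next file to read, or null when none are left. */
        String nextSplit() {
            return unassigned.poll();
        }
    }

    /** Hypothetical reader: returns one line per pollNext() and tracks its position. */
    static class Reader {
        private final Enumerator enumerator;
        private final Map<String, List<String>> storage; // stands in for the S3 bucket
        private String currentPath;                       // file being read, or null
        private int offset;                               // lines already emitted from it

        Reader(Enumerator enumerator, Map<String, List<String>> storage) {
            this.enumerator = enumerator;
            this.storage = storage;
        }

        /** Returns the next line, or null once every assigned file is exhausted. */
        String pollNext() {
            while (true) {
                if (currentPath == null) {
                    currentPath = enumerator.nextSplit();
                    offset = 0;
                    if (currentPath == null) {
                        return null;
                    }
                }
                List<String> lines = storage.get(currentPath);
                if (offset < lines.size()) {
                    return lines.get(offset++);
                }
                currentPath = null; // file finished; request another one
            }
        }

        /** The state a checkpoint would store: current file and offset into it. */
        String snapshot() {
            return currentPath + "@" + offset;
        }
    }

    /** Task loop: polls records and simulates a checkpoint every `interval` records. */
    public static List<String> run(Map<String, List<String>> storage, List<String> paths,
                                   int interval, List<String> snapshots) {
        Reader reader = new Reader(new Enumerator(paths), storage);
        List<String> out = new ArrayList<>();
        String line;
        while ((line = reader.pollNext()) != null) {
            out.add(line);
            if (out.size() % interval == 0) {
                // Between two pollNext() calls no record is half-processed,
                // so this is a safe point to snapshot the reader.
                snapshots.add(reader.snapshot());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> storage =
                Map.of("f1", List.of("a", "b", "c"), "f2", List.of("d"));
        List<String> snapshots = new ArrayList<>();
        List<String> out = run(storage, List.of("f1", "f2"), 2, snapshots);
        System.out.println(out);       // [a, b, c, d]
        System.out.println(snapshots); // [f1@2, f2@1]
    }
}
```

Restoring from a snapshot such as "f1@2" would mean reopening f1 and skipping its first two lines. Following Gen's advice, the enrichment would then run in a separate map step after the reader instead of inside it.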