I am trying to migrate from the Flink 1.12.x DataSet API to the Flink 1.14.x DataStream API. mapPartition is not available on DataStream.

*Current code using the Flink 1.12.x DataSet API:*

    dataset
        .<few operations>
        .mapPartition(new SomeMapPartitionFn())
        .<few more operations>

    public static class SomeMapPartitionFn extends RichMapPartitionFunction<InputModel, OutputModel> {

        @Override
        public void mapPartition(Iterable<InputModel> records, Collector<OutputModel> out) throws Exception {
            for (InputModel record : records) {
                /* do some operation */
                if (/* some condition based on processing *MULTIPLE* records */) {
                    out.collect(...); // Conditional collect ---> (1)
                }
            }
            // At the end of the data, collect
            out.collect(...); // Collect processed data ---> (2)
        }
    }

- (1) - Collector.collect is invoked based on some condition after processing a few records
- (2) - Collector.collect is invoked at the end of the data

Initially we thought of using flatMap instead of mapPartition, but the collector is not available in the close function.

https://issues.apache.org/jira/browse/FLINK-14709 - Only available in case of chained drivers

How can this be implemented with the Flink 1.14.x DataStream API? Please advise.

*Note*: Our application works only with finite sets of data (batch mode).
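
To make the requirement concrete, here is a minimal, Flink-free sketch of the behaviour we need: records arrive one at a time, some output is emitted conditionally (1), and one more output is always emitted once the input is exhausted (2). Every name in it (`EndOfInputSketch`, `BoundedProcessor`, `ThresholdSum`, `run`) is hypothetical and exists only for illustration, and the threshold-sum logic is a stand-in for our real processing. The shape loosely mirrors an operator that receives an end-of-input callback; Flink's `BoundedOneInput.endInput()` looks like a possible candidate, though that is an assumption we have not verified against 1.14.x.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class EndOfInputSketch {

    /** Hypothetical stand-in for an operator that is notified when bounded input ends. */
    interface BoundedProcessor<I, O> {
        void processElement(I record, Consumer<O> out);
        void endInput(Consumer<O> out);
    }

    /** Placeholder logic: accumulates a running sum across multiple records. */
    static class ThresholdSum implements BoundedProcessor<Integer, Integer> {
        private final int threshold;
        private int pending = 0;

        ThresholdSum(int threshold) {
            this.threshold = threshold;
        }

        @Override
        public void processElement(Integer record, Consumer<Integer> out) {
            pending += record;
            // (1) conditional collect, based on state built from several records
            if (pending >= threshold) {
                out.accept(pending);
                pending = 0;
            }
        }

        @Override
        public void endInput(Consumer<Integer> out) {
            // (2) unconditional collect once all records have been seen
            out.accept(pending);
        }
    }

    /** Drives a processor over a finite input, then signals end of input exactly once. */
    static <I, O> List<O> run(BoundedProcessor<I, O> processor, List<I> input) {
        List<O> result = new ArrayList<>();
        for (I record : input) {
            processor.processElement(record, result::add);
        }
        processor.endInput(result::add);
        return result;
    }
}
```

Calling `EndOfInputSketch.run(new EndOfInputSketch.ThresholdSum(5), input)` on a finite list produces the conditional outputs followed by the final one. What we are missing in DataStream is the equivalent of the `endInput` hook with access to the collector.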