I am trying to migrate from the Flink 1.12.x DataSet API to the Flink 1.14.x
DataStream API. mapPartition is not available in the Flink DataStream API.
*Current code using the Flink 1.12.x DataSet API:*

dataset
    .<few operations>
    .mapPartition(new SomeMapPartitionFn())
    .<few more operations>

public static class SomeMapPartitionFn extends
        RichMapPartitionFunction<InputModel, OutputModel> {

    @Override
    public void mapPartition(Iterable<InputModel> records,
                             Collector<OutputModel> out) throws Exception {
        for (InputModel record : records) {
            /*
            do some operation
             */
            if (/* some condition based on processing MULTIPLE records */) {
                out.collect(...); // Conditional collect          ---> (1)
            }
        }

        // At the end of the data, collect
        out.collect(...); // Collect processed data               ---> (2)
    }
}


   - (1) Collector.collect is invoked, based on some condition, after a few
     records have been processed.
   - (2) Collector.collect is invoked at the end of the data.

   Initially we thought of using flatMap instead of mapPartition, but the
   Collector is not available in the close() method.

   https://issues.apache.org/jira/browse/FLINK-14709 - only available in
   the case of chained drivers

How can this be implemented with the Flink 1.14.x DataStream API? Please advise.

*Note*: Our application works only with a finite set of data (batch mode).
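A sketch of the pattern being asked about, offered as an assumption rather than a confirmed answer: in Flink 1.14 one possible hook for emitting records once a bounded input is exhausted is the endInput() method of the BoundedOneInput interface, implemented by a custom operator that extends AbstractStreamOperator, implements OneInputStreamOperator, and is attached with DataStream#transform. Because that requires Flink on the classpath, the code below is only a plain-Java model of the same contract. BoundedProcessor, ThresholdSumProcessor and BoundedDriver are hypothetical names, and the running-sum threshold is a hypothetical stand-in for the unspecified "condition based on processing multiple records".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a one-input operator on a bounded stream:
// processElement is called once per record, endInput once after the last record.
interface BoundedProcessor<IN, OUT> {
    void processElement(IN value, Consumer<OUT> out);
    void endInput(Consumer<OUT> out);
}

// Hypothetical logic: emit the running sum whenever it reaches the threshold (1),
// then emit whatever is left over when the input ends (2).
class ThresholdSumProcessor implements BoundedProcessor<Integer, Integer> {
    private final int threshold;
    private int sum = 0;

    ThresholdSumProcessor(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void processElement(Integer value, Consumer<Integer> out) {
        sum += value;
        if (sum >= threshold) { // condition based on multiple records ---> (1)
            out.accept(sum);
            sum = 0;
        }
    }

    @Override
    public void endInput(Consumer<Integer> out) {
        if (sum > 0) { // flush remaining state at the end of the data ---> (2)
            out.accept(sum);
        }
    }
}

// Hypothetical driver: feeds a finite input through the processor and then
// signals end of input, which is the step that flatMap's close() cannot emit from.
class BoundedDriver {
    static <IN, OUT> List<OUT> run(Iterable<IN> input, BoundedProcessor<IN, OUT> p) {
        List<OUT> collected = new ArrayList<>();
        for (IN value : input) {
            p.processElement(value, collected::add);
        }
        p.endInput(collected::add);
        return collected;
    }
}
```

With the input 3, 4, 5, 1, 2 and a threshold of 10, the driver returns [12, 3]: 12 comes from the conditional path (1) and 3 from the end-of-input path (2). In a real Flink operator the leftover state would also need to be kept in operator state if checkpointing matters.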
