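Hi Saravanan,

One solution is to write a custom stream operator that implements the `BoundedOneInput` interface. Its `endInput()` method is called once the operator has received all of its input, so the final collect can go there. Example code can be found here [1].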
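To make the idea concrete, here is a minimal sketch of the two emit points from your mapPartition function. It is deliberately free of Flink types so it runs standalone. The names `RunningSum` and `EndOfInputDemo`, the running sum, and the "emit every 3 records" condition are all made up for illustration and are not taken from your job or from Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical per-partition logic, kept free of Flink types so it runs standalone.
 * Assumption: in a Flink operator, processElement(...) would call onRecord(...)
 * and BoundedOneInput#endInput() would call onEndOfInput(...).
 */
class RunningSum {
    private static final int BATCH_SIZE = 3; // assumed condition: emit every 3 records
    private long sum;
    private int seen;

    /** Case (1): conditional emit while records are still arriving. */
    void onRecord(long value, Consumer<Long> out) {
        sum += value;
        seen++;
        if (seen % BATCH_SIZE == 0) {
            out.accept(sum);
        }
    }

    /** Case (2): final emit once the bounded input is exhausted. */
    void onEndOfInput(Consumer<Long> out) {
        out.accept(sum);
    }
}

class EndOfInputDemo {
    /** Stands in for the runtime: feeds every record, then signals end of input once. */
    static List<Long> run(List<Long> records) {
        List<Long> emitted = new ArrayList<>();
        RunningSum logic = new RunningSum();
        for (long r : records) {
            logic.onRecord(r, emitted::add);
        }
        logic.onEndOfInput(emitted::add);
        return emitted;
    }

    public static void main(String[] args) {
        // prints [6, 10]: 6 after the third record, 10 at end of input
        System.out.println(run(Arrays.asList(1L, 2L, 3L, 4L)));
    }
}
```

In an actual job, as far as I know, the operator would extend `AbstractStreamOperator`, implement `OneInputStreamOperator` and `BoundedOneInput`, delegate `processElement` to `onRecord` and `endInput` to `onEndOfInput`, and emit through `output.collect(new StreamRecord<>(...))`. Keeping the logic in a plain class like this also makes it easy to unit test without a Flink cluster.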
[1] https://github.com/apache/flink-ml/blob/56b441d85c3356c0ffedeef9c27969aee5b3ecfc/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java#L75

saravana...@gmail.com <saravana...@gmail.com> wrote on Tue, Feb 15, 2022 at 02:44:

> Hi Niklas,
>
> Thanks for your reply. Approach [1] works only if operators are chained
> (in other words, operators executed within the same task). Since the
> mapPartition operator's parallelism is different from the previous
> operator's parallelism, it doesn't fall under the same task (i.e. it is
> not chained).
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#tasks-and-operator-chains
> https://issues.apache.org/jira/browse/FLINK-14709
>
> Saravanan
>
> On Mon, Feb 14, 2022 at 9:01 AM Niklas Semmler <nik...@ververica.com>
> wrote:
>
>> Hi Saravanan,
>>
>> AFAIK the last record is not treated differently.
>>
>> Does the approach in [1] not work?
>>
>> Best regards,
>> Niklas
>>
>> https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279
>>
>> > On 9. Feb 2022, at 20:31, saravana...@gmail.com <saravana...@gmail.com> wrote:
>> >
>> > Is there any way to identify the last message inside RichFunction in BATCH mode ?
>> >
>> > On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com <saravana...@gmail.com> wrote:
>> > I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x DataStream api. mapPartition is not available in Flink DataStream.
>> >
>> > Current Code using Flink 1.12.x DataSet :
>> >
>> > dataset
>> >     .<few operations>
>> >     .mapPartition(new SomeMapPartitionFn())
>> >     .<few more operations>
>> >
>> > public static class SomeMapPartitionFn extends RichMapPartitionFunction<InputModel, OutputModel> {
>> >
>> >     @Override
>> >     public void mapPartition(Iterable<InputModel> records, Collector<OutputModel> out) throws Exception {
>> >         for (InputModel record : records) {
>> >             /*
>> >             do some operation
>> >             */
>> >             if (/* some condition based on processing *MULTIPLE* records */) {
>> >
>> >                 out.collect(...); // Conditional collect ---> (1)
>> >             }
>> >         }
>> >
>> >         // At the end of the data, collect
>> >
>> >         out.collect(...); // Collect processed data ---> (2)
>> >     }
>> > }
>> >
>> > • (1) - Collector.collect invoked based on some condition after processing few records
>> > • (2) - Collector.collect invoked at the end of data
>> >
>> > Initially we thought of using flatMap instead of mapPartition, but the collector is not available in close function.
>> >
>> > https://issues.apache.org/jira/browse/FLINK-14709 - Only available in case of chained drivers
>> > How to implement this in Flink 1.14.x DataStream? Please advise...
>> >
>> > Note: Our application works with only finite set of data (Batch Mode)
>> >

--
best,
Zhipeng