Thanks Zhipeng. Working as expected.

Thanks once again.
Saravanan

On Tue, Feb 15, 2022 at 3:23 AM Zhipeng Zhang <zhangzhipe...@gmail.com> wrote:

> Hi Saravanan,
>
> One solution could be to use a stream operator that implements the
> `BoundedOneInput` interface.
> An example can be found here [1].
>
> [1]
> https://github.com/apache/flink-ml/blob/56b441d85c3356c0ffedeef9c27969aee5b3ecfc/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java#L75
>
> saravana...@gmail.com <saravana...@gmail.com> wrote on Tue, Feb 15, 2022 at 02:44:
>
>> Hi Niklas,
>>
>> Thanks for your reply. Approach [1] works only if operators are chained
>> (in other words, operators executed within the same task). Since the
>> mapPartition operator's parallelism is different from the previous
>> operator's parallelism, it doesn't fall under the same task (it is not
>> chained).
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#tasks-and-operator-chains
>> https://issues.apache.org/jira/browse/FLINK-14709
>>
>> Saravanan
>>
>> On Mon, Feb 14, 2022 at 9:01 AM Niklas Semmler <nik...@ververica.com>
>> wrote:
>>
>>> Hi Saravanan,
>>>
>>> AFAIK the last record is not treated differently.
>>>
>>> Does the approach in [1] not work?
>>>
>>> Best regards,
>>> Niklas
>>>
>>> [1]
>>> https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279
>>>
>>> > On 9. Feb 2022, at 20:31, saravana...@gmail.com <saravana...@gmail.com> wrote:
>>> >
>>> > Is there any way to identify the last message inside a RichFunction in
>>> > BATCH mode?
>>> >
>>> > On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com <saravana...@gmail.com> wrote:
>>> > I am trying to migrate from the Flink 1.12.x DataSet API to the Flink
>>> > 1.14.x DataStream API. mapPartition is not available in Flink DataStream.
>>> >
>>> > Current code using Flink 1.12.x DataSet:
>>> >
>>> > dataset
>>> >     .<few operations>
>>> >     .mapPartition(new SomeMapPartitionFn())
>>> >     .<few more operations>
>>> >
>>> > public static class SomeMapPartitionFn extends RichMapPartitionFunction<InputModel, OutputModel> {
>>> >
>>> >     @Override
>>> >     public void mapPartition(Iterable<InputModel> records, Collector<OutputModel> out) throws Exception {
>>> >         for (InputModel record : records) {
>>> >             /*
>>> >             do some operation
>>> >             */
>>> >             if (/* some condition based on processing *MULTIPLE* records */) {
>>> >                 out.collect(...); // Conditional collect ---> (1)
>>> >             }
>>> >         }
>>> >
>>> >         // At the end of the data, collect
>>> >         out.collect(...); // Collect processed data ---> (2)
>>> >     }
>>> > }
>>> >
>>> > • (1) - Collector.collect invoked based on some condition after
>>> >   processing a few records
>>> > • (2) - Collector.collect invoked at the end of the data
>>> >
>>> > Initially we thought of using flatMap instead of mapPartition, but the
>>> > collector is not available in the close function.
>>> >
>>> > https://issues.apache.org/jira/browse/FLINK-14709 - Only available in
>>> > case of chained drivers
>>> >
>>> > How can this be implemented in the Flink 1.14.x DataStream API? Please
>>> > advise...
>>> >
>>> > Note: Our application works with only a finite set of data (Batch Mode)
>>> >
>>>
>
> --
> best,
> Zhipeng
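
For readers who want to see the shape of the `BoundedOneInput` approach suggested in this thread, here is a minimal plain-Java sketch that runs without Flink on the classpath. It is an illustration under assumptions, not the code from the linked flink-ml example: `ElementProcessor` and `EndOfInput` are hypothetical stand-ins for the per-record and end-of-input callbacks (Flink's `processElement` and `BoundedOneInput.endInput`), and `BatchingSumOperator`, `batchSize`, and the summing logic are invented to mirror the conditional collect (1) and the end-of-data collect (2) from the original question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EndOfInputSketch {

    /** Hypothetical stand-in for an operator's per-record callback. */
    interface ElementProcessor<IN> {
        void processElement(IN record);
    }

    /** Hypothetical stand-in for BoundedOneInput: called once after the last record. */
    interface EndOfInput {
        void endInput();
    }

    /**
     * Illustrative operator: sums integers, emits a partial sum every
     * batchSize records, and emits the leftover sum when the input ends.
     */
    static class BatchingSumOperator implements ElementProcessor<Integer>, EndOfInput {
        private final int batchSize;
        private final Consumer<Integer> out; // plays the role of the collector
        private int sum;
        private int count;

        BatchingSumOperator(int batchSize, Consumer<Integer> out) {
            this.batchSize = batchSize;
            this.out = out;
        }

        @Override
        public void processElement(Integer record) {
            sum += record;
            count++;
            if (count == batchSize) { // (1) conditional collect after several records
                out.accept(sum);
                sum = 0;
                count = 0;
            }
        }

        @Override
        public void endInput() {
            if (count > 0) { // (2) collect whatever is left at the end of the data
                out.accept(sum);
                sum = 0;
                count = 0;
            }
        }
    }

    /** Drives the operator the way a runtime would: all records, then endInput() once. */
    public static List<Integer> run(List<Integer> input, int batchSize) {
        List<Integer> results = new ArrayList<>();
        BatchingSumOperator op = new BatchingSumOperator(batchSize, results::add);
        for (Integer record : input) {
            op.processElement(record);
        }
        op.endInput();
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 4, 5), 2)); // prints [3, 7, 5]
    }
}
```

In an actual Flink 1.14 job, the corresponding operator would presumably extend `AbstractStreamOperator`, implement `OneInputStreamOperator` together with `BoundedOneInput`, and be attached to the stream via `DataStream.transform(...)`; the DataStreamUtils link quoted above shows real wiring and should be treated as the authoritative reference.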