Hi Saravanan,

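One solution is to write a custom stream operator that implements the
`BoundedOneInput` interface. Its `endInput()` method is called once all input
has been consumed, and the operator can still emit records from there.
Example code can be found here [1].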

[1]
https://github.com/apache/flink-ml/blob/56b441d85c3356c0ffedeef9c27969aee5b3ecfc/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java#L75
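
To make the idea concrete, here is a minimal sketch in plain Java. It is not
Flink code: the nested `BoundedOneInput` interface and the `Consumer<String>`
are local stand-ins for Flink's `BoundedOneInput` and the operator's `output`,
and `BatchingOperator`, `batchSize` and `run` are hypothetical names used only
for illustration. In a real job the operator would typically extend
`AbstractStreamOperator`, implement `OneInputStreamOperator` together with
`BoundedOneInput`, and be attached with `DataStream#transform`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class EndOfInputSketch {

    /** Local stand-in for Flink's BoundedOneInput (assumption: same contract). */
    interface BoundedOneInput {
        // Called once, after the last record of a bounded input.
        void endInput() throws Exception;
    }

    /** Hypothetical operator: emits a sum per full batch, flushes the rest at end of input. */
    static class BatchingOperator implements BoundedOneInput {
        private final int batchSize;
        private final Consumer<String> output; // stand-in for the operator's output
        private int count = 0;
        private long sum = 0;

        BatchingOperator(int batchSize, Consumer<String> output) {
            this.batchSize = batchSize;
            this.output = output;
        }

        // Per-record logic, analogous to the loop body of mapPartition.
        void processElement(int record) {
            sum += record;
            count++;
            if (count == batchSize) {
                output.accept("batch sum=" + sum); // conditional collect, case (1)
                sum = 0;
                count = 0;
            }
        }

        @Override
        public void endInput() {
            // Collect at the end of the data, case (2).
            output.accept("final sum=" + sum + " over " + count + " records");
        }
    }

    /** Drives the operator the way a runtime would for bounded input. */
    static List<String> run(List<Integer> records, int batchSize) {
        List<String> out = new ArrayList<>();
        BatchingOperator op = new BatchingOperator(batchSize, out::add);
        for (int r : records) {
            op.processElement(r);
        }
        op.endInput(); // signalled only after every record has been processed
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 4, 5), 2));
    }
}
```

The point of the design is that the end-of-data hook belongs to the operator
itself, so it does not depend on the operator being chained to its
predecessor.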

On Tue, Feb 15, 2022 at 02:44, saravana...@gmail.com <saravana...@gmail.com> wrote:

> Hi Niklas,
>
> Thanks for your reply. Approach [1] works only if the operators are chained
> (in other words, executed within the same task). Since the parallelism of
> the mapPartition operator differs from that of the previous operator, the
> two are not chained and do not run in the same task.
>
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#tasks-and-operator-chains
> https://issues.apache.org/jira/browse/FLINK-14709
>
> Saravanan
>
> On Mon, Feb 14, 2022 at 9:01 AM Niklas Semmler <nik...@ververica.com>
> wrote:
>
>> Hi Saravanan,
>>
>> AFAIK the last record is not treated differently.
>>
>> Does the approach in [1] not work?
>>
>> Best regards,
>> Niklas
>>
>>
>> [1] https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279
>>
>>
>> > On 9. Feb 2022, at 20:31, saravana...@gmail.com
>> > <saravana...@gmail.com> wrote:
>> >
>> > Is there any way to identify the last message inside RichFunction in
>> > BATCH mode ?
>> >
>> >
>> >
>> > On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com
>> > <saravana...@gmail.com> wrote:
>> > I am trying to migrate from Flink 1.12.x DataSet api to Flink 1.14.x
>> > DataStream api. mapPartition is not available in Flink DataStream.
>> >
>> > Current Code using Flink 1.12.x DataSet :
>> >
>> > dataset
>> >     .<few operations>
>> >     .mapPartition(new SomeMapPartitionFn())
>> >     .<few more operations>
>> >
>> > public static class SomeMapPartitionFn
>> >         extends RichMapPartitionFunction<InputModel, OutputModel> {
>> >
>> >     @Override
>> >     public void mapPartition(Iterable<InputModel> records,
>> >             Collector<OutputModel> out) throws Exception {
>> >         for (InputModel record : records) {
>> >             /*
>> >             do some operation
>> >              */
>> >             if (/* some condition based on processing *MULTIPLE* records */) {
>> >                 out.collect(...); // Conditional collect ---> (1)
>> >             }
>> >         }
>> >
>> >         // At the end of the data, collect
>> >         out.collect(...);   // Collect processed data ---> (2)
>> >     }
>> > }
>> >
>> >       • (1) - Collector.collect invoked based on some condition after
>> >         processing a few records
>> >       • (2) - Collector.collect invoked at the end of data
>> >
>> > Initially we thought of using flatMap instead of mapPartition, but the
>> > collector is not available in the close function.
>> >
>> > https://issues.apache.org/jira/browse/FLINK-14709 - Only available in
>> > case of chained drivers
>> > How to implement this in Flink 1.14.x DataStream? Please advise...
>> >
>> > Note: Our application works with only finite set of data (Batch Mode)
>> >
>>
>>

-- 
best,
Zhipeng
