Thanks, Zhipeng. It works as expected. Thanks once again.

Saravanan

On Tue, Feb 15, 2022 at 3:23 AM Zhipeng Zhang <zhangzhipe...@gmail.com>
wrote:

> Hi Saravanan,
>
> One solution is to use a custom stream operator that implements the
> `BoundedOneInput` interface, whose `endInput()` method is called once the
> bounded input has been fully consumed.
> Example code can be found here [1].
>
> [1]
> https://github.com/apache/flink-ml/blob/56b441d85c3356c0ffedeef9c27969aee5b3ecfc/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java#L75
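A minimal, Flink-free sketch of the pattern suggested above, assuming the per-partition logic can be split into a per-record step and an end-of-input step. In Flink 1.14 these two steps would map onto `processElement(StreamRecord)` of a `OneInputStreamOperator` and `endInput()` of `BoundedOneInput`, with the operator attached via `DataStream#transform(...)`. The interface `BoundedOneInputOperator`, the class `BatchSumOperator` and its batch-sum logic are hypothetical stand-ins for `SomeMapPartitionFn`, chosen only to show where the conditional collect (1) and the final collect (2) end up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BoundedOperatorSketch {

    /** Hypothetical stand-in for a one-input operator that is told when its bounded input ends. */
    interface BoundedOneInputOperator<IN, OUT> {
        void processElement(IN record, Consumer<OUT> out); // called once per record
        void endInput(Consumer<OUT> out);                   // called once, after the last record
    }

    /** Hypothetical port of SomeMapPartitionFn: emits batch sums (1) and a final total (2). */
    static class BatchSumOperator implements BoundedOneInputOperator<Integer, String> {
        private final int batchSize;
        private int batchSum = 0;
        private int batchCount = 0;
        private long total = 0;

        BatchSumOperator(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void processElement(Integer record, Consumer<String> out) {
            batchSum += record;
            batchCount++;
            total += record;
            if (batchCount == batchSize) {       // condition depends on MULTIPLE records
                out.accept("batch:" + batchSum); // conditional collect ---> (1)
                batchSum = 0;
                batchCount = 0;
            }
        }

        @Override
        public void endInput(Consumer<String> out) {
            out.accept("total:" + total);        // collect at the end of the data ---> (2)
        }
    }

    /** Drives an operator over a finite input, mimicking what the runtime does for one subtask. */
    static <IN, OUT> List<OUT> run(BoundedOneInputOperator<IN, OUT> op, List<IN> input) {
        List<OUT> results = new ArrayList<>();
        for (IN record : input) {
            op.processElement(record, results::add);
        }
        op.endInput(results::add); // signalled only after every record has been processed
        return results;
    }

    public static void main(String[] args) {
        // prints "[batch:3, batch:7, total:15]"
        System.out.println(run(new BatchSumOperator(2), List.of(1, 2, 3, 4, 5)));
    }
}
```

Processing records incrementally like this avoids holding the whole partition in memory. If the existing mapPartition logic really needs the full `Iterable`, an operator can instead buffer the records (for example in operator state) and run the original logic from `endInput()`.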
>
> saravana...@gmail.com <saravana...@gmail.com> wrote on Tue, Feb 15, 2022 at 02:44:
>
>> Hi Niklas,
>>
>> Thanks for your reply. Approach [1] works only if the operators are chained
>> (in other words, if they are executed within the same task). Since the
>> mapPartition operator's parallelism differs from that of the previous
>> operator, the two are not chained and do not run in the same task.
>>
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#tasks-and-operator-chains
>> https://issues.apache.org/jira/browse/FLINK-14709
>>
>> Saravanan
>>
>> On Mon, Feb 14, 2022 at 9:01 AM Niklas Semmler <nik...@ververica.com>
>> wrote:
>>
>>> Hi Saravanan,
>>>
>>> AFAIK the last record is not treated differently.
>>>
>>> Does the approach in [1] not work?
>>>
>>> Best regards,
>>> Niklas
>>>
>>>
>>> https://github.com/dmvk/flink/blob/2f1b573cd57e95ecac13c8c57c0356fb281fd753/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java#L279
>>>
>>>
>>> > On 9. Feb 2022, at 20:31, saravana...@gmail.com <saravana...@gmail.com> wrote:
>>> >
>>> > Is there any way to identify the last message inside a RichFunction in
>>> > BATCH mode?
>>> >
>>> >
>>> >
>>> > On Wed, Feb 9, 2022 at 8:56 AM saravana...@gmail.com <saravana...@gmail.com> wrote:
>>> > I am trying to migrate from the Flink 1.12.x DataSet API to the Flink 1.14.x
>>> > DataStream API, but mapPartition is not available in the DataStream API.
>>> >
>>> > Current code using the Flink 1.12.x DataSet API:
>>> >
>>> > dataset
>>> >     .<few operations>
>>> >     .mapPartition(new SomeMapPartitionFn())
>>> >     .<few more operations>
>>> >
>>> > public static class SomeMapPartitionFn extends RichMapPartitionFunction<InputModel, OutputModel> {
>>> >
>>> >     @Override
>>> >     public void mapPartition(Iterable<InputModel> records, Collector<OutputModel> out) throws Exception {
>>> >         for (InputModel record : records) {
>>> >             /*
>>> >             do some operation
>>> >              */
>>> >             if (/* some condition based on processing *MULTIPLE* records */) {
>>> >
>>> >                 out.collect(...); // Conditional collect ---> (1)
>>> >             }
>>> >         }
>>> >
>>> >         // At the end of the data, collect
>>> >
>>> >         out.collect(...);   // Collect processed data ---> (2)
>>> >     }
>>> > }
>>> >
>>> >       • (1) - Collector.collect is invoked, based on some condition, after processing a few records
>>> >       • (2) - Collector.collect is invoked at the end of the data
>>> >
>>> > Initially we thought of using flatMap instead of mapPartition, but the
>>> > Collector is not available in the close() method.
>>> >
>>> > https://issues.apache.org/jira/browse/FLINK-14709 - only available in the
>>> > case of chained drivers
>>> > How can this be implemented with the Flink 1.14.x DataStream API? Please advise.
>>> >
>>> > Note: our application only processes a finite set of data (BATCH execution mode).
>>> >
>>>
>>>
>
> --
> best,
> Zhipeng
>
>
