Hi Till,

Thank you for your quick response. Both the AssignerWithPeriodicWatermarks
and the WatermarkStrategy I am using are very simple.

*Code for AssignerWithPeriodicWatermarks:*

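import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Legacy periodic assigner (deprecated since Flink 1.11): tracks the highest
// event timestamp seen so far and uses it as the watermark.
public class CustomEventTimeWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyPojo> {

    private final long maxOutOfOrderness = 0;
    private long currentMaxTimestamp;

    // Called once per record: extracts the event time and updates the maximum.
    @Override
    public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
        long timestamp = myPojo.getInitiationTime().toEpochMilli();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    // Called periodically by Flink (auto-watermark interval), not per record.
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}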
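To make the mechanics concrete, here is a small stand-alone Java model of how a periodic assigner like this one is driven. It is not Flink code; the class PeriodicAssignerModel and its methods onRecord/onTimer are hypothetical stand-ins. As far as I understand, Flink calls extractTimestamp() for every record but only calls getCurrentWatermark() when its periodic timer fires (interval set via ExecutionConfig#setAutoWatermarkInterval), and it only forwards a watermark that is higher than the previous one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of a periodic watermark assigner:
// timestamps are tracked per record, but the watermark is only sampled
// (and forwarded if it increased) when the periodic timer fires.
class PeriodicAssignerModel {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;           // starts at 0, like the assigner above
    private long lastEmitted = Long.MIN_VALUE;  // nothing emitted yet
    final List<Long> emitted = new ArrayList<>();

    PeriodicAssignerModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Mirrors extractTimestamp(): called once per record.
    long onRecord(long timestamp) {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    // Mirrors the timer callback that calls getCurrentWatermark();
    // only strictly increasing watermarks are forwarded downstream.
    void onTimer() {
        long wm = currentMaxTimestamp - maxOutOfOrderness;
        if (wm > lastEmitted) {
            lastEmitted = wm;
            emitted.add(wm);
        }
    }

    long lastEmittedWatermark() {
        return lastEmitted;
    }

    public static void main(String[] args) {
        PeriodicAssignerModel m = new PeriodicAssignerModel(0);
        m.onRecord(1_000);
        m.onRecord(3_000);
        m.onRecord(2_000);
        // No timer has fired yet, so nothing has been emitted.
        System.out.println(m.lastEmittedWatermark()); // -9223372036854775808 (Long.MIN_VALUE)
        m.onTimer();
        System.out.println(m.lastEmittedWatermark()); // 3000
        m.onTimer(); // an unchanged watermark is not re-emitted
        System.out.println(m.emitted);                // [3000]
    }
}
```

In this model, records alone never move the emitted watermark: if the timer never fires, downstream operators see no watermark no matter how many records have been read.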


*Code for WatermarkStrategy:*

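import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Flink 1.11 replacement: 0 ms out-of-orderness, event time from the same field.
WatermarkStrategy<MyPojo> watermarkStrategy =
        WatermarkStrategy.<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
                .withTimestampAssigner((event, timestamp) -> event.getInitiationTime().toEpochMilli());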
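The built-in generator behind forBoundedOutOfOrderness(...) is not quite identical to the custom assigner above. Below is a Flink-free model of Flink 1.11's BoundedOutOfOrdernessWatermarks as I understand it (the class name BoundedOutOfOrdernessModel is hypothetical): it also tracks the maximum timestamp, but emits maxTimestamp - outOfOrderness - 1, so with a bound of 0 ms it trails the old assigner by 1 ms. That difference should not by itself stop watermarks from advancing, but it can matter when comparing results of the two variants.

```java
// Hypothetical, Flink-free model of the generator used by
// WatermarkStrategy.forBoundedOutOfOrderness(...), following the logic of
// Flink 1.11's BoundedOutOfOrdernessWatermarks.
class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Chosen so that the lowest possible watermark is exactly Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Counterpart of WatermarkGenerator#onEvent: only tracks the max timestamp.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Counterpart of what onPeriodicEmit emits: note the extra "- 1".
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel g = new BoundedOutOfOrdernessModel(0);
        System.out.println(g.currentWatermark() == Long.MIN_VALUE); // true
        g.onEvent(1_000);
        g.onEvent(3_000);
        g.onEvent(2_000);
        System.out.println(g.currentWatermark()); // 2999 (the old assigner would give 3000)
    }
}
```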


Thanks & regards,
Arti


On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Arti,
>
> thanks for sharing this feedback with us. The WatermarkStrategy has been
> introduced quite recently and might have some rough edges. I am pulling in
> Aljoscha and Klou who have worked on this feature and might be able to help
> you. To better understand your problem, it would be great if you could
> share the AssignerWithPeriodicWatermarks/WatermarkStrategy code with us.
>
> For the file source, the Flink community has recently introduced a new
> source abstraction which will also support checkpoints for file sources
> once the file source connector has been migrated to the new interfaces. The
> community is currently working on it.
>
> Cheers,
> Till
>
> On Wed, Aug 19, 2020 at 5:38 PM Arti Pande <pande.a...@gmail.com> wrote:
>
>> Hi,
>>
>> When migrating a DataStream API based Flink application from 1.9.2 to
>> 1.11.1, watermark generation fails with the file source only. It works
>> well with the Kafka source.
>>
>> With 1.9.2 we used a custom watermark generator implementing
>> AssignerWithPeriodicWatermarks. Starting with 1.11 this interface is
>> deprecated in favor of WatermarkStrategy, which combines a
>> WatermarkGenerator and a TimestampAssigner.
>>
>> With Flink 1.11.1, when using the Kafka source, both of the above options
>> (i.e. the old AssignerWithPeriodicWatermarks and the new WatermarkStrategy)
>> work perfectly well, but with the file source neither of them works. The
>> watermark assigner never advances the watermark, so stateful operators
>> never clear their state, which leads to erroneous results and continuously
>> increasing memory usage.
>>
>> The same code works well with the Kafka source. Is this a known issue? If
>> so, is a fix planned soon?
>>
>> A side note (probably a candidate for a separate email, but I will write
>> it here): checkpoints already did not work with the file source in 1.9.2,
>> and they still do not work with 1.11.1. Is the file source with the
>> DataStream API perhaps not a priority in Flink development? If so, we can
>> rethink our sources.
>>
>> Thanks & regards,
>> Arti
>>
>
