Hi Till,

Thank you for your quick response. Both the AssignerWithPeriodicWatermarks and the WatermarkStrategy I am using are very simple ones.

*Code for AssignerWithPeriodicWatermarks:*

public class CustomEventTimeWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyPojo> {

    private final long maxOutOfOrderness = 0;
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
        long timestamp = myPojo.getInitiationTime().toEpochMilli();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

*Code for WatermarkStrategy:*

WatermarkStrategy<MyPojo> watermarkStrategy =
    WatermarkStrategy.<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
        .withTimestampAssigner((event, timestamp) -> event.getInitiationTime().toEpochMilli());

Thanks & regards,
Arti

On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Arti,
>
> thanks for sharing this feedback with us. The WatermarkStrategy has been
> introduced quite recently and might have some rough edges. I am pulling in
> Aljoscha and Klou who have worked on this feature and might be able to help
> you. For better understanding your problem, it would be great if you could
> share the AssignerWithPeriodicWatermarks/WatermarkStrategy code with us.
>
> For the file source, the Flink community has recently introduced a new
> source abstraction which will also support checkpoints for file sources
> once the file source connector has been migrated to the new interfaces. The
> community is currently working on it.
>
> Cheers,
> Till
>
> On Wed, Aug 19, 2020 at 5:38 PM Arti Pande <pande.a...@gmail.com> wrote:
>
>> Hi,
>>
>> When migrating Stream API based Flink application from 1.9.2 to 1.11.1
>> the watermark generation has issues with file source alone. It works well
>> with Kafka source.
>>
>> With 1.9.2 a custom watermark generator implementation of
>> AssignerWithPeriodicWatermarks was to be used. Starting 1.11.1 it is
>> deprecated and to be replaced with WatermarkStrategy (that combines both
>> WatermarkGenerator and TimestampAssigner).
>>
>> With Flink 1.11.1 when using Kafka source both the above options (i.e.
>> old AssignerWithPeriodicWatermarks and new WatermarkStrategy) work
>> perfectly well but with file source none of them works. The watermark
>> assigner never increments the watermarks resulting in stateful operators
>> not clearing their state ever, leading to erroneous results and
>> continuously increasing memory usage.
>>
>> Same code works well with Kafka source. Is this a known issue? If so, any
>> fix planned shortly?
>>
>> A side note (and probably a candidate for separate email, but I will
>> write it here) even checkpoints do not work with File Source since 1.9.2
>> and it is still the problem with 1.11.1. Just wondering if File source with
>> stream API is not a priority in Flink development? If so we can rethink our
>> sources.
>>
>> Thanks & regards,
>> Arti
>>
>
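
For reference, the arithmetic performed by the CustomEventTimeWatermarkGenerator shown at the top of this message can be sketched without any Flink dependency. This is only an illustration under stated assumptions: the class name WatermarkSketch, its method names, and the sample timestamps are hypothetical and are not part of Flink or of the application. It shows that the watermark follows the largest timestamp seen so far, so it can only advance when new records actually reach the assigner.

```java
// Flink-free sketch of the logic in CustomEventTimeWatermarkGenerator.
// WatermarkSketch, onEvent, currentWatermark and the sample timestamps
// are hypothetical names and values chosen for illustration only.
final class WatermarkSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp; // starts at 0, as in the original class

    WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Mirrors extractTimestamp(): remember the largest timestamp seen so far.
    long onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    // Mirrors getCurrentWatermark(): the value a periodic poll would emit.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch(0L);
        long[] timestamps = {1_000L, 3_000L, 2_000L}; // the last one arrives out of order
        for (long ts : timestamps) {
            sketch.onEvent(ts);
            System.out.println("event=" + ts + " watermark=" + sketch.currentWatermark());
        }
    }
}
```

With maxOutOfOrderness set to 0, as in both snippets, the out-of-order event does not move the watermark backwards. If no records reach the assigner, the value stays where it was.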