Hi,

Thanks for your reply and the pointers on currentLowWatermark. It looks like the Flink UI has a Watermarks tab for each operator.
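For reference, the low watermark an operator reports is the minimum of the watermarks it has received on its input channels. A single upstream channel that never emits a watermark therefore keeps the whole operator at Long.MIN_VALUE. The sketch below is a stand-alone simulation with no Flink dependency. The class LowWatermarkSketch and its methods are hypothetical, and it leaves out Flink's special handling of channels marked idle.

```java
import java.util.Arrays;

/**
 * Stand-alone sketch (no Flink dependency): an operator's watermark is the
 * minimum of the latest watermarks seen on each input channel. Flink's special
 * handling of channels marked idle is deliberately left out.
 */
public class LowWatermarkSketch {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    LowWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    /** Records a watermark from one input channel; returns the operator's watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        current = Math.max(current, min); // the operator watermark never moves backwards
        return current;
    }

    public static void main(String[] args) {
        LowWatermarkSketch window = new LowWatermarkSketch(2);
        // Only channel 0 has seen records; channel 1 has emitted nothing yet.
        System.out.println(window.onWatermark(0, 3_097));  // prints -9223372036854775808
        // Once channel 1 reports too, the minimum can advance.
        System.out.println(window.onWatermark(1, 12_000)); // prints 3097
    }
}
```

This may be relevant if the job runs with parallelism greater than 1. A punctuated assigner only emits a watermark when it sees a record, so any MonitoringAssigner instance that receives none of the 5 records never emits one, and the window operator would then stay at Long.MIN_VALUE. Whether that applies here depends on the shard count and parallelism, which are not stated in this thread.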
I put 5 records into the Kinesis Data Stream and try to read the same records with the FlinkKinesisConsumer, but I am not able to. For watermark generation (intervalStart - bound) in the *MonitoringAssigner* class, I use the same monitoring.getIntervalStart() that I used to generate data on the producer side. On the producer side, intervalStart increments by 3-10 ms on each record. The watermark is generated as intervalStart - bound (3 secs), so every watermark is greater than the previous one. So why does it not push data out? It gets into the MGroupingWindowAggregate.add(..) method but never into the MGroupingWindowAggregate.getResult(..) method. It works when I produce 1000 or so records into the Kinesis data stream.

Here is a gist of my code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//FlinkConsumer
Properties kinesisConsumerConfig = new Properties();
......
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
        ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());

FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
        kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig);
final DataStreamSource<Monitoring> monitoringDataStreamSource = env.addSource(kinesisConsumer);
DataStream<Monitoring> kinesisStream = monitoringDataStreamSource
        .assignTimestampsAndWatermarks(new MonitoringAssigner(3000)); // code at bottom

org.apache.flink.streaming.api.windowing.time.Time timeWindow = Time.seconds(5);
// timeWindow() is defined on KeyedStream; the keyBy(...) call is omitted from this gist
final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream = kinesisStream.timeWindow(timeWindow);
DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = windowStream.aggregate(
        new MGroupingWindowAggregate(....),           // AggregateFunction impl
        new MGroupingAggregateWindowProcessing(...));

public class MonitoringAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
    private long bound = 3 * 1000; // 3 secs out-of-order bound in millisecs

    public MonitoringAssigner(long bound) {
        this.bound = bound;
    }

    @Override
    public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
        long nextWatermark = extractedTimestamp - bound;
        return new Watermark(nextWatermark);
    }

    @Override
    public long extractTimestamp(Monitoring monitoring, long previousTS) {
        LocalDateTime intervalStart = Utils.getLocalDateTime(
                monitoring.getIntervalStart()); // 2012-07-12 02:21:06.057
        long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
        return extractedTS;
        //return System.currentTimeMillis(); // this works fine.
    }
}

TIA,
Vijay

On Sat, Dec 15, 2018 at 5:42 AM Hequn Cheng <chenghe...@gmail.com> wrote:
> Hi Vijay,
>
> Could you provide more information about your problem? For example:
> - Which kind of window do you use?
> - What's the window size?
> - A relatively complete code sample is better :-)
>
> As for the problem, it is probably that the event time has not reached the
> end of the window. You can monitor the watermark in the web dashboard[1].
> Also, changing event time to processing time is another way to verify
> whether it is a watermark problem.
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html
>
>
> On Sat, Dec 15, 2018 at 12:59 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi,
>> Observations on Watermarks:
>> Read this great article:
>> https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy
>>
>> * A watermark tells, for any event timestamp, when to stop waiting for
>> the arrival of earlier events.
>> * Watermark t means all events with timestamp < t have already arrived.
>> * When to push data out: when a watermark with timestamp >= t arrives.
>>
>> Only *using the incrementing current time for the watermark seems to work
>> correctly*, but I'm not sure it lines up correctly with EventTime
>> processing.
>> *Using the incoming records' intervalStart as the Watermark source for
>> EventTime causes no data to be pushed at all* when I have just 5 records
>> in the Source.
>>
>> My source generation has intervalStart incrementing at a regular
>> interval.
>> I tried using intervalStart for my Watermark with an out-of-order
>> bound of 3 secs.
>> The *AggregateFunction* I am using calls add() fine but *never calls
>> getResult().*
>> My assumption was that the AggregateFunction would push the data to
>> getResult() once the intervalStart-based Watermark advanced beyond the
>> previous watermark t.
>> But it doesn't. Is it because I have a limited number of input records and,
>> once intervalStart reaches the end of the input records too fast, it stops
>> incrementing the watermark and hence doesn't push data?
>>
>> With System.currentTimeMillis, it happily keeps increasing and hence
>> pushes the data.
>>
>> Created this class:
>> public class MonitoringAssigner implements
>>         AssignerWithPunctuatedWatermarks<Monitoring> {
>>     private long bound = 3 * 1000; // 3 secs out-of-order bound in millisecs
>>
>>     public MonitoringAssigner(long bound) {
>>         this.bound = bound;
>>     }
>>
>>     @Override
>>     public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>>             long extractedTimestamp) {
>>         long nextWatermark = extractedTimestamp - bound;
>>         // simply emit a Watermark with every event
>>         return new Watermark(nextWatermark);
>>     }
>>
>>     @Override
>>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>>         /*LocalDateTime intervalStart =
>>                 Utils.getLocalDateTime(monitoring.getIntervalStart()); // 2012-07-12 02:21:06.057
>>         // using this stopped pushing recs after a certain time
>>         long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
>>         return extractedTS;*/
>>         return System.currentTimeMillis(); // incrementing current time
>>     }
>> }
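The hypothesis above can be checked with simple arithmetic. With a 5-second tumbling window, the window [start, start + 5000) fires only once the watermark reaches start + 4999, because Flink's EventTimeTrigger compares the watermark with window.maxTimestamp(), which is end - 1. With MonitoringAssigner(3000), that requires a record with timestamp >= start + 7999. Five records spaced a few milliseconds apart never get there. A Kinesis source is also unbounded, so no final watermark arrives to flush the last window.

The simulation below uses no Flink classes. The class and method names, the 6.057 s starting timestamp, and the fixed 10 ms spacing are all hypothetical, and the code assumes non-negative timestamps arriving in ascending order.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Stand-alone simulation (no Flink dependency) of a tumbling event-time window
 * fed by a punctuated assigner that emits watermark = timestamp - bound after
 * every record. The firing rule mirrors EventTimeTrigger: a window fires once
 * the watermark reaches its maxTimestamp (end - 1).
 * Assumes non-negative timestamps arriving in ascending order.
 */
public class WatermarkSimulation {

    /** Start of the tumbling window (zero offset) that contains ts. */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Returns the start times of the windows that would have fired, in firing order. */
    static List<Long> firedWindows(long[] timestamps, long bound, long size) {
        long watermark = Long.MIN_VALUE;
        List<Long> pending = new ArrayList<>(); // windows holding buffered records
        List<Long> fired = new ArrayList<>();
        for (long ts : timestamps) {
            long start = windowStart(ts, size);
            if (!pending.contains(start)) {
                pending.add(start); // first record for this window
            }
            // Punctuated assigner: one watermark per record; watermarks never go backwards.
            watermark = Math.max(watermark, ts - bound);
            Iterator<Long> it = pending.iterator();
            while (it.hasNext()) {
                long pendingStart = it.next();
                if (pendingStart + size - 1 <= watermark) {
                    fired.add(pendingStart); // getResult() would be called at this point
                    it.remove();
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long size = 5_000;  // Time.seconds(5)
        long bound = 3_000; // MonitoringAssigner(3000)

        long[] five = new long[5];
        for (int i = 0; i < five.length; i++) five[i] = 6_057 + 10L * i;
        System.out.println("5 records fired: " + firedWindows(five, bound, size));
        // prints "5 records fired: []" (final watermark 3097 < 9999)

        long[] many = new long[1000];
        for (int i = 0; i < many.length; i++) many[i] = 6_057 + 10L * i;
        System.out.println("1000 records fired: " + firedWindows(many, bound, size));
        // prints "1000 records fired: [5000]" (windows 10000 and 15000 stay open)
    }
}
```

In both runs, the most recent windows remain open because event time only advances when newer records arrive. That is consistent with results appearing once about 1000 records are produced. Returning System.currentTimeMillis() instead ties the timestamps and watermarks to wall-clock time, which keeps advancing regardless of the data.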