Hi,

After looking at the code in EventTimeTrigger, I changed the watermark to System.currentTimeMillis() + a bound of 5 secs so that the window's maxTimestamp was <= the watermark. With that change I was able to consume from Kinesis when I had only 50 records.
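A minimal plain-Java sketch (no Flink dependency) of the arithmetic behind that condition may help. It assumes tumbling windows aligned to multiples of the window size with no offset and non-negative timestamps. The class name, method names, the 10 ms record spacing, and the sample timestamps are all hypothetical and chosen only for illustration; the 5-second window and 3-second bound are the values used in this thread.

```java
/** Plain-Java sketch of tumbling-window firing; all names are hypothetical. */
final class WindowFiringSketch {

    /** Start of the tumbling window containing ts (zero offset, ts >= 0). */
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    /** Largest timestamp that still belongs to the window: end - 1. */
    static long maxTimestamp(long ts, long sizeMs) {
        return windowStart(ts, sizeMs) + sizeMs - 1;
    }

    /** Same comparison as the trigger check: fire once the watermark reaches maxTimestamp. */
    static boolean fires(long windowMaxTs, long watermark) {
        return windowMaxTs <= watermark;
    }

    /**
     * Watermark emitted for the last of `count` records spaced stepMs apart,
     * when every record emits watermark = timestamp - bound.
     */
    static long lastWatermark(long firstTs, int count, long stepMs, long boundMs) {
        long lastTs = firstTs + (count - 1) * stepMs;
        return lastTs - boundMs;
    }

    public static void main(String[] args) {
        long size = 5_000L;     // 5-second tumbling window
        long bound = 3_000L;    // 3-second out-of-order bound
        long firstTs = 10_000L; // made-up event time of the first record

        long maxTs = maxTimestamp(firstTs, size);
        long wmFew = lastWatermark(firstTs, 5, 10L, bound);
        long wmMany = lastWatermark(firstTs, 1_000, 10L, bound);

        System.out.println("window maxTimestamp = " + maxTs);
        System.out.println("5 records:    watermark = " + wmFew
                + ", fires = " + fires(maxTs, wmFew));
        System.out.println("1000 records: watermark = " + wmMany
                + ", fires = " + fires(maxTs, wmMany));
    }
}
```

Under these assumptions, a handful of closely spaced records leaves the last watermark about three seconds behind the last event timestamp, well short of the window's maxTimestamp, so the check is never satisfied. Once the records span more than the window size plus the bound, the watermark passes maxTimestamp and the window can fire.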
For a TumblingWindow of 5 secs, the window's maxTimestamp was usually around currTime + 5 secs, so I set the watermark to System.currentTimeMillis() + 5 secs. This way, the trigger fired and execution reached AggregateFunction.getResult().

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // <== this check had to be met
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

On Mon, Dec 17, 2018 at 10:00 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:

> Hi,
> Thx for your reply and pointers on the currentLowWatermark. Looks like the
> Flink UI has a tab for Watermarks itself for an Operator.
>
> I dump 5 records into the Kinesis Data Stream and am trying to read the
> same records from the FlinkKinesisConsumer and am not able to.
> I am using the same monitoring.getIntervalStart() in the Watermark
> generation (intervalStart - bound) in the *MonitoringAssigner* class that I
> used to generate data on the producer side. I generate intervalStart on the
> Producer side, which increments on each record by 3-10 millisecs. The
> watermark is being generated with intervalStart - bound (3 secs), so every
> watermark generated is greater than the previous one. So, why does it not
> push data out? It gets into the MGroupingWindowAggregate.add(..) method but
> never gets into the MGroupingWindowAggregate.getResult(..) method. It
> works when I produce 1000 records or so into the Kinesis data stream.
>
> Here is a gist of my code-
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> //FlinkConsumer
> Properties kinesisConsumerConfig = new Properties();
> ......
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
>     "10000");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
>     "2000");//2000
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>     ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
> FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
>     kinesisTopicRead, new MonitoringMapKinesisSchema(),
>     kinesisConsumerConfig);
> final DataStreamSource<Monitoring> monitoringDataStreamSource =
>     env.addSource(kinesisConsumer);
> DataStream<Monitoring> kinesisStream =
>     monitoringDataStreamSource.assignTimestampsAndWatermarks(
>         new MonitoringAssigner(3000));//code at bottom
>
> org.apache.flink.streaming.api.windowing.time.Time timeWindow =
>     Time.seconds(5);
> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>     kinesisStream.timeWindow(timeWindow);
> DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 =
>     windowStream.aggregate(
>         new MGroupingWindowAggregate(....),//AggregateFunction impl
>         new MGroupingAggregateWindowProcessing(...));
>
> public class MonitoringAssigner implements
>         AssignerWithPunctuatedWatermarks<Monitoring> {
>     private long bound = 3 * 1000;//3 secs out of order bound in millisecs
>     public MonitoringAssigner(long bound) {
>         this.bound = bound;
>     }
>     public Watermark checkAndGetNextWatermark(Monitoring monitoring, long
>             extractedTimestamp) {
>         long nextWatermark = extractedTimestamp - bound;
>         return new Watermark(nextWatermark);
>     }
>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>         LocalDateTime intervalStart = Utils.getLocalDateTime(
>             monitoring.getIntervalStart());//2012-07-12 02:21:06.057
>         long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
>         return extractedTS;
>         //return System.currentTimeMillis(); //this works fine.
>     }
> }
>
> TIA,
> Vijay
>
> On Sat, Dec 15, 2018 at 5:42 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Vijay,
>>
>> Could you provide more information about your problem? For example
>> - Which kind of window do you use?
>> - What's the window size?
>> - A relatively complete code is better :-)
>>
>> As for the problem, it is probably that the event time has not reached
>> the end of the window. You can monitor the watermark in the web
>> dashboard[1].
>> Also, changing event time to processing time is another way to verify if
>> it is a watermark problem.
>>
>> Best, Hequn
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html
>>
>>
>> On Sat, Dec 15, 2018 at 12:59 AM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> Observations on Watermarks:
>>> Read this great article:
>>> https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy
>>>
>>> * A watermark says, for any event timestamp, when to stop waiting for
>>> the arrival of earlier events.
>>> * Watermark t means all events with Timestamp < t have already arrived.
>>> * When to push data out - When a watermark with TS >= t arrives
>>>
>>> Only *using incrementing current time for watermark seems to be working
>>> correctly* but not sure if it lines up correctly with EventTime
>>> processing.
>>> *Using the incoming records' intervalStart as the Watermark source for
>>> EventTime causes data to not be pushed at all* in cases when I have
>>> just 5 records in the Source.
>>>
>>> My source generation for intervalStart has intervalStart incrementing at
>>> a regular interval.
>>> I tried using the intervalStart for my Watermark with an out-of-order
>>> lateness bound of 3 secs.
>>> The *AggregateFunction* I am using calls add() fine but *never
>>> calls getResult().*
>>> My assumption was that the AggregateFunction I am using would push the
>>> data to getResult
>>> once the Watermark derived from intervalStart advanced beyond the
>>> previous watermark t.
>>> But it doesn't - is it because I have a limited number of input records
>>> and once intervalStart gets to the end
>>> of the input records too fast, it stops incrementing the watermark and
>>> hence doesn't push data?
>>>
>>> With System.currentTimeMillis, it happily keeps increasing and hence
>>> pushes the data.
>>>
>>> Created this class:
>>> public class MonitoringAssigner implements
>>>         AssignerWithPunctuatedWatermarks<Monitoring> {
>>>     private long bound = 3 * 1000;//3 secs out of order bound in
>>>                                   //millisecs
>>>
>>>     public MonitoringAssigner(long bound) {
>>>         this.bound = bound;
>>>     }
>>>     public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>>>             long extractedTimestamp) {
>>>         long nextWatermark = extractedTimestamp - bound;
>>>         //simply emit a Watermark with every event
>>>         return new Watermark(nextWatermark);
>>>     }
>>>
>>>     @Override
>>>     public long extractTimestamp(Monitoring monitoring, long previousTS)
>>>     {
>>>         /*LocalDateTime intervalStart =
>>>             Utils.getLocalDateTime(monitoring.getIntervalStart());//2012-07-12 02:21:06.057
>>>         long extractedTS =
>>>             Utils.getLongFromLocalDateTime(intervalStart);//using this stopped
>>>             pushing recs after a certain time
>>>         return extractedTS;*/
>>>         return System.currentTimeMillis();//incrementing current time
>>>
>>>     }
>>>
>>>