Sorry, I left out a "not" in my previous reply. :( Whether a watermark generated by an AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is sent downstream is controlled by the framework. If an operator returns a watermark that goes backwards, Flink would *not* send it downstream.
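To illustrate the rule, here is a minimal plain-Java sketch of a "forward only if it advances" check. The class MonotonicWatermarkFilter and its methods are hypothetical names made up for this example; this is not Flink's operator code, and it assumes a candidate equal to the last forwarded watermark is dropped as well.

```java
// Hypothetical illustration only; not part of the Flink API.
final class MonotonicWatermarkFilter {
    // Highest watermark forwarded so far; starts below every valid timestamp.
    private long currentWatermark = Long.MIN_VALUE;

    /** Returns true if the candidate advances event time and would be forwarded. */
    boolean offer(long candidate) {
        if (candidate > currentWatermark) {
            currentWatermark = candidate;
            return true;
        }
        // Equal or older candidates are dropped (assumption of this sketch).
        return false;
    }

    long current() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        MonotonicWatermarkFilter filter = new MonotonicWatermarkFilter();
        long[] candidates = {1000L, 3000L, 2000L, 3000L, 4000L};
        for (long c : candidates) {
            System.out.println(c + " -> " + (filter.offer(c) ? "forwarded" : "dropped"));
        }
    }
}
```

With a check like this in front of the output, an assigner that occasionally returns an older watermark cannot move event time backwards for downstream operators.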
Best,
Guowei

On Mon, Apr 15, 2019 at 9:44 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Vijay
>
> >>> Then the Operator progresses to the next Watermark as a starting point
> >>> for events after event time reaches currWatermark ?
> AFAIK, the operator that generates watermarks is called by the framework.
> When it is called depends on the operator itself. For example, an
> operator that implements the AssignerWithPunctuatedWatermarks interface
> is called for every element.
>
> >>> How does it guarantee that watermark never goes backwards ?
> Whether the watermark generated by the
> AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is sent
> downstream is controlled by the framework. If an operator returns a
> watermark going back Flink would send it to the downstream.
>
> Best,
> Guowei
>
> On Wed, Apr 10, 2019 at 11:44 PM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>
>> Hi Guowei,
>> Thanks for your reply.
>> I am trying to understand the logic behind point 1, i.e. the current
>> watermark being currMaxTimestamp minus the bound.
>> So, does this mean the operator processing a task has current event
>> time < current watermark < currMaxTimestamp? Then the Operator
>> progresses to the next Watermark as a starting point for events after
>> event time reaches currWatermark ?
>> Also, I saw this comment in BoundedOutOfOrdernessTimestampExtractor.java:
>>
>> // this guarantees that the watermark never goes backwards.
>> long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
>>
>> How does it guarantee that watermark never goes backwards ?
>>
>> TIA,
>>
>> Vijay
>>
>> On Tue, Apr 9, 2019 at 10:50 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi,
>>> 1. From doc[1], a Watermark(t) declares that event time has reached time
>>> t in that stream, meaning that there should be no more elements from the
>>> stream with a timestamp t’ <= t (i.e. events with timestamps older than or
>>> equal to the watermark).
>>> So I think it is counterintuitive to generate a watermark that is
>>> larger than the timestamp of the current element. At the least, you
>>> should subtract the bound.
>>> 2. From the definition of a watermark, I think the watermark is not
>>> related to the length of the window. The bound depends on your application.
>>> 3. In your case AssignerWithPunctuatedWatermarks might not be a good
>>> choice. Watermarks are not free, and you might send too many of them. If
>>> your source can generate some "watermark" element, I think you could use
>>> that interface. Otherwise, you could choose AssignerWithPeriodicWatermarks.
>>> You can find an example in doc[2].
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators
>>>
>>> Best,
>>> Guowei
>>>
>>> On Wed, Apr 10, 2019 at 7:41 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I have created a TimestampAssigner as follows.
>>>> I want to use monitoring.getEventTimestamp() with event-time
>>>> processing and collect aggregated stats over time window intervals of 5
>>>> secs, 5 mins etc. Is this the right way to create the TimeWaterMarkAssigner
>>>> with a bound? I want to collect the stats for each eventTimestamp + window
>>>> interval. My understanding: the generated watermark, which is
>>>> eventTimestamp + bound, will collect all the eventTimestamps which arrive
>>>> within that watermark inside each eventTimestamp + 5s etc. window interval.
>>>> Or does this bound have to be based on the windowInterval, i.e.
>>>> extractedTimestamp + windowInterval + bound?
>>>>
>>>>> public class MonitoringTSWAssigner implements
>>>>>         AssignerWithPunctuatedWatermarks<Monitoring> {
>>>>>     private long bound = 5 * (long) 1000;
>>>>>
>>>>>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>>>>>         return monitoring.getEventTimestamp();
>>>>>     }
>>>>>
>>>>>     public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>>>>>             long extractedTimestamp) {
>>>>>         return new Watermark(extractedTimestamp + bound); // <==== should it be - bound ?
>>>>>     }
>>>>> }
>>>>
>>>> Used here:
>>>>
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>> final DataStreamSource<Monitoring> monitoringDataStreamSource =
>>>>>         env.addSource(....);
>>>>> DataStream<Monitoring> kinesisStream =
>>>>>         monitoringDataStreamSource.assignTimestampsAndWatermarks(
>>>>>                 new MonitoringTSWAssigner());
>>>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream =
>>>>>         kinesisStream.keyBy("deployment", .....);
>>>>> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>>>>>         monitoringTupleKeyedStream.timeWindow(Time.seconds(5)); // 5 sec time window
>>>>
>>>> TIA,
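
On the "+ bound or - bound" question, the replies in this thread point to subtracting the bound from the largest timestamp seen so far. Below is a minimal plain-Java sketch of that idea, modeled on the two lines quoted from BoundedOutOfOrdernessTimestampExtractor.java. BoundedLatenessTracker, observe and currentWatermark are hypothetical names for illustration, not Flink API, and the sketch has no Flink dependency.

```java
// Hypothetical illustration only; not part of the Flink API.
final class BoundedLatenessTracker {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedLatenessTracker(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Offset the start value so the subtraction below cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records an element's event timestamp; late elements do not lower the maximum. */
    void observe(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    /** Watermark = largest timestamp seen minus the bound, never decreasing. */
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrdernessMs;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedLatenessTracker tracker = new BoundedLatenessTracker(5_000L);
        long[] timestamps = {10_000L, 8_000L, 12_000L};
        for (long ts : timestamps) {
            tracker.observe(ts);
            System.out.println("event " + ts + " -> watermark " + tracker.currentWatermark());
        }
    }
}
```

Because the watermark trails the maximum timestamp by the bound, an element that arrives up to 5 seconds out of order still has a timestamp above the watermark and can be assigned to its window. The bound reflects how late elements may arrive, not the window length.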