Sorry, I left out a "not". :(
Whether the watermark generated by an
AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is sent
downstream is controlled by the framework. If an operator returns a
watermark that goes backwards, Flink would _*not*_ send it downstream.
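
To illustrate the idea, here is a minimal sketch in plain Java with no Flink dependency. The class MonotonicWatermarkTracker and its method names are hypothetical and are not Flink's actual implementation; the sketch only assumes that timestamps are non-negative epoch milliseconds. It keeps a running maximum of the timestamps seen so far, subtracts the bound to get a candidate watermark, and lets the candidate through only if it moves forward:

```java
/**
 * Hypothetical helper (not a Flink class): tracks a bounded-out-of-orderness
 * watermark and never lets it go backwards.
 * Assumes event timestamps are non-negative epoch milliseconds.
 */
final class MonotonicWatermarkTracker {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    MonotonicWatermarkTracker(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /**
     * Records one event timestamp and returns the watermark now in effect.
     * A late event cannot lower the running maximum, and a candidate that
     * is not ahead of the last emitted watermark is ignored.
     */
    long onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        long potentialWatermark = currentMaxTimestamp - maxOutOfOrdernessMs;
        if (potentialWatermark > lastEmittedWatermark) {
            lastEmittedWatermark = potentialWatermark;
        }
        return lastEmittedWatermark;
    }
}
```

With a bound of 5000 ms, feeding the timestamps 10000, 8000, 12000 gives the watermarks 5000, 5000, 7000: the late element at 8000 leaves the watermark where it was.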


Best,
Guowei


On Mon, Apr 15, 2019 at 9:44 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Vijay
>
> >>>Then the Operator progresses to the next Watermark as a starting point
> for events after event time reaches currWatermark ?
> AFAIK, the operator that generates watermarks is called by the framework.
> When it is called depends on the operator itself. For example, an
> operator that implements the AssignerWithPunctuatedWatermarks interface
> is called for every element.
>
> >>>How does it guarantee that watermark never goes backwards ?
> Whether the watermark generated by an
> AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks is sent
> downstream is controlled by the framework. If an operator returns a
> watermark that goes backwards, Flink would send it downstream.
>
> Best,
> Guowei
>
>
> On Wed, Apr 10, 2019 at 11:44 PM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>
>> Hi Guowei,
>> Thx for your reply.
>> I am trying to understand the logic behind the Point 1 i.e current
>> Watermark being currMaxTimestamp minus the bound.
>> So, does this mean the Operator processing a task has a current Event
>> time < current Watermark < currMaxTimestamp ??? Then the Operator
>> progresses to the next Watermark as a starting point for events after event
>> time reaches currWatermark ?
>> Also, I saw this comment in BoundedOutOfOrdernessTimestampExtractor.java.
>>
>> // this guarantees that the watermark never goes backwards.
>> long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
>>
>>
>> How does it guarantee that watermark never goes backwards ?
>>
>> TIA,
>>
>> Vijay
>>
>>
>>
>> On Tue, Apr 9, 2019 at 10:50 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi,
>>> 1. From the doc [1]: a Watermark(t) declares that event time has reached
>>> time t in that stream, meaning that there should be no more elements from
>>> the stream with a timestamp t’ <= t (i.e. events with timestamps older than
>>> or equal to the watermark). So I think it is counterintuitive to generate
>>> a watermark that is larger than the timestamp of the current element. At
>>> the very least you should subtract the bound rather than add it.
>>> 2. From the definition of a watermark, I think the watermark is not
>>> related to the length of the window. The bound depends on your application.
>>> 3. In your case AssignerWithPunctuatedWatermarks might not be a good
>>> choice. Watermarks are not free, and you might send too many of them. If
>>> your source could generate some "watermark" element, I think you could use
>>> that interface. Otherwise, you could choose AssignerWithPeriodicWatermarks.
>>> You can find an example in the doc [2].
>>>
>>> 1.
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
>>> 2.
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators
>>> Best,
>>> Guowei
>>>
>>>
>>> On Wed, Apr 10, 2019 at 7:41 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I have created a TimestampAssigner as follows.
>>>> I want to use monitoring.getEventTimestamp() with event-time
>>>> processing and collect aggregated stats over time window intervals of 5
>>>> secs, 5 mins etc. Is this the right way to create the TimeWaterMarkAssigner
>>>> with a bound? I want to collect the stats for each eventTimestamp + window
>>>> interval. My understanding: *the generated watermark, which is
>>>> eventTimestamp + bound, will collect all the eventTimestamps that arrive
>>>> within that watermark inside each eventTimestamp + 5s etc. window interval.
>>>> Or does this bound have to be based on the windowInterval, i.e.
>>>> extractedTimestamp + windowInterval + bound?*
>>>>
>>>>
>>>>> public class MonitoringTSWAssigner implements
>>>>>         AssignerWithPunctuatedWatermarks<Monitoring> {
>>>>>     private long bound = 5 * (long) 1000;
>>>>>
>>>>>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>>>>>         return monitoring.getEventTimestamp();
>>>>>     }
>>>>>
>>>>>     public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>>>>>             long extractedTimestamp) {
>>>>>         // <==== should it be - bound ?
>>>>>         return new Watermark(extractedTimestamp + bound);
>>>>>     }
>>>>> }
>>>>
>>>>
>>>> Used here:
>>>>
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>> final DataStreamSource<Monitoring> monitoringDataStreamSource =
>>>>>         env.addSource(....);
>>>>> DataStream<Monitoring> kinesisStream =
>>>>>         monitoringDataStreamSource.assignTimestampsAndWatermarks(
>>>>>                 new MonitoringTSWAssigner());
>>>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream =
>>>>>         kinesisStream.keyBy("deployment", .....);
>>>>> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>>>>>         monitoringTupleKeyedStream.timeWindow(Time.seconds(5)); // 5 sec time window
>>>>
>>>>
>>>> TIA,
>>>>
>>>
