Hi,
1. According to the docs [1], a Watermark(t) declares that event time has
reached time t in that stream, meaning that there should be no more elements
from the stream with a timestamp t' <= t (i.e. events with timestamps older
than or equal to the watermark). So I think it is counterintuitive to generate
a watermark that is greater than the timestamp of the current element. At the
very least you should subtract the bound rather than add it.
2. From the definition of a watermark, I think the watermark is not related
to the length of the window. The bound depends on your application, i.e. on
how far out of order you expect events to arrive.
3. In your case AssignerWithPunctuatedWatermarks might not be a good
choice. Watermarks are not free, and emitting one for every element might
send too many of them. That interface is a better fit when your source itself
produces some special "watermark" elements. Otherwise you could choose
AssignerWithPeriodicWatermarks. You can find an example in the docs [2].
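
To make points 1 and 2 concrete, here is a minimal plain-Java sketch of the
"maximum timestamp seen minus bound" logic. It is not Flink code and does not
implement any Flink interface; the class name BoundedLatenessWatermarkSketch
and its methods are hypothetical and only mirror what an
AssignerWithPeriodicWatermarks implementation would typically compute. The
firing rule (watermark >= window end - 1) reflects my understanding of the
default event-time trigger and should be treated as an assumption.

```java
/**
 * Plain-Java model of a bounded-out-of-orderness watermark generator.
 * Hypothetical names; this only illustrates the arithmetic, not the Flink API.
 */
public class BoundedLatenessWatermarkSketch {
    private final long boundMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    public BoundedLatenessWatermarkSketch(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    /** Record an element's event timestamp (plays the role of extractTimestamp). */
    public long onEvent(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
        return eventTimestamp;
    }

    /** Watermark trails the highest timestamp seen so far by the bound. */
    public long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events yet, so no progress in event time
        }
        return maxTimestampSeen - boundMillis;
    }

    /** Assumed rule: a window [start, end) may fire once watermark >= end - 1. */
    public static boolean windowCanFire(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        BoundedLatenessWatermarkSketch gen = new BoundedLatenessWatermarkSketch(5_000L);
        long[] events = {1_000L, 4_000L, 3_000L, 9_000L, 10_000L};
        for (long ts : events) {
            gen.onEvent(ts);
            long wm = gen.currentWatermark();
            System.out.println("event=" + ts + " watermark=" + wm
                    + " window[0,5000) fires=" + windowCanFire(5_000L, wm));
        }
    }
}
```

Note that the window length never appears in the watermark computation: the
bound only controls how long a window waits for late elements before firing.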

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators

Best,
Guowei


Vijay Balakrishnan <bvija...@gmail.com> wrote on Wed, Apr 10, 2019 at 7:41 AM:

> Hi,
> I have created a TimestampAssigner as follows.
> I want to use monitoring.getEventTimestamp() with event-time processing
> and collect aggregated stats over time window intervals of 5 secs, 5 mins
> etc. Is this the right way to create the TimeWaterMarkAssigner with a bound?
> I want to collect the stats for each eventTimestamp + window interval.
> My understanding: the generated watermark, which is eventTimestamp +
> bound, will collect all the events which arrive within that
> watermark inside each eventTimestamp + 5s etc. window interval. Or does this
> bound have to be based on the window interval, i.e. extractedTimestamp +
> windowInterval + bound?
>
>
>> public class MonitoringTSWAssigner implements
>>         AssignerWithPunctuatedWatermarks<Monitoring> {
>>     private long bound = 5 * (long) 1000;
>>
>>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>>         return monitoring.getEventTimestamp();
>>     }
>>
>>     public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>>             long extractedTimestamp) {
>>         // <==== should it be - bound ?
>>         return new Watermark(extractedTimestamp + bound);
>>     }
>> }
>
>
> Used here:
>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> final DataStreamSource<Monitoring> monitoringDataStreamSource =
>>         env.addSource(....);
>> DataStream<Monitoring> kinesisStream =
>>         monitoringDataStreamSource.assignTimestampsAndWatermarks(
>>                 new MonitoringTSWAssigner());
>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream =
>>         kinesisStream.keyBy("deployment", .....);
>> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>>         monitoringTupleKeyedStream.timeWindow(Time.seconds(5)); // 5 sec time window
>
>
> TIA,
>
