Hi,
After looking at the code in EventTimeTrigger, I changed the watermark to
System.currentTimeMillis() + boundSecs (5 secs) so that the window's
maxTimestamp was <= the watermark. With that change I was able to consume
from Kinesis even when I had only 50 records.

For a TumblingWindow of 5 secs, the window's maxTimestamp was usually
around currTime + 5 secs.
So I set the watermark to System.currentTimeMillis() + 5 secs.
This way the trigger fired and execution got into
AggregateFunction.getResult().

@Override
public TriggerResult onElement(Object element, long timestamp,
      TimeWindow window, TriggerContext ctx) throws Exception {
   // <== This check had to be met
   if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
      // if the watermark is already past the window fire immediately
      return TriggerResult.FIRE;
   } else {
      ctx.registerEventTimeTimer(window.maxTimestamp());
      return TriggerResult.CONTINUE;
   }
}
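
As a rough illustration of why so few records never fired the window, here
is a plain-Java sketch (no Flink dependency) that replays the same check
against watermarks of the form timestamp - bound. The class name, the
starting timestamp and the fixed 10 millisec step are assumptions made up
for this example; the window arithmetic assumes a tumbling window with zero
offset and non-negative timestamps.

```java
// Hypothetical stand-alone sketch, not Flink code: it only mimics the
// "window.maxTimestamp() <= currentWatermark" check from EventTimeTrigger.
class WatermarkWindowSketch {
    static final long WINDOW_SIZE_MS = 5_000L; // 5 sec tumbling window
    static final long BOUND_MS = 3_000L;       // 3 sec out-of-order bound

    // Max timestamp of the tumbling window (zero offset) that contains ts.
    static long windowMaxTimestamp(long ts) {
        long windowStart = ts - (ts % WINDOW_SIZE_MS);
        return windowStart + WINDOW_SIZE_MS - 1;
    }

    // Emits a watermark of (ts - bound) per record and reports whether the
    // window of the first record would ever pass the trigger check.
    static boolean firstWindowFires(long firstTs, int count, long stepMs) {
        long firstWindowMax = windowMaxTimestamp(firstTs);
        long watermark = Long.MIN_VALUE;
        for (int i = 0; i < count; i++) {
            long ts = firstTs + i * stepMs;
            watermark = Math.max(watermark, ts - BOUND_MS);
            if (firstWindowMax <= watermark) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long firstTs = 1_000_000_057L; // assumed event time of the first record
        System.out.println(firstWindowFires(firstTs, 5, 10));    // prints false
        System.out.println(firstWindowFires(firstTs, 1000, 10)); // prints true
    }
}
```

In this sketch the event timestamps have to advance at least 3 secs (the
bound) past the end of the window before the check passes. 5 records that
are 10 millisecs apart cover only 40 millisecs, so the watermark never gets
there and no later record arrives to move it forward.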


On Mon, Dec 17, 2018 at 10:00 AM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi,
> Thanks for your reply and the pointers on currentLowWatermark. It looks
> like the Flink UI itself has a Watermarks tab for each operator.
>
> I dump 5 records into the Kinesis Data Stream and try to read the same
> records with the FlinkKinesisConsumer, but am not able to.
> For the watermark generation (intervalStart - bound) in the
> *MonitoringAssigner* class I use the same monitoring.getIntervalStart()
> that I used to generate the data on the producer side. The producer
> increments intervalStart by 3-10 millisecs on each record. The watermark
> is generated as intervalStart - bound (3 secs), so every watermark is
> greater than the previous one. So why does it not push data out? It gets
> into the MGroupingWindowAggregate.add(..) method but never gets into the
> MGroupingWindowAggregate.getResult(..) method. It works when I produce
> 1000 records or so into the Kinesis data stream.
>
> Here is a gist of my code-
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> //FlinkConsumer
> Properties kinesisConsumerConfig = new Properties();
>         ......
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
> "10000");
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
> "2000");//2000
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>         ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
> FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
>         kinesisTopicRead, new MonitoringMapKinesisSchema(), kinesisConsumerConfig);
> final DataStreamSource<Monitoring> monitoringDataStreamSource =
> env.addSource(kinesisConsumer);
> DataStream<Monitoring> kinesisStream =
>         monitoringDataStreamSource.assignTimestampsAndWatermarks(
>                 new MonitoringAssigner(3000)); // code at bottom
>
> org.apache.flink.streaming.api.windowing.time.Time timeWindow =
> Time.seconds(5);
> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>                 kinesisStream.timeWindow(timeWindow);
> DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 =
>         windowStream.aggregate(
>                 new MGroupingWindowAggregate(....), // AggregateFunction impl
>                 new MGroupingAggregateWindowProcessing(...));
>
> public class MonitoringAssigner implements
>         AssignerWithPunctuatedWatermarks<Monitoring> {
>     // 3 secs out-of-order bound, in millisecs
>     private long bound = 3 * 1000;
>
>     public MonitoringAssigner(long bound) {
>         this.bound = bound;
>     }
>
>     public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>             long extractedTimestamp) {
>         long nextWatermark = extractedTimestamp - bound;
>         return new Watermark(nextWatermark);
>     }
>
>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>         // intervalStart looks like 2012-07-12 02:21:06.057
>         LocalDateTime intervalStart =
>                 Utils.getLocalDateTime(monitoring.getIntervalStart());
>         long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
>         return extractedTS;
>         //return System.currentTimeMillis(); //this works fine.
>     }
> }
>
> TIA,
> Vijay
>
> On Sat, Dec 15, 2018 at 5:42 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Vijay,
>>
>> Could you provide more information about your problem? For example
>> - Which kind of window do you use?
>> - What's the window size?
>> - A relatively complete code is better :-)
>>
>> As for the problem, it is probably that the event time has not reached
>> the end of the window. You can monitor the watermark in the web
>> dashboard[1].
>> Also, changing event time to processing time is another way to verify
>> whether it is a watermark problem.
>>
>> Best, Hequn
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html
>>
>>
>> On Sat, Dec 15, 2018 at 12:59 AM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> Observations on Watermarks:
>>> Read this great article:
>>> https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy
>>>
>>> * A watermark says, for any event timestamp, when to stop waiting for
>>> the arrival of earlier events.
>>> * Watermark t means all events with Timestamp < t have already arrived.
>>> * When to push data out - When watermark with TS >= t arrives
>>>
>>> Only *using incrementing current time for the watermark seems to be
>>> working correctly*, but I am not sure if it lines up correctly with
>>> EventTime processing.
>>> *Using the incoming records' intervalStart as the watermark source for
>>> EventTime causes data to not be pushed at all* in cases when I have
>>> just 5 records in the Source.
>>>
>>> My source generates intervalStart so that it increments at a regular
>>> interval.
>>> I tried using intervalStart for my watermark with an out-of-order
>>> bound of 3 secs.
>>> The *AggregateFunction* I am using calls add() fine but *never
>>> calls getResult().*
>>> My assumption was that the AggregateFunction would push the data to
>>> getResult once the watermark, which is based on intervalStart,
>>> incremented beyond the previous watermark t.
>>> But it doesn't. Is it because I have a limited number of input records,
>>> and once intervalStart reaches the end of the input records too fast,
>>> the watermark stops incrementing and hence doesn't push data?
>>>
>>> With System.currentTimeMillis, it happily keeps increasing and hence
>>> pushes the data.
>>>
>>> Created this class:
>>> public class MonitoringAssigner implements
>>>         AssignerWithPunctuatedWatermarks<Monitoring> {
>>>     // 3 secs out-of-order bound, in millisecs
>>>     private long bound = 3 * 1000;
>>>
>>>     public MonitoringAssigner(long bound) {
>>>         this.bound = bound;
>>>     }
>>>
>>>     public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>>>             long extractedTimestamp) {
>>>         long nextWatermark = extractedTimestamp - bound;
>>>         //simply emit a Watermark with every event
>>>         return new Watermark(nextWatermark);
>>>     }
>>>
>>>     @Override
>>>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>>>         // Using intervalStart (e.g. 2012-07-12 02:21:06.057) stopped
>>>         // pushing recs after a certain time:
>>>         /*LocalDateTime intervalStart =
>>>                 Utils.getLocalDateTime(monitoring.getIntervalStart());
>>>         long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
>>>         return extractedTS;*/
>>>         return System.currentTimeMillis(); //incrementing current time
>>>     }
>>> }
>>>
>>>
