Hi,
Thanks for your reply and the pointers on currentLowWatermark. It looks like the
Flink UI itself has a Watermarks tab for each operator.

I put 5 records into the Kinesis Data Stream and try to read the same
records back through the FlinkKinesisConsumer, but no results come out.
In the *MonitoringAssigner* class I generate the watermark
(intervalStart - bound) from the same monitoring.getIntervalStart() value
that I set when generating the data on the producer side. The producer
increments intervalStart by 3-10 millisecs on each record. The watermark
is intervalStart - bound (3 secs), so every watermark generated is greater
than the previous one. So why does it not push data out? Execution gets
into the MGroupingWindowAggregate.add(..) method but never into the
MGroupingWindowAggregate.getResult(..) method. It works when I produce
1000 records or so into the Kinesis data stream.
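
To make the numbers above concrete, here is a rough plain-Java sketch (no Flink dependency) of the arithmetic involved. It assumes the default behaviour of event-time tumbling windows, which fire once the watermark reaches the window end minus 1 ms, and it assumes the first record carries the sample timestamp 2012-07-12 02:21:06.057 interpreted as UTC. The class and method names are made up for illustration and are not part of the job.

```java
/** Plain-Java model of a 5 s tumbling event-time window; illustrative only. */
final class WatermarkWindowCheck {
    static final long WINDOW_SIZE_MS = 5_000L; // Time.seconds(5) in the job
    static final long BOUND_MS = 3_000L;       // new MonitoringAssigner(3000)

    /** Start of the tumbling window holding this timestamp (offset 0, non-negative timestamps). */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    /** Watermark the assigner emits for a record: timestamp minus the bound. */
    static long watermarkFor(long timestampMs) {
        return timestampMs - BOUND_MS;
    }

    /** An event-time window fires once the watermark reaches windowEnd - 1. */
    static boolean windowFires(long windowStartMs, long watermarkMs) {
        long windowEnd = windowStartMs + WINDOW_SIZE_MS;
        return watermarkMs >= windowEnd - 1;
    }

    /**
     * Whether the window holding the first record has fired after `count`
     * records spaced `gapMs` apart (assumed constant spacing).
     */
    static boolean firstWindowFires(long firstTs, int count, long gapMs) {
        long lastTs = firstTs + (count - 1) * gapMs;
        return windowFires(windowStart(firstTs), watermarkFor(lastTs));
    }

    public static void main(String[] args) {
        long firstTs = 1_342_059_666_057L; // 2012-07-12 02:21:06.057, assumed UTC
        System.out.println("5 records, 10 ms apart:    " + firstWindowFires(firstTs, 5, 10));
        System.out.println("1000 records, 10 ms apart: " + firstWindowFires(firstTs, 1000, 10));
    }
}
```

Under these assumptions, 5 records span only a few tens of milliseconds of event time, so the last watermark is still roughly 3 secs behind the first record and short of the window end. With many more records the event time advances far enough for the earliest window to close, although the most recent window would still stay open until later data arrives.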

Here is the gist of my code:

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Flink Kinesis consumer
Properties kinesisConsumerConfig = new Properties();
        ......
kinesisConsumerConfig.setProperty(
        ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
kinesisConsumerConfig.setProperty(
        ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
kinesisConsumerConfig.setProperty(
        ConsumerConfigConstants.STREAM_INITIAL_POSITION,
        ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
FlinkKinesisConsumer<Monitoring> kinesisConsumer = new FlinkKinesisConsumer<>(
        kinesisTopicRead, new MonitoringMapKinesisSchema(),
        kinesisConsumerConfig);
final DataStreamSource<Monitoring> monitoringDataStreamSource =
        env.addSource(kinesisConsumer);
DataStream<Monitoring> kinesisStream =
        monitoringDataStreamSource.assignTimestampsAndWatermarks(
                new MonitoringAssigner(3000)); // code at bottom

org.apache.flink.streaming.api.windowing.time.Time timeWindow =
        Time.seconds(5);
final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
        kinesisStream.timeWindow(timeWindow);
DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 =
        windowStream.aggregate(
                new MGroupingWindowAggregate(....), // AggregateFunction impl
                new MGroupingAggregateWindowProcessing(...));

public class MonitoringAssigner implements
        AssignerWithPunctuatedWatermarks<Monitoring> {
    private long bound = 3 * 1000; // 3 secs out-of-order bound, in millisecs

    public MonitoringAssigner(long bound) {
        this.bound = bound;
    }

    // Emits a watermark with every event: event timestamp minus the bound.
    @Override
    public Watermark checkAndGetNextWatermark(Monitoring monitoring,
            long extractedTimestamp) {
        long nextWatermark = extractedTimestamp - bound;
        return new Watermark(nextWatermark);
    }

    @Override
    public long extractTimestamp(Monitoring monitoring, long previousTS) {
        LocalDateTime intervalStart = Utils.getLocalDateTime(
                monitoring.getIntervalStart()); // e.g. 2012-07-12 02:21:06.057
        long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
        return extractedTS;
        // return System.currentTimeMillis(); // this works fine.
    }
}
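
The Utils helpers are not shown in this mail. As a purely hypothetical sketch of what they might look like, assuming intervalStart is a String in the "yyyy-MM-dd HH:mm:ss.SSS" form from the comment above and that it should be read as UTC (both are assumptions, as is the class name), the conversion needs to end up in epoch milliseconds so that it lines up with the bound, which is also in milliseconds:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical stand-in for the Utils methods used by MonitoringAssigner. */
final class IntervalStartParser {
    // Pattern assumed from the sample value "2012-07-12 02:21:06.057".
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Parses the intervalStart text into a LocalDateTime. */
    static LocalDateTime getLocalDateTime(String intervalStart) {
        return LocalDateTime.parse(intervalStart, FORMAT);
    }

    /** Converts to epoch milliseconds, treating the value as UTC (an assumption). */
    static long getLongFromLocalDateTime(LocalDateTime dateTime) {
        return dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime parsed = getLocalDateTime("2012-07-12 02:21:06.057");
        System.out.println(getLongFromLocalDateTime(parsed));
    }
}
```

If the real helper returned seconds instead of milliseconds, subtracting a 3000 ms bound would move the watermark back by far more event time than intended, so the unit is worth double-checking.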

TIA,
Vijay

On Sat, Dec 15, 2018 at 5:42 AM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Vijay,
>
> Could you provide more information about your problem? For example
> - Which kind of window do you use?
> - What's the window size?
> - A relatively complete code sample would be better :-)
>
> As for the problem, the event time has probably not reached the end
> of the window. You can monitor the watermark in the web dashboard[1].
> Also, changing event time to processing time is another way to verify
> whether it is a watermark problem.
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html
>
>
> On Sat, Dec 15, 2018 at 12:59 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi,
>> Observations on Watermarks:
>> Read this great article:
>> https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy
>>
>> * A watermark tells the operator, for a given event timestamp, when to
>> stop waiting for the arrival of earlier events.
>> * Watermark t means all events with timestamp <= t have already arrived.
>> * Data for time t is pushed out when a watermark with timestamp >= t
>> arrives.
>>
>> Using *only the incrementing current time for the watermark seems to work
>> correctly*, but I am not sure whether it lines up correctly with EventTime
>> processing.
>> *Using the incoming record's intervalStart as the watermark source for
>> EventTime causes data not to be pushed at all* in cases when I have just
>> 5 records in the source.
>>
>> My source generates intervalStart values that increment at
>> a regular interval.
>> I tried using intervalStart for my watermark with an out-of-order
>> bound of 3 secs.
>> The *AggregateFunction* I am using calls add() fine but *never calls
>> getResult().*
>> My assumption was that the AggregateFunction would push the
>> data to getResult()
>> once the watermark based on intervalStart incremented beyond the
>> previous watermark t.
>> But it doesn't. Is it because I have a limited number of input records, and
>> once intervalStart reaches the end
>> of the input records too fast, the watermark stops incrementing and
>> hence no data is pushed?
>>
>> With System.currentTimeMillis, it happily keeps increasing and hence
>> pushes the data.
>>
>> Created this class:
>> public class MonitoringAssigner implements
>>         AssignerWithPunctuatedWatermarks<Monitoring> {
>>     private long bound = 3 * 1000; // 3 secs out-of-order bound, in millisecs
>>
>>     public MonitoringAssigner(long bound) {
>>         this.bound = bound;
>>     }
>>
>>     @Override
>>     public Watermark checkAndGetNextWatermark(Monitoring monitoring,
>>             long extractedTimestamp) {
>>         long nextWatermark = extractedTimestamp - bound;
>>         // simply emit a Watermark with every event
>>         return new Watermark(nextWatermark);
>>     }
>>
>>     @Override
>>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>>         // Using intervalStart stopped pushing recs after a certain time:
>>         /*LocalDateTime intervalStart = Utils.getLocalDateTime(
>>                 monitoring.getIntervalStart()); // 2012-07-12 02:21:06.057
>>         long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
>>         return extractedTS;*/
>>         return System.currentTimeMillis(); // incrementing current time
>>     }
>> }
>>
>>
