Thanks, Robert, for mentioning this. I will take care of it in future posts.

I was able to narrow down the issue: when I disable checkpointing, the
watermark gets updated and everything works. I need to understand two things:

1. What happens when I enable checkpointing, and how can I make this work
with checkpointing enabled? I need to write the data stream with
checkpointing enabled.

2. I want to collect all the data for a session and process all of its
events at the end of the session (after x minutes of inactivity).
I know session windows offer this: I can create a session window with an
inactivity gap. But enriching and processing events there is not
recommended. So how can I get the same behavior, i.e. trigger on the
inactivity period, process all the events, and clear the queue?
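
To make the second question concrete, here is a minimal sketch of the logic being asked for, written in plain Java rather than against the Flink API. The class `InactivitySessionBuffer` and its methods are hypothetical names for illustration only. It assumes each event pushes a per-key timer to "last event timestamp + gap", and that a key is flushed once the watermark reaches that timer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of "buffer per key, flush after an inactivity gap" (not Flink API). */
class InactivitySessionBuffer {
    private final long gapMillis;
    // Buffered event timestamps per key (stand-in for keyed state).
    private final Map<String, List<Long>> buffers = new HashMap<>();
    // Per-key timer: latest event timestamp + gap (stand-in for an event-time timer).
    private final Map<String, Long> timers = new HashMap<>();

    InactivitySessionBuffer(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    /** Called for every event: buffer it and move the key's timer forward. */
    void onEvent(String key, long eventTs) {
        buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(eventTs);
        timers.merge(key, eventTs + gapMillis, Math::max);
    }

    /** Called when the watermark advances: flush every key whose timer has been reached. */
    Map<String, List<Long>> onWatermark(long watermark) {
        Map<String, List<Long>> closed = new HashMap<>();
        timers.entrySet().removeIf(entry -> {
            if (entry.getValue() > watermark) {
                return false; // session still active
            }
            List<Long> events = buffers.remove(entry.getKey());
            Collections.sort(events); // process in event-time order
            closed.put(entry.getKey(), events);
            return true; // timer fired, drop it
        });
        return closed;
    }
}
```

In a Flink job, this would roughly correspond to a `KeyedProcessFunction` that keeps the events in keyed state, calls `registerEventTimeTimer` from `processElement`, and processes and clears the buffer in `onTimer`. As in the sketch, nothing is flushed unless the watermark actually advances.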


On Tue, Mar 3, 2020 at 3:40 AM Robert Metzger wrote:

> side note: this question has been asked on SO as well:
> (I'm mentioning this here so that we are not wasting support resources in
> our community on double-debugging issues)
> On Mon, Mar 2, 2020 at 5:36 PM aj wrote:
>> Hi David,
>> Currently, I am testing it with a single source and parallelism 1 only, so
>> I am not able to understand this behavior.
>> On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz wrote:
>>> Hi Anuj,
>>> What is the parallelism of your source? Do all of your source tasks produce
>>> records? An operator's watermark is always the minimum of the watermarks
>>> received from all of its upstream operators. Therefore, if some of them do
>>> not produce records, the watermark will not progress. You can read more
>>> about watermarks and how they work here:
>>> Hope that helps
>>> Best,
>>> Dawid
>>> On 02/03/2020 16:26, aj wrote:
>>> I am trying to use a process function to do some processing on a set of
>>> events. I am using event time and a keyed stream. The issue I am facing is
>>> that the watermark value always comes out as -9223372036854775808
>>> (Long.MIN_VALUE). I added print statements to debug, and they show this:
>>> timestamp------1583128014000 extractedTimestamp 1583128014000
>>> currentwatermark-----9223372036854775808
>>> timestamp------1583128048000 extractedTimestamp 1583128048000
>>> currentwatermark-----9223372036854775808
>>> timestamp------1583128089000 extractedTimestamp 1583128089000
>>> currentwatermark-----9223372036854775808
>>> The timestamp and extracted timestamp change, but the watermark is not
>>> updated. So no record gets into the queue, as context.timestamp is never
>>> less than the watermark.
>>>         DataStream<GenericRecord> dataStream =
>>>                 env.addSource(searchConsumer).name("search_list_keyless");
>>>         DataStream<GenericRecord> dataStreamWithWaterMark =
>>>                 dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>>>         try {
>>>             dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
>>>                 StringBuilder builder = new StringBuilder();
>>>                 builder.append(record.get("session_id"));
>>>                 builder.append(record.get("user_id"));
>>>                 return builder.toString();
>>>             }).process(new MatchFunction()).print();
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>         env.execute("start session process");
>>>     }
>>>     public static class SessionAssigner
>>>             implements AssignerWithPunctuatedWatermarks<GenericRecord> {
>>>         @Override
>>>         public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
>>>             long timestamp = (long) record.get("event_ts");
>>>             System.out.println("timestamp------" + timestamp);
>>>             return timestamp;
>>>         }
>>>         @Override
>>>         public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
>>>             // simply emit a watermark with every event
>>>             System.out.println("extractedTimestamp " + extractedTimestamp);
>>>             return new Watermark(extractedTimestamp - 30000);
>>>         }
>>>     }
>>>     @Override
>>>     public void processElement(GenericRecord record, Context context,
>>>             Collector<Object> collector) throws Exception {
>>>         TimerService timerService = context.timerService();
>>>         System.out.println("currentwatermark----" + timerService.currentWatermark());
>>>         if (context.timestamp() > timerService.currentWatermark()) {
>>>             Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
>>>             PriorityQueue<GenericRecord> queue = queueval.f1;
>>>             long startTime = queueval.f0;
>>>             System.out.println("starttime----" + startTime);
>>>             if (queue == null) {
>>>                 queue = new PriorityQueue<>(10, new TimeStampComprator());
>>>                 startTime = (long) record.get("event_ts");
>>>             }
>>>             queueState.update(new Tuple2<>(startTime, queue));
>>>             timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
>>>         }
>>>     }
>>> }
>>> Please help me understand what I am doing wrong.
>>>  --
>>> Thanks & Regards,
>>> Anuj Jain
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
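
For reference, the value in the quoted debug output, -9223372036854775808, is `Long.MIN_VALUE`, which is what an operator reports before it has received any watermark. The rule Dawid describes (an operator's watermark is the minimum over its inputs) can be modeled in a few lines of plain Java. `WatermarkModel` is a hypothetical class for illustration, not a Flink type, and it assumes at least one input.

```java
import java.util.Arrays;

/** Plain-Java model of how an operator combines watermarks from its inputs (not Flink API). */
class WatermarkModel {
    // Every input starts at Long.MIN_VALUE until it delivers its first watermark.
    private final long[] inputWatermarks;

    WatermarkModel(int numInputs) {
        inputWatermarks = new long[numInputs]; // assumes numInputs >= 1
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Record a watermark from one input; a watermark never moves backwards. */
    void onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
    }

    /** The operator's current watermark: the minimum over all inputs. */
    long currentWatermark() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }
}
```

With two inputs, a watermark on input 0 alone leaves `currentWatermark()` at `Long.MIN_VALUE`; it only moves once input 1 has also delivered one.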

Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07

