Thanks, Robert, for mentioning this. I will take care of it in future posts.

I was able to figure out the issue. When I disable checkpointing, the
watermark gets updated and it works. I need to understand two things:

1. Please help me understand what happens when I enable checkpointing, and
how to make this work with checkpointing enabled, as I need to write a data
stream with checkpointing enabled.

2. Basically, I want to collect all the events of a session and process them
together at the end of the session (detected by x minutes of inactivity).
I know this functionality is available in session windows, where a session
window can be created from an inactivity gap, but enrichment and processing
of events inside the window is not recommended. So how can I get the same
behavior: trigger on the inactivity period, process all the buffered events,
and then clear the queue?
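A minimal sketch of that idea, modelled in plain Java so that it runs without Flink. The two maps stand in for keyed ListState/ValueState, and the comments mark where a KeyedProcessFunction would call TimerService.registerEventTimeTimer / deleteEventTimeTimer and where onTimer would run. The class, the Event type and the 5-minute gap (borrowed from the timer in the code further down) are hypothetical; this is an assumption about how the pattern could look, not a tested Flink implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model (no Flink dependency) of "flush a session after an inactivity gap". */
final class InactivitySessionModel {
    /** Hypothetical event type standing in for GenericRecord. */
    static final class Event {
        final String key;
        final long timestamp;
        final String payload;

        Event(String key, long timestamp, String payload) {
            this.key = key;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    private final long gapMillis;
    // Stand-in for a keyed ListState holding the buffered events.
    private final Map<String, List<Event>> buffers = new HashMap<>();
    // Stand-in for a keyed ValueState holding the currently registered timer.
    private final Map<String, Long> timers = new HashMap<>();
    private final List<List<Event>> emitted = new ArrayList<>();

    InactivitySessionModel(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    /** Like processElement: buffer the event and push the key's timer to latest timestamp + gap. */
    void processElement(Event e) {
        buffers.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
        long candidate = e.timestamp + gapMillis;
        Long current = timers.get(e.key);
        if (current == null || candidate > current) {
            // In Flink this step would be deleteEventTimeTimer(current)
            // followed by registerEventTimeTimer(candidate).
            timers.put(e.key, candidate);
        }
    }

    /** Like a watermark arriving: every timer at or below it fires. */
    void advanceWatermark(long watermark) {
        Iterator<Map.Entry<String, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= watermark) {
                onTimer(entry.getKey());
                it.remove();
            }
        }
    }

    /** Like onTimer: process the whole session in timestamp order, then clear its buffer. */
    private void onTimer(String key) {
        List<Event> session = buffers.remove(key);
        session.sort(Comparator.comparingLong((Event ev) -> ev.timestamp));
        emitted.add(session);
    }

    List<List<Event>> emittedSessions() {
        return emitted;
    }

    public static void main(String[] args) {
        InactivitySessionModel model = new InactivitySessionModel(5 * 60 * 1000L);
        model.processElement(new Event("s1u1", 1_000L, "a"));
        model.processElement(new Event("s1u1", 60_000L, "b"));
        // Out-of-order event: it is buffered but does not shorten the session.
        model.processElement(new Event("s1u1", 30_000L, "c"));

        model.advanceWatermark(300_000L); // timer is at 60_000 + 300_000 = 360_000
        System.out.println(model.emittedSessions().size()); // 0

        model.advanceWatermark(360_000L);
        System.out.println(model.emittedSessions().size()); // 1
    }
}
```

The main difference from the code further down is that the timer follows the latest event timestamp plus the gap instead of the session start (startTime + 5 minutes), which is what makes the session close only after inactivity. Like any event-time timer, it can only fire once the watermark advances, so it still depends on the watermark problem from point 1 being solved.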


Thanks,
Anuj


On Tue, Mar 3, 2020 at 3:40 AM Robert Metzger <rmetz...@apache.org> wrote:

> side note: this question has been asked on SO as well:
> https://stackoverflow.com/questions/60487571/flink-eventtime-processing-watermark-is-always-coming-as-9223372036854725808
> (I'm mentioning this here so that we are not wasting support resources in
> our community on double-debugging issues)
>
> On Mon, Mar 2, 2020 at 5:36 PM aj <ajainje...@gmail.com> wrote:
>
>> Hi Dawid,
>>
>> Currently, I am testing it with only a single source and parallelism 1, so
>> I am not able to understand this behavior.
>>
>> On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Hi Anuj,
>>>
>>> What parallelism does your source have? Do all of your source tasks
>>> produce records? The watermark of an operator is always the minimum of the
>>> watermarks it has received from all of its upstream operators. Therefore,
>>> if some of them do not produce records, the watermark will not progress.
>>> You can read more about watermarks and how they work here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>>>
>>> Hope that helps
>>>
>>> Best,
>>>
>>> Dawid
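Dawid's rule can be reproduced with a small plain-Java model (a simulation, not Flink's internal code; the class and method names are hypothetical). It also shows that an operator which has not yet received any watermark reports Long.MIN_VALUE, which prints as -9223372036854775808, so "currentwatermark----" + Long.MIN_VALUE gives exactly the five-dash lines in the log further down.

```java
import java.util.Arrays;

/** Plain-Java model (not Flink's implementation) of combining watermarks from input channels. */
final class WatermarkMinModel {
    // Latest watermark seen on each input channel.
    private final long[] channelWatermarks;

    WatermarkMinModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        // Before any watermark arrives, every channel sits at Long.MIN_VALUE.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; a channel's watermark never moves backwards. */
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** The operator's watermark is the minimum over all channels. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        // Two channels, as with a source of parallelism 2 where one subtask stays idle.
        WatermarkMinModel op = new WatermarkMinModel(2);
        // Channel 0 emits timestamp - 30000, like SessionAssigner in the code below.
        op.onWatermark(0, 1583128014000L - 30000);
        op.onWatermark(0, 1583128048000L - 30000);
        System.out.println("currentwatermark----" + op.currentWatermark());
        // prints currentwatermark-----9223372036854775808

        // Once the idle channel emits a watermark too, the minimum advances.
        op.onWatermark(1, 1583128089000L - 30000);
        System.out.println("currentwatermark----" + op.currentWatermark());
        // prints currentwatermark----1583128018000
    }
}
```

With a single input channel, as in a job running everything at parallelism 1, the minimum in this model is simply that channel's watermark, so an idle parallel source instance would not by itself explain a value stuck at Long.MIN_VALUE.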
>>> On 02/03/2020 16:26, aj wrote:
>>>
>>> I am trying to use a process function to do some processing on a set of
>>> events. I am using event time and a keyed stream. The issue I am facing is
>>> that the watermark value is always -9223372036854775808 (Long.MIN_VALUE).
>>> I have put in print statements to debug, and they show this:
>>>
>>> timestamp------1583128014000 extractedTimestamp 1583128014000
>>> currentwatermark-----9223372036854775808
>>>
>>> timestamp------1583128048000 extractedTimestamp 1583128048000
>>> currentwatermark-----9223372036854775808
>>>
>>> timestamp------1583128089000 extractedTimestamp 1583128089000
>>> currentwatermark-----9223372036854775808
>>>
>>> The timestamp and extracted timestamp change, but the watermark does not
>>> get updated. So no record gets into the queue, as context.timestamp() is
>>> never less than the watermark.
>>>
>>>
>>>         DataStream<GenericRecord> dataStream =
>>>                 env.addSource(searchConsumer).name("search_list_keyless");
>>>         DataStream<GenericRecord> dataStreamWithWaterMark =
>>>                 dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>>>
>>>         try {
>>>             dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
>>>                 StringBuilder builder = new StringBuilder();
>>>                 builder.append(record.get("session_id"));
>>>                 builder.append(record.get("user_id"));
>>>                 return builder.toString();
>>>             }).process(new MatchFunction()).print();
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>         env.execute("start session process");
>>>
>>>     }
>>>
>>>     public static class SessionAssigner
>>>             implements AssignerWithPunctuatedWatermarks<GenericRecord> {
>>>         @Override
>>>         public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
>>>             long timestamp = (long) record.get("event_ts");
>>>             System.out.println("timestamp------" + timestamp);
>>>             return timestamp;
>>>         }
>>>
>>>         @Override
>>>         public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
>>>             // simply emit a watermark with every event
>>>             System.out.println("extractedTimestamp " + extractedTimestamp);
>>>             return new Watermark(extractedTimestamp - 30000);
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void processElement(GenericRecord record, Context context,
>>>             Collector<Object> collector) throws Exception {
>>>
>>>         TimerService timerService = context.timerService();
>>>         System.out.println("currentwatermark----" + timerService.currentWatermark());
>>>         if (context.timestamp() > timerService.currentWatermark()) {
>>>
>>>             // queueState.value() is null for the first record of a key
>>>             // (unless the state descriptor provides a default value)
>>>             Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
>>>             PriorityQueue<GenericRecord> queue = queueval == null ? null : queueval.f1;
>>>             long startTime = queueval == null ? 0L : queueval.f0;
>>>             System.out.println("starttime----" + startTime);
>>>
>>>             if (queue == null) {
>>>                 queue = new PriorityQueue<>(10, new TimeStampComprator());
>>>                 startTime = (long) record.get("event_ts");
>>>             }
>>>             queue.add(record); // buffer the current record in the queue
>>>             queueState.update(new Tuple2<>(startTime, queue));
>>>             timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
>>>         }
>>>     }
>>>
>>> }
>>>
>>> Please help me understand what I am doing wrong.
>>>
>>>  --
>>> Thanks & Regards,
>>> Anuj Jain
>>>
>>>
>>>
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>
