Thanks, Robert, for mentioning this. I will take care of it in future posts.
I was able to figure out the issue: when I disable checkpointing, the watermark gets updated and everything works. I need to understand two things:

1. What happens when I enable checkpointing, and how can I make this work with checkpointing enabled? I need to write a data stream with checkpointing enabled.
2. I want to collect all of a session's data and process all of its events at the end of the session (that is, after x minutes of inactivity). I know a session window provides this functionality, since it can be defined with an inactivity gap, but enrichment and processing of events inside it is not recommended. So how can I get the same behavior: trigger on the inactivity period, process all the events, and then clear the queue?

Thanks,
Anuj

On Tue, Mar 3, 2020 at 3:40 AM Robert Metzger <rmetz...@apache.org> wrote:

> Side note: this question has been asked on SO as well:
> https://stackoverflow.com/questions/60487571/flink-eventtime-processing-watermark-is-always-coming-as-9223372036854725808
> (I'm mentioning this here so that we are not wasting support resources in
> our community on double-debugging issues)
>
> On Mon, Mar 2, 2020 at 5:36 PM aj <ajainje...@gmail.com> wrote:
>
>> Hi Dawid,
>>
>> Currently, I am testing it with a single source and parallelism 1 only, so
>> I am not able to understand this behavior.
>>
>> On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Hi Anuj,
>>>
>>> What parallelism does your source have? Do all of your source tasks produce
>>> records? The watermark is always the minimum of the watermarks received from
>>> all the upstream operators. Therefore, if some of them do not produce records,
>>> the watermark will not progress.
>>> You can read more about watermarks and how they work here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>>>
>>> Hope that helps.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 02/03/2020 16:26, aj wrote:
>>>
>>> I am trying to use a process function to do some processing on a set of
>>> events. I am using event time and a keyed stream. The issue I am facing is that
>>> the watermark value is always -9223372036854775808 (Long.MIN_VALUE). I have
>>> put in print statements to debug, and they show this:
>>>
>>> timestamp------1583128014000 extractedTimestamp 1583128014000
>>> currentwatermark-----9223372036854775808
>>>
>>> timestamp------1583128048000 extractedTimestamp 1583128048000
>>> currentwatermark-----9223372036854775808
>>>
>>> timestamp------1583128089000 extractedTimestamp 1583128089000
>>> currentwatermark-----9223372036854775808
>>>
>>> The timestamp and extracted timestamp change, but the watermark is not
>>> updated. So no record gets into the queue, as context.timestamp is never
>>> less than the watermark.
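Dawid's point about parallel inputs can be illustrated with a small plain-Java simulation, assuming a simplified model: each input channel reports watermarks, the operator remembers the latest one per channel, and it forwards the minimum. `WatermarkCombiner` and `WatermarkMinDemo` are hypothetical names for this sketch, not Flink classes, and the model ignores details such as idle-source handling. It shows why an input that never emits a watermark keeps the combined value at `Long.MIN_VALUE` (-9223372036854775808), the value printed in the log above.

```java
import java.util.Arrays;

// Hypothetical, simplified model (not Flink's implementation): an operator with
// several input channels forwards the minimum of the latest watermark per channel.
class WatermarkCombiner {
    private final long[] channelWatermarks;

    WatermarkCombiner(int numChannels) {
        channelWatermarks = new long[numChannels];
        // Every channel starts at Long.MIN_VALUE until it emits a watermark.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the combined watermark.
    long onWatermark(int channel, long watermark) {
        // A channel's watermark never moves backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return current();
    }

    // The combined watermark is the minimum across all channels.
    long current() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}

class WatermarkMinDemo {
    public static void main(String[] args) {
        WatermarkCombiner combiner = new WatermarkCombiner(2);
        // Channel 0 receives events and emits watermarks of event_ts - 30000.
        combiner.onWatermark(0, 1583128014000L - 30000);
        combiner.onWatermark(0, 1583128089000L - 30000);
        // Channel 1 is idle, so the combined watermark does not advance.
        System.out.println(combiner.current()); // -9223372036854775808
        // Once channel 1 emits a watermark, the minimum moves forward.
        System.out.println(combiner.onWatermark(1, 1583128050000L)); // 1583128050000
    }
}
```

With a single input channel (parallelism 1, as in aj's follow-up), the minimum is simply that channel's latest watermark, so this rule on its own would not keep the watermark stuck at `Long.MIN_VALUE`.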
>>>
>>> DataStream<GenericRecord> dataStream =
>>>         env.addSource(searchConsumer).name("search_list_keyless");
>>> DataStream<GenericRecord> dataStreamWithWaterMark =
>>>         dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>>>
>>> try {
>>>     dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
>>>         StringBuilder builder = new StringBuilder();
>>>         builder.append(record.get("session_id"));
>>>         builder.append(record.get("user_id"));
>>>         return builder.toString();
>>>     }).process(new MatchFunction()).print();
>>> } catch (Exception e) {
>>>     e.printStackTrace();
>>> }
>>> env.execute("start session process");
>>>
>>> }
>>>
>>> public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {
>>>     @Override
>>>     public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
>>>         long timestamp = (long) record.get("event_ts");
>>>         System.out.println("timestamp------" + timestamp);
>>>         return timestamp;
>>>     }
>>>
>>>     @Override
>>>     public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
>>>         // simply emit a watermark with every event
>>>         System.out.println("extractedTimestamp " + extractedTimestamp);
>>>         return new Watermark(extractedTimestamp - 30000);
>>>     }
>>> }
>>>
>>> @Override
>>> public void processElement(GenericRecord record, Context context,
>>>         Collector<Object> collector) throws Exception {
>>>
>>>     TimerService timerService = context.timerService();
>>>     System.out.println("currentwatermark----" + timerService.currentWatermark());
>>>     if (context.timestamp() > timerService.currentWatermark()) {
>>>
>>>         Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
>>>         PriorityQueue<GenericRecord> queue = queueval.f1;
>>>         long startTime = queueval.f0;
>>>         System.out.println("starttime----" + startTime);
>>>
>>>         if (queue == null) {
>>>             queue = new PriorityQueue<>(10, new TimeStampComprator());
>>>             startTime = (long) record.get("event_ts");
>>>         }
>>>         queueState.update(new Tuple2<>(startTime, queue));
>>>         timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
>>>     }
>>> }
>>>
>>> }
>>>
>>> Please help me understand what I am doing wrong.
>>>
>>> --
>>> Thanks & Regards,
>>> Anuj Jain
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
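For the second question (process all of a session's events after x minutes of inactivity), one common pattern is to buffer each event per key and keep a single event-time timer per key at `latest event_ts + gap`, pushing it later whenever a newer event arrives; when the watermark passes the timer, the key has been inactive for the gap, so the buffered events are emitted sorted by timestamp and the buffer is cleared. The sketch below simulates that logic in plain Java so it runs without a Flink cluster. The `Event` class, `InactivitySessionBuffer`, `onEvent`, and `advanceWatermark` are hypothetical names for illustration. In a real `KeyedProcessFunction`, the maps would presumably become keyed state (for example a `ListState` of events and a `ValueState<Long>` holding the pending timer), the timer bookkeeping would use `TimerService.registerEventTimeTimer` and `TimerService.deleteEventTimeTimer`, and the emission would happen in `onTimer`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event type standing in for the GenericRecord used in the thread.
final class Event {
    final String key;
    final long ts;
    final String payload;

    Event(String key, long ts, String payload) {
        this.key = key;
        this.ts = ts;
        this.payload = payload;
    }
}

// Plain-Java simulation (not Flink code) of an inactivity-gap session built
// from a per-key buffer and one event-time timer per key.
class InactivitySessionBuffer {
    private final long gapMillis;
    private final Map<String, List<Event>> buffers = new HashMap<>(); // stands in for ListState
    private final Map<String, Long> timers = new TreeMap<>();         // stands in for ValueState<Long>

    InactivitySessionBuffer(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    // Like processElement: buffer the event and push the key's timer later if needed.
    void onEvent(Event e) {
        buffers.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
        long candidate = e.ts + gapMillis;
        Long existing = timers.get(e.key);
        // Only move the timer later; an out-of-order event must not shorten the session.
        if (existing == null || candidate > existing) {
            // In Flink: deleteEventTimeTimer(existing), then registerEventTimeTimer(candidate).
            timers.put(e.key, candidate);
        }
    }

    // Like onTimer: fire every timer whose time is <= the watermark and return
    // the completed sessions, each sorted by event timestamp.
    List<List<Event>> advanceWatermark(long watermark) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> t : timers.entrySet()) {
            if (t.getValue() <= watermark) {
                expired.add(t.getKey());
            }
        }
        List<List<Event>> closed = new ArrayList<>();
        for (String key : expired) {
            List<Event> session = buffers.remove(key); // clears the buffered state
            timers.remove(key);
            session.sort(Comparator.comparingLong((Event ev) -> ev.ts));
            closed.add(session);
        }
        return closed;
    }
}

class InactivitySessionDemo {
    public static void main(String[] args) {
        long gap = 5 * 60 * 1000; // 5 minutes, the same offset used in the thread's timer
        InactivitySessionBuffer sessions = new InactivitySessionBuffer(gap);
        sessions.onEvent(new Event("s1", 1_000, "b"));
        sessions.onEvent(new Event("s1", 0, "a"));   // out of order, still buffered
        sessions.onEvent(new Event("s2", 2_000, "x"));
        // The watermark reaches s1's timer (1_000 + gap) but not s2's (2_000 + gap).
        List<List<Event>> closed = sessions.advanceWatermark(1_000 + gap);
        System.out.println(closed.size());                // 1
        System.out.println(closed.get(0).get(0).payload); // a
    }
}
```

Note that this only fires if the watermark actually advances, which ties back to the first question. Instead of deleting the old timer on every event, an alternative is to leave stale timers registered and ignore any `onTimer` call whose timestamp differs from the value stored in state.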