Hi David, Currently, I am testing it with a single source and parallelism 1 only so not able to understand this behavior.
On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote: > Hi Anuj, > > What parallelism has your source? Do all of your source tasks produce > records? Watermark is always the minimum of timestamps seen from all the > upstream operators. Therefore if some of them do not produce records the > watermark will not progress. You can read more about Watermarks and how > they work here: > https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams > > Hope that helps > > Best, > > Dawid > On 02/03/2020 16:26, aj wrote: > > I am trying to use process function to some processing on a set of events. > I am using event time and keystream. The issue I am facing is The watermark > value is always coming as 9223372036854725808. I have put print statement > to debug and it shows like this: > > timestamp------1583128014000 extractedTimestamp 1583128014000 > currentwatermark-----9223372036854775808 > > timestamp------1583128048000 extractedTimestamp 1583128048000 > currentwatermark-----9223372036854775808 > > timestamp------1583128089000 extractedTimestamp 1583128089000 > currentwatermark-----9223372036854775808 > > timestamp and extracted timestamp changing but watermark not getting > updated. So no record is getting in the queue as context.timestamp is never > less than the watermark. > > > DataStream<GenericRecord> dataStream = > env.addSource(searchConsumer).name("search_list_keyless"); > DataStream<GenericRecord> dataStreamWithWaterMark = > dataStream.assignTimestampsAndWatermarks(new SessionAssigner()); > > try { > dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, > String>) record -> { > StringBuilder builder = new StringBuilder(); > builder.append(record.get("session_id")); > builder.append(record.get("user_id")); > return builder.toString(); > }).process(new MatchFunction()).print(); > } > catch (Exception e){ > e.printStackTrace(); > } > env.execute("start session process"); > > } > > public static class SessionAssigner implements > AssignerWithPunctuatedWatermarks<GenericRecord> { > @Override > public long extractTimestamp(GenericRecord record, long > previousElementTimestamp) { > long timestamp = (long) record.get("event_ts"); > System.out.println("timestamp------"+ timestamp); > return timestamp; > } > > @Override > public Watermark checkAndGetNextWatermark(GenericRecord record, long > extractedTimestamp) { > // simply emit a watermark with every event > System.out.println("extractedTimestamp "+extractedTimestamp); > return new Watermark(extractedTimestamp - 30000); > } > } > > @Override > public void processElement(GenericRecord record, Context context, > Collector<Object> collector) throws Exception { > > TimerService timerService = context.timerService(); > System.out.println("currentwatermark----"+ > timerService.currentWatermark()); > if (context.timestamp() > timerService.currentWatermark()) { > > Tuple2<Long, PriorityQueue<GenericRecord>> queueval = > queueState.value(); > PriorityQueue<GenericRecord> queue = queueval.f1; > long startTime = queueval.f0; > System.out.println("starttime----"+ startTime); > > if (queue == null) { > queue = new PriorityQueue<>(10, new TimeStampComprator()); > startTime = (long) record.get("event_ts"); > } > queueState.update(new Tuple2<>(startTime, queue)); > timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000); > } > } > > } > > Please help me to underand what i am doing wrong. > > -- > Thanks & Regards, > Anuj Jain > > > > -- Thanks & Regards, Anuj Jain Mob. : +91- 8588817877 Skype : anuj.jain07 <http://www.oracle.com/> <http://www.cse.iitm.ac.in/%7Eanujjain/>