Hi Dawid,

Currently I am testing it with a single source and parallelism 1 only, so I
am not able to understand this behavior.
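
For reference, the rule from the reply quoted below (an operator's watermark
is the minimum over its inputs) can be sketched in plain Java without any
Flink dependency. This is only an illustration under stated assumptions: the
class and method names (WatermarkMinSketch, InputWatermarks, onWatermark,
current) are made up for this sketch and are not Flink API. The one Flink
fact it relies on is that the current watermark is Long.MIN_VALUE until a
first watermark arrives.

```java
import java.util.Arrays;

public class WatermarkMinSketch {

    // Value reported as the current watermark before any watermark has arrived.
    static final long NO_WATERMARK_YET = Long.MIN_VALUE;

    /** Hypothetical helper: tracks the latest watermark per input and exposes the minimum. */
    static final class InputWatermarks {
        private final long[] perInput;

        InputWatermarks(int numInputs) {
            perInput = new long[numInputs];
            Arrays.fill(perInput, NO_WATERMARK_YET);
        }

        /** Records a watermark from one input; a watermark never moves backwards. */
        void onWatermark(int input, long watermark) {
            perInput[input] = Math.max(perInput[input], watermark);
        }

        /** The operator's event-time clock: the minimum over all inputs. */
        long current() {
            return Arrays.stream(perInput).min().getAsLong();
        }
    }

    public static void main(String[] args) {
        InputWatermarks two = new InputWatermarks(2);

        // Input 0 emits a watermark 30 s behind its event timestamp.
        two.onWatermark(0, 1583128014000L - 30000L);
        // Input 1 has sent nothing yet, so the minimum is still the initial value.
        System.out.println(two.current());

        // Once input 1 also emits a watermark, the minimum can advance.
        two.onWatermark(1, 1583128048000L - 30000L);
        System.out.println(two.current());
    }
}
```

With a single input the minimum is simply that input's latest watermark, so
under this rule an idle parallel input alone would not explain a watermark
that stays at the initial value when the parallelism is 1.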

On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Anuj,
>
> What parallelism does your source have? Do all of your source tasks
> produce records? An operator's watermark is always the minimum of the
> watermarks received from all of its upstream operators. Therefore, if some
> of them do not produce records, the watermark will not progress. You can
> read more about watermarks and how they work here:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>
> Hope that helps
>
> Best,
>
> Dawid
> On 02/03/2020 16:26, aj wrote:
>
> I am trying to use a process function to do some processing on a set of
> events. I am using event time and a keyed stream. The issue I am facing is
> that the watermark value always comes out as -9223372036854775808
> (Long.MIN_VALUE). I have put in print statements to debug, and they show
> this:
>
> timestamp------1583128014000 extractedTimestamp 1583128014000
> currentwatermark-----9223372036854775808
>
> timestamp------1583128048000 extractedTimestamp 1583128048000
> currentwatermark-----9223372036854775808
>
> timestamp------1583128089000 extractedTimestamp 1583128089000
> currentwatermark-----9223372036854775808
>
> The timestamp and the extracted timestamp change, but the watermark is not
> getting updated. So no record gets into the queue, as context.timestamp is
> never less than the watermark.
>
>
>         DataStream<GenericRecord> dataStream =
>                 env.addSource(searchConsumer).name("search_list_keyless");
>         DataStream<GenericRecord> dataStreamWithWaterMark =
>                 dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>
>         try {
>             dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
>                 StringBuilder builder = new StringBuilder();
>                 builder.append(record.get("session_id"));
>                 builder.append(record.get("user_id"));
>                 return builder.toString();
>             }).process(new MatchFunction()).print();
>         }
>         catch (Exception e){
>             e.printStackTrace();
>         }
>         env.execute("start session process");
>
>     }
>
>     public static class SessionAssigner
>             implements AssignerWithPunctuatedWatermarks<GenericRecord> {
>         @Override
>         public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
>             long timestamp = (long) record.get("event_ts");
>             System.out.println("timestamp------" + timestamp);
>             return timestamp;
>         }
>
>         @Override
>         public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
>             // simply emit a watermark with every event
>             System.out.println("extractedTimestamp " + extractedTimestamp);
>             return new Watermark(extractedTimestamp - 30000);
>         }
>     }
>
>     @Override
>     public void processElement(GenericRecord record, Context context,
>             Collector<Object> collector) throws Exception {
>
>         TimerService timerService = context.timerService();
>         System.out.println("currentwatermark----" + timerService.currentWatermark());
>         if (context.timestamp() > timerService.currentWatermark()) {
>
>             Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
>             PriorityQueue<GenericRecord> queue = queueval.f1;
>             long startTime = queueval.f0;
>             System.out.println("starttime----"+ startTime);
>
>             if (queue == null) {
>                 queue = new PriorityQueue<>(10, new TimeStampComprator());
>                 startTime = (long) record.get("event_ts");
>             }
>             queueState.update(new Tuple2<>(startTime, queue));
>             timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
>         }
>     }
>
> }
>
> Please help me understand what I am doing wrong.
>
>  --
> Thanks & Regards,
> Anuj Jain
>
>
>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
