The ctx.currentProcessingTIme() escaped my notice. Thank you for pointing it out Yun Tang.
I now set my processing Timer using ctx.timerService().registerProcessingTimeTimer(ctx.currentProcessingTime()+2000); and it works. On Wed, 6 Nov 2019 at 21:57, Yun Tang <myas...@live.com> wrote: > Hi Komal > > > > Please read carefully on the Javadoc of BaseContext#timeStamp [1], it > would be null if your program is set to {@link > org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. If you > want to fetch current processing time stamp, please use `ctx# > currentProcessingTime()`. > > > > [1] > https://github.com/apache/flink/blob/9b43f13a50848382fbd634081b82509f464e62ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java#L50 > > Best > > Yun Tang > > > > *From: *Komal Mariam <komal.mar...@gmail.com> > *Date: *Wednesday, November 6, 2019 at 6:19 PM > *To: *user <user@flink.apache.org> > *Subject: *ctx.timestamp() returning null when using Processing Time > > > > Dear all, > > > > I want to clear some of my variables in KeyedBroadcastProcessFunction after > a certain time. I implemented the onTimer() function but even though I am > using ProcessingTime > > like so: > env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime), I am > getting null when ctx.timestamp() is called. > > How do I ensure that some of variables or states inside > KeyedBroadcasrProcessFunction are cleared after a certain time interval > (say 3 seconds)? > > > Here is skeleton of what it looks like. I am using Flink 1.9 > > > > > > public static class myFunction extends > KeyedBroadcastProcessFunction<String, Point, Point, Tuple3<Integer, > List<Integer>, List<String>>> { > > > > List<Point> fixedPoints; > > > > public void processBroadcastElement( > Point myPoint, > Context ctx, > Collector<Tuple3<Integer, List<Integer>, List<String>>> > out) throws Exception { > > /* put myPoint in broadcastState*/ > > } > > > > > > public void processElement(Point queryPoint, ReadOnlyContext ctx, > Collector<Tuple3<Integer, List<Integer>, List<String>>> out) throws > Exception { > > > > /* collect output*/ > > > > System.out.println("TimeStamp: " +ctx.timestamp()); > //returns "Timestamp: null" > > ctx.timerService().registerEventTimeTimer(ctx.timestamp()+ 3000); > //java.lang.NullPointerException > > > > } > > > > //does not run due to java.lang.NullPointerException > > > > public void onTimer(long timestamp, > OnTimerContext ctx, > Collector<Tuple3<Integer, List<Integer>, > List<String>>> out) throws Exception > { > System.out.println("Clearing..."); > fixedPoints.clear(); > System.out.println("Clearing...COMPLETE"); > } > > > > } > > > > When I change to > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > I get timestamps like these > TimeStamp: 1573016104289 > > TimeStamp: 1573016104294 > TimeStamp: 1573016104292 > > > > however the onTimer() function is never called and fixedPoints is not > cleared. > > My datastreams right now are very limited. keyedStream has 8 elements > while broadcast stream has 7. > > > > I would really appreciate any help! > > Best Regards, > Komal > > >