The ctx.currentProcessingTime() method had escaped my notice. Thank you for
pointing it out, Yun Tang.

I now set my processing-time timer using
ctx.timerService().registerProcessingTimeTimer(ctx.currentProcessingTime() + 2000);
and it works.
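
For anyone who finds this thread later, here is a minimal plain-Java sketch of why the original code threw a NullPointerException. ctx.timestamp() returns a boxed Long, so adding 3000 to it unboxes the value, and unboxing null throws. The class and method names below (TimestampUnboxing, elementTimestamp, timerTime, timerTimeOrFallback) are hypothetical stand-ins for illustration only, not Flink API, and the fallback helper is just one possible way to guard against a missing timestamp.

```java
public class TimestampUnboxing {

    // Hypothetical stand-in for ctx.timestamp(): a boxed Long that is null
    // when the element carries no timestamp (as under ProcessingTime).
    static Long elementTimestamp(boolean hasTimestamp) {
        return hasTimestamp ? Long.valueOf(1573016104289L) : null;
    }

    // Mirrors the expression `ctx.timestamp() + 3000`: the boxed Long is
    // auto-unboxed before the addition, so a null timestamp throws
    // NullPointerException here.
    static long timerTime(Long timestamp, long delayMs) {
        return timestamp + delayMs;
    }

    // Assumed guard: fall back to a caller-supplied clock value (for example,
    // the current processing time) when no element timestamp is available.
    static long timerTimeOrFallback(Long timestamp, long nowMs, long delayMs) {
        long base = (timestamp != null) ? timestamp : nowMs;
        return base + delayMs;
    }

    public static void main(String[] args) {
        // With a timestamp present, the addition works as expected.
        System.out.println(timerTime(elementTimestamp(true), 3000L));

        // Without a timestamp, the same expression fails on unboxing.
        try {
            timerTime(elementTimestamp(false), 3000L);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: timestamp was null");
        }

        // The guarded version uses the wall clock instead.
        System.out.println(
            timerTimeOrFallback(elementTimestamp(false), System.currentTimeMillis(), 2000L));
    }
}
```

In the real function, the simpler fix is the one above: register the timer from ctx.currentProcessingTime(), which returns a primitive long.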

On Wed, 6 Nov 2019 at 21:57, Yun Tang <myas...@live.com> wrote:

> Hi Komal
>
>
>
> Please read the Javadoc of BaseContext#timestamp() [1] carefully: it
> returns null if your program is set to {@link
> org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. If you
> want to fetch the current processing timestamp, please use
> `ctx#currentProcessingTime()`.
>
>
>
> [1]
> https://github.com/apache/flink/blob/9b43f13a50848382fbd634081b82509f464e62ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java#L50
>
> Best
>
> Yun Tang
>
>
>
> *From: *Komal Mariam <komal.mar...@gmail.com>
> *Date: *Wednesday, November 6, 2019 at 6:19 PM
> *To: *user <user@flink.apache.org>
> *Subject: *ctx.timestamp() returning null when using Processing Time
>
>
>
> Dear all,
>
>
>
> I want to clear some of my variables in KeyedBroadcastProcessFunction after
> a certain time. I implemented the onTimer() function, but even though I am
> using ProcessingTime, like so:
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime), I am
> getting null when ctx.timestamp() is called.
>
> How do I ensure that some of the variables or states inside
> KeyedBroadcastProcessFunction are cleared after a certain time interval
> (say 3 seconds)?
>
>
> Here is a skeleton of what it looks like. I am using Flink 1.9.
>
>
>
>
>
> public static class myFunction extends
> KeyedBroadcastProcessFunction<String, Point, Point, Tuple3<Integer,
> List<Integer>, List<String>>> {
>
>
>
> List<Point> fixedPoints;
>
>
>
> public void processBroadcastElement(
>                 Point myPoint,
>                 Context ctx,
>                 Collector<Tuple3<Integer, List<Integer>, List<String>>>
> out) throws Exception {
>
> /* put myPoint in broadcastState*/
>
> }
>
>
>
>
>
>  public void processElement(Point queryPoint, ReadOnlyContext ctx,
> Collector<Tuple3<Integer, List<Integer>, List<String>>> out) throws
> Exception {
>
>
>
> /* collect output*/
>
>
>
> System.out.println("TimeStamp: " +ctx.timestamp());
>  //returns "Timestamp: null"
>
> ctx.timerService().registerEventTimeTimer(ctx.timestamp()+ 3000);
> //java.lang.NullPointerException
>
>
>
> }
>
>
>
> //does not run due to java.lang.NullPointerException
>
>
>
>         public void onTimer(long timestamp,
>                             OnTimerContext ctx,
>                             Collector<Tuple3<Integer, List<Integer>,
> List<String>>> out) throws Exception
>         {
>                 System.out.println("Clearing...");
>                 fixedPoints.clear();
>                 System.out.println("Clearing...COMPLETE");
>     }
>
>
>
> }
>
>
>
> When I change to 
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> I get timestamps like these
> TimeStamp: 1573016104289
>
> TimeStamp: 1573016104294
> TimeStamp: 1573016104292
>
>
>
> however the onTimer() function is never called and fixedPoints is not
> cleared.
>
> My data streams are very limited right now: the keyed stream has 8 elements,
> while the broadcast stream has 7.
>
>
>
> I would really appreciate any help!
>
> Best Regards,
> Komal
>
>
>
