Dear all,

I want to clear some of my variables in a KeyedBroadcastProcessFunction
after a certain time. I implemented the onTimer() function, but
ctx.timestamp() returns null even though I am using processing time, set
like so:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

How do I ensure that some of the variables or state inside a
KeyedBroadcastProcessFunction are cleared after a certain time interval
(say 3 seconds)?

Here is a skeleton of what it looks like. I am using Flink 1.9.


public static class myFunction extends
        KeyedBroadcastProcessFunction<String, Point, Point,
                Tuple3<Integer, List<Integer>, List<String>>> {

    List<Point> fixedPoints;

    public void processBroadcastElement(
            Point myPoint,
            Context ctx,
            Collector<Tuple3<Integer, List<Integer>, List<String>>> out)
            throws Exception {
        /* put myPoint in broadcastState */
    }

    public void processElement(
            Point queryPoint,
            ReadOnlyContext ctx,
            Collector<Tuple3<Integer, List<Integer>, List<String>>> out)
            throws Exception {

        /* collect output */

        // prints "TimeStamp: null"
        System.out.println("TimeStamp: " + ctx.timestamp());

        // throws java.lang.NullPointerException
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000);
    }

    // does not run due to java.lang.NullPointerException
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple3<Integer, List<Integer>, List<String>>> out)
            throws Exception {
        System.out.println("Clearing...");
        fixedPoints.clear();
        System.out.println("Clearing...COMPLETE");
    }
}

When I change to
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime),
I get timestamps like these:
TimeStamp: 1573016104289
TimeStamp: 1573016104294
TimeStamp: 1573016104292

However, the onTimer() function is never called and fixedPoints is not
cleared.
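
A note on the NullPointerException in processElement(): ctx.timestamp()
returns a boxed Long, which is null when elements carry no timestamp, as
under ProcessingTime. The expression ctx.timestamp() + 3000 then fails while
unboxing null. The plain-Java sketch below reproduces this without Flink and
shows a null-safe fallback. The class NullTimestampDemo and its methods are
hypothetical names made up for illustration, and the currentProcessingTime
parameter is only a stand-in for the value that
ctx.timerService().currentProcessingTime() would supply in the real function.

```java
public class NullTimestampDemo {

    // Hypothetical helper: picks the base time for a timer.
    // elementTimestamp stands in for ctx.timestamp() (a boxed Long, may be null);
    // currentProcessingTime stands in for the timer service's processing time.
    static long timerTime(Long elementTimestamp, long currentProcessingTime, long delayMs) {
        long base = (elementTimestamp != null) ? elementTimestamp : currentProcessingTime;
        return base + delayMs;
    }

    // Reproduces the failure: adding to a null Long forces unboxing,
    // which throws NullPointerException.
    static boolean unboxingThrows(Long elementTimestamp) {
        try {
            long t = elementTimestamp + 3000;
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(unboxingThrows(null));
        System.out.println(timerTime(null, 1000L, 3000L));
    }
}
```

In the real function the analogous step would likely be
ctx.timerService().registerProcessingTimeTimer(
ctx.timerService().currentProcessingTime() + 3000), but this is untested
against the job above.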

My data streams are very small right now: the keyed stream has 8 elements
and the broadcast stream has 7.

I would really appreciate any help!

Best Regards,
Komal
