Hi Komal
Please read the Javadoc of BaseContext#timestamp [1] carefully: it returns
null if your program is set to
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime. If you want
to fetch the current processing-time timestamp, please use
`ctx#currentProcessingTime()` instead.
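
To illustrate why the NullPointerException shows up, here is a minimal sketch in plain Java. It does not use Flink at all: `Ctx`, `processingTimeCtx`, `timestampArithmeticFails` and `cleanupTime` are hypothetical names I made up for this mail, and `Ctx` only mirrors the return types of the two context methods (a boxed Long for timestamp(), a primitive long for currentProcessingTime()). Treat it as an illustration under those assumptions, not as the actual Flink classes.

```java
public class ProcessingTimeTimerSketch {

    /**
     * Hypothetical stand-in for the two context methods discussed in this thread.
     * It is NOT Flink's class; it only mirrors the return types
     * (boxed Long for timestamp(), primitive long for currentProcessingTime()).
     */
    public interface Ctx {
        Long timestamp();
        long currentProcessingTime();
    }

    /** Builds a context that behaves like a processing-time job: no element timestamp. */
    public static Ctx processingTimeCtx(final long now) {
        return new Ctx() {
            @Override public Long timestamp() { return null; }
            @Override public long currentProcessingTime() { return now; }
        };
    }

    /** Returns true if "ctx.timestamp() + delayMs" throws, which happens when timestamp() is null. */
    public static boolean timestampArithmeticFails(Ctx ctx, long delayMs) {
        try {
            long unused = ctx.timestamp() + delayMs; // auto-unboxing a null Long throws NPE
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Computes the cleanup time from the processing-time clock, which is a primitive and never null. */
    public static long cleanupTime(Ctx ctx, long delayMs) {
        return ctx.currentProcessingTime() + delayMs;
    }

    public static void main(String[] args) {
        Ctx ctx = processingTimeCtx(System.currentTimeMillis());
        System.out.println("timestamp() + 3000 fails: " + timestampArithmeticFails(ctx, 3000L));
        System.out.println("cleanup timer due at: " + cleanupTime(ctx, 3000L));
    }
}
```

In your processElement() the corresponding change would, I believe, be to replace the registerEventTimeTimer(...) call with `ctx.timerService().registerProcessingTimeTimer(ctx.currentProcessingTime() + 3000)`. I have not run this against your job, so please verify it on your side.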
[1]
https://github.com/apache/flink/blob/9b43f13a50848382fbd634081b82509f464e62ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java#L50
Best
Yun Tang
From: Komal Mariam <[email protected]>
Date: Wednesday, November 6, 2019 at 6:19 PM
To: user <[email protected]>
Subject: ctx.timestamp() returning null when using Processing Time
Dear all,
I want to clear some of my variables in KeyedBroadcastProcessFunction after a
certain time. I implemented the onTimer() function, but ctx.timestamp() returns
null when it is called. I am using ProcessingTime, set like so:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
How do I ensure that some of the variables or states inside
KeyedBroadcastProcessFunction are cleared after a certain time interval (say 3
seconds)?
Here is a skeleton of what it looks like. I am using Flink 1.9.
public static class myFunction extends KeyedBroadcastProcessFunction<String,
        Point, Point, Tuple3<Integer, List<Integer>, List<String>>> {

    List<Point> fixedPoints;

    public void processBroadcastElement(
            Point myPoint,
            Context ctx,
            Collector<Tuple3<Integer, List<Integer>, List<String>>> out)
            throws Exception {
        /* put myPoint in broadcastState */
    }

    public void processElement(Point queryPoint, ReadOnlyContext ctx,
            Collector<Tuple3<Integer, List<Integer>, List<String>>> out) throws Exception {
        /* collect output */
        System.out.println("TimeStamp: " + ctx.timestamp()); // returns "Timestamp: null"
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000); // java.lang.NullPointerException
    }

    // does not run due to java.lang.NullPointerException
    public void onTimer(long timestamp,
            OnTimerContext ctx,
            Collector<Tuple3<Integer, List<Integer>, List<String>>> out) throws Exception {
        System.out.println("Clearing...");
        fixedPoints.clear();
        System.out.println("Clearing...COMPLETE");
    }
}
When I change to env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime),
I get timestamps like these:
TimeStamp: 1573016104289
TimeStamp: 1573016104294
TimeStamp: 1573016104292
However, the onTimer() function is never called and fixedPoints is not cleared.
My data streams are very small right now: the keyed stream has 8 elements and
the broadcast stream has 7.
I would really appreciate any help!
Best Regards,
Komal