I'm going to sort elements in a PriorityQueue and set up timers at (currentWatermark + 1), following the instructions in https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing .
However, it seems that context.timerService().currentWatermark() always returns Long.MinValue and my onTimer will never be called. Here's minimal program to reproduce the problem. Am I missing something? ``` val sEnv = StreamExecutionEnvironment.getExecutionEnvironment sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) sEnv.setParallelism(argOps.parallelism()) sEnv.fromCollection(Seq[Long](1, 2, 3)).assignAscendingTimestamps(identity[Long]) .process(new ProcessFunction[Long, Long] { override def processElement(i: Long, context: ProcessFunction[Long, Long]#Context, collector: Collector[Long]): Unit = { collector.collect(context.timerService().currentWatermark()) } }).print() sEnv.execute() ``` ``` -9223372036854775808 -9223372036854775808 -9223372036854775808 ```