It seems to be the case. But when I use timeWindow or CEP with fromCollection, it works well. For example,
``` sEnv.fromCollection(Seq[Long](1, 1002, 2002, 3002)).assignAscendingTimestamps(identity[Long]) .keyBy(_ % 2).timeWindow(Time.seconds(1)).sum(0).print() ``` prints ``` 1 1002 2002 3002 ``` How can I implement my KeyedProcessFunction so that it would work as expected. Dian Fu <dian0511...@gmail.com> 于 2019年10月28日周一 下午2:04写道: > Hi, > > It generates watermark periodically by default in the underlying > implementation of `assignAscendingTimestamps`. So for your test program, > the watermark is still not generated yet and I think that's the reason why > it's Long.MinValue. > > Regards, > Dian > > 在 2019年10月28日,上午11:59,杨力 <bill.le...@gmail.com> 写道: > > I'm going to sort elements in a PriorityQueue and set up timers at > (currentWatermark + 1), following the instructions in > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing > . > > However, it seems that context.timerService().currentWatermark() always > returns Long.MinValue and my onTimer will never be called. Here's minimal > program to reproduce the problem. Am I missing something? > > ``` > val sEnv = StreamExecutionEnvironment.getExecutionEnvironment > sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > sEnv.setParallelism(argOps.parallelism()) > sEnv.fromCollection(Seq[Long](1, 2, > 3)).assignAscendingTimestamps(identity[Long]) > .process(new ProcessFunction[Long, Long] { > override def processElement(i: Long, context: ProcessFunction[Long, > Long]#Context, collector: Collector[Long]): Unit = { > collector.collect(context.timerService().currentWatermark()) > } > }).print() > sEnv.execute() > ``` > > ``` > -9223372036854775808 > -9223372036854775808 > -9223372036854775808 > ``` > > >