Hello, Recently, I've decided to migrate one of my toy projects to Flink 1.14.0 and I realized that JVM cluster behaviour changed. It no longer respects event time. On Flink 1.12.5 I didn't experience any issues. I tried some debugging and it seems that InternalTimerServiceImpl's watermark is not incremented at all. I don't see any issues with my code when running it on a cluster.
I have provided a code sample that shows the issue. Just put numbers 1 to 10 to a local Kafka cluster topic numbers. Try compiling it with Flink 1.12.5 / 1.13.3 / 1.14.0 (note: in recent Flink versions KafkaRecordDeserializer became KafkaRecordDeserializationSchema). object Example extends App { import org.apache.flink.api.scala._ val env = StreamExecutionEnvironment.getExecutionEnvironment val kafkaSource = KafkaSource.builder[Int]() .setTopics("numbers") .setBootstrapServers("localhost:9092") .setGroupId("flink-job") .setDeserializer(new KafkaRecordDeserializationSchema[Int] { override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[Int]): Unit = { out.collect(new String(record.value()).toInt) } override def getProducedType: TypeInformation[Int] = TypeInformation.of(classOf[Int]) }) .build() env.fromSource(kafkaSource, new WatermarkStrategy[Int] { override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Int] = new AscendingTimestampsWatermarks[Int] override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[Int] = { (element: Int, _: Long) => element } }, "kafka-source") .map(Integer.valueOf(_)) .keyBy(_ => 1) .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) .process(new ProcessWindowFunction[Integer, Int, Int, TimeWindow] { override def process(key: Int, context: Context, elements: Iterable[Integer], out: Collector[Int]): Unit = { out.collect(elements.last) } }) .print() env.execute() } I see no output in Flink 1.13.3 and 1.14.0. The issue doesn't happen with `fromElements` source -> I concluded that for finite sources all windows are eventually fired. I wonder what could be causing this - maybe I just missed some mandatory configuration?