Hi!

Is the number of partitions of your Kafka topic smaller than the
parallelism of the job? If so, some parallel source subtasks will be idle
and will not emit watermarks unless you configure idleness for them. See [1].

I guess the original behavior of Flink 1.12 was not the expected behavior,
but I don't know what causes the difference. If anyone knows, please share
your thoughts.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
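As a rough sketch of what [1] describes: you can wrap the watermark strategy
with `withIdleness` so that a partition that stops receiving records is marked
idle and no longer holds back the overall watermark. The 30-second duration
below is an arbitrary illustrative value, and I've used the built-in
`forMonotonousTimestamps` strategy (equivalent to `AscendingTimestampsWatermarks`)
rather than the hand-rolled `WatermarkStrategy` from your code:

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Sketch only: ascending timestamps taken from the element value itself,
// with splits declared idle after 30 s without records (duration is an
// arbitrary choice for illustration).
val strategy: WatermarkStrategy[Int] =
  WatermarkStrategy
    .forMonotonousTimestamps[Int]()
    .withTimestampAssigner(new SerializableTimestampAssigner[Int] {
      override def extractTimestamp(element: Int, recordTimestamp: Long): Long = element
    })
    .withIdleness(Duration.ofSeconds(30))

// then: env.fromSource(kafkaSource, strategy, "kafka-source")
```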

Carlos Downey <carlos.downey...@gmail.com> wrote on Wednesday, November 10, 2021 at 1:41 AM:

> Hello,
>
> Recently, I've decided to migrate one of my toy projects to Flink 1.14.0
> and I realized that JVM cluster behaviour changed. It no longer respects
> event time. On Flink 1.12.5 I didn't experience any issues. I tried some
> debugging and it seems that InternalTimerServiceImpl's watermark is not
> incremented at all. I don't see any issues with my code when running it on
> a cluster.
>
> I have provided a code sample that shows the issue. Just put the numbers 1
> to 10 into a topic named numbers on a local Kafka cluster. Try compiling it
> with Flink 1.12.5 / 1.13.3 / 1.14.0 (note: in recent Flink versions,
> KafkaRecordDeserializer became KafkaRecordDeserializationSchema).
>
> object Example extends App {
>   import org.apache.flink.api.scala._
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val kafkaSource = KafkaSource.builder[Int]()
>     .setTopics("numbers")
>     .setBootstrapServers("localhost:9092")
>     .setGroupId("flink-job")
>     .setDeserializer(new KafkaRecordDeserializationSchema[Int] {
>       override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[Int]): Unit = {
>         out.collect(new String(record.value()).toInt)
>       }
>
>       override def getProducedType: TypeInformation[Int] =
>         TypeInformation.of(classOf[Int])
>     })
>     .build()
>
>   env.fromSource(kafkaSource, new WatermarkStrategy[Int] {
>       override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Int] =
>         new AscendingTimestampsWatermarks[Int]
>
>       override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[Int] = {
>         (element: Int, _: Long) => element
>       }
>     }, "kafka-source")
>     .map(Integer.valueOf(_))
>     .keyBy(_ => 1)
>     .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
>     .process(new ProcessWindowFunction[Integer, Int, Int, TimeWindow] {
>       override def process(key: Int, context: Context, elements: Iterable[Integer], out: Collector[Int]): Unit = {
>         out.collect(elements.last)
>       }
>     })
>     .print()
>   env.execute()
> }
>
> I see no output on Flink 1.13.3 and 1.14.0. The issue doesn't happen with
> the `fromElements` source, so I concluded that for finite sources all
> windows are eventually fired. I wonder what could be causing this - maybe
> I just missed some mandatory configuration?
>