Hi there, Right now I'm in the process of upgrading our Flink 1.9 jobs to Flink 1.11.
In Flink 1.9, I was able to write a AssignerWithperiodicWatermarks which also extended AbstractRichFunction and could thus utilize State and getRuntimeContext() in there. This worked as the TimestampsAndWatermarksOperator was a AbstractUdfStreamOperator and passed my assigner in as the userFunction to that operator. I used this feature for some "per partition processing" which Flinks somehow isn't ideally suited for at the moment I guess. We have ascending watermarks per kafka partition and do some processing on that. In order to maintain state per kafka-partition, I now keyby kafkapartition in our stream (not ideal but better than operatorstate in terms of rescaling) but afterwards need to emulate the watermark strategy from the initial kafka source, i.e. reassign watermarks the same way as the kafka source did (per kafka partition within the operator). Via getRuntimeContext() I am/was able to identify the kafkaPartitions one operatorinstance was responsible for and could produce the outputwatermark accordingly. (min over all responsible partitions). In Flink 1.11, how can I rebuild this behavior? Do I really need to build my own TimestampsAndWatermarksOperator which works like the old one? Or is there a better approach? Best regards Theo