Hi David, yes, that's intentional [1]: it could lead to correctness issues, and it was used inconsistently across sources. And yes, it should be documented.
For now I'd put it in the KafkaSource docs because I'm not sure in which release notes it would fit best. In which release notes would you expect such a disclaimer?

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition

On Mon, Nov 22, 2021 at 10:37 AM David Anderson <dander...@apache.org> wrote:

> I've seen a few questions recently from folks migrating from
> FlinkKafkaConsumer to KafkaSource that make me suspect that something has
> changed.
>
> In FlinkKafkaConsumerBase we have this code which sets a source subtask to
> idle if all of its partitions are empty when the subtask starts:
>
>     // mark the subtask as temporarily idle if there are no initial seed partitions;
>     // once this subtask discovers some partitions and starts collecting records, the subtask's
>     // status will automatically be triggered back to be active.
>     if (subscribedPartitionsToStartOffsets.isEmpty()) {
>         sourceContext.markAsTemporarilyIdle();
>     }
>
> Unsurprisingly, people have code that depends on this behavior, and after
> switching to KafkaSource, their tests or applications are failing to
> produce results (because the idle partitions are now holding back the
> watermarks). This leads me to believe that KafkaSource does not work the
> same way.
>
> Can someone confirm that the behavior here has changed?
>
> Was this intentional? Yes, one can use withIdleness to achieve something
> similar, but if this is now required, it needs to be documented in the
> release notes, etc.
>
> David
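
To illustrate the "idle partitions are now holding back the watermarks" effect from the quoted message, here is a minimal sketch in plain Java. It is a simplified model and not Flink's actual implementation: the class `IdleWatermarkSketch` and its methods are hypothetical names made up for this example. It assumes the downstream watermark is the minimum over all inputs that are not marked idle, and that it never moves backwards.

```java
import java.util.Arrays;

/** Simplified model of watermark combination with idle inputs (hypothetical, not Flink code). */
final class IdleWatermarkSketch {
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long[] watermarks; // last watermark seen per input
    private final boolean[] idle;    // true if the input is currently marked idle
    private long lastCombined = NO_WATERMARK;

    IdleWatermarkSketch(int inputs) {
        watermarks = new long[inputs];
        Arrays.fill(watermarks, NO_WATERMARK);
        idle = new boolean[inputs];
    }

    /** An input reports a watermark; in this model that also makes it active again. */
    void advance(int input, long watermark) {
        idle[input] = false;
        watermarks[input] = Math.max(watermarks[input], watermark);
    }

    /** Marks an input as idle so that it no longer holds back the combined watermark. */
    void markIdle(int input) {
        idle[input] = true;
    }

    /** Minimum watermark over all active inputs; never regresses below the last result. */
    long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs are ignored
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        if (anyActive) {
            lastCombined = Math.max(lastCombined, min);
        }
        return lastCombined;
    }

    public static void main(String[] args) {
        IdleWatermarkSketch op = new IdleWatermarkSketch(2);
        op.advance(0, 1_000L);
        // Input 1 has never reported anything and is not idle, so it holds everything back.
        System.out.println(op.combined() == NO_WATERMARK); // prints "true"
        op.markIdle(1);
        // With input 1 marked idle, the watermark of input 0 passes through.
        System.out.println(op.combined()); // prints "1000"
    }
}
```

In an actual job, the corresponding knob is the `withIdleness(Duration)` setting on the `WatermarkStrategy` passed to the source, as mentioned in the quoted message; the sketch above only shows why an input that is never marked idle stalls the results.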