Hi everyone, I would like to start a discussion about state recovery of removed splits for the source API (FLIP-27). By looking at the implementation and behavior we observed, it seems that the source reader doesn't correctly handle a split being discovered as "removed" from the enumerator. Instead, the reader would still read from these removed splits because the reader is getting those splits from the state[1].
For example, if Kafka source is subscribing to a list of topics, and later on remove one topic from the subscribed list. This would lead to some unexpected behavior. And I think this can be considered as a bug. To fix this, one solution would be adding a method (i.e. removeSplit) SplitEnumeratorContext to signal these removed splits to the reader. Any thoughts? [1]: https://github.com/apache/flink/blob/180774e93902862cf3bfa03de00437ae49d743eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L313-L316 Best Bin Huang