Yep, but "Consider this example: if you had a Kafka Consumer that was consuming from topic A, you did a savepoint, then changed your Kafka consumer to instead consume from topic B, and then restarted your job from the savepoint. Before this change, your consumer would now consume from both topic A and B because it was stored in state that the consumer was consuming from topic A. With the change, your consumer would only consume from topic B after restore because it now filters the topics that are stored in state using the *configured topics*."
Yep, the issue is that our pattern is all-inclusive, so *configured topics*
implies all topics for that pattern, and thus all of our 1500-plus topics. We
use Flink to push all data in all topics to HDFS. We are now in the process of
pruning our topics (and, as you can imagine, their names do not follow a
specific pattern), which means removing some 500 of them. We would have
assumed that removing the topics from the cluster would be an indication to
Flink that, even though it has state associated with those topics and
irrespective of the pattern, they should no longer be consumed, very akin to
auto discovery of partitions (and thus topics) but in reverse. Does that make
sense?

On Fri, Jun 14, 2019 at 3:24 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vishal,
>
> If I remember correctly, the behavior of the Kafka consumer was changed
> in Flink 1.8 to account for such situations.
> Please check the release notes [1] and the corresponding Jira issue [2].
>
> If this is not the behavior you need, please feel free to create a new
> Jira issue and start a discussion there.
>
> Thanks, Fabian
>
> [1] https://flink.apache.org/news/2019/04/09/release-1.8.0.html#important-changes
> [2] https://issues.apache.org/jira/browse/FLINK-10342
>
> On Thu, 13 Jun 2019 at 22:31, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> I guess adjusting the pattern (blacklisting the topic(s)) would work....
>>
>> On Thu, Jun 13, 2019 at 3:02 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> Given
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L471
>>>
>>> it seems that if
>>>
>>> * I have a regex pattern for consuming a bunch of topics
>>> * auto create is turned on
>>>
>>> then even if I do this:
>>>
>>> * suspend the pipeline with a savepoint
>>> * delete the topic
>>> * restart the pipeline
>>>
>>> it will recreate the topic, because the state has it and the check is
>>> against the pattern rather than against the cluster metadata. In case
>>> auto create is turned off, it will throw an exception (I think).
>>>
>>> The question thus is: how can we safely delete a topic from our Kafka
>>> cluster without either a recreation (auto create on) or an exception
>>> (auto create off) when restarting from a savepoint whose restored state
>>> had that topic?
>>>
>>> Thanks much,
>>>
>>> Visha.
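For completeness, a sketch of the blacklisting workaround mentioned in the thread, assuming the Flink 1.8+ consumer that accepts a subscription Pattern: a negative-lookahead regex excludes the pruned topics, so the restore-time filter drops their state and they are never re-subscribed or auto-created again. Topic names, broker address, and group id below are placeholders:

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class BlacklistPatternSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical topic names; the real pruned topics go in the
        // alternation. The negative lookahead matches every topic EXCEPT
        // the blacklisted ones.
        Pattern allButPruned =
                Pattern.compile("^(?!(pruned_topic_a|pruned_topic_b)$).*");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "hdfs-pusher");          // placeholder
        // keep discovering newly created topics that match the pattern
        props.setProperty("flink.partition-discovery.interval-millis", "30000");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(allButPruned, new SimpleStringSchema(), props);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(consumer).print(); // sink to HDFS in the real job
        env.execute();
    }
}

Note that the lookahead only has to enumerate the ~500 topics being removed, not the 1000+ being kept, though with no common naming scheme the alternation will be long.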