
Shengnan YU commented on KAFKA-8100:

Thanks for replying. However if the topic is permanent deleted, the warning log 
will keep flushing. Why not make it configurable to let the topic metadata 
expiry after a period?

> If delete expired topic, kafka consumer will keep flushing unknown_topic 
> warning in log
> Recently we used flink to consume kafka topics with a regex pattern. It is 
> found that when we deleted some unused topics, the logs will keep flushing 
> I study the source code of kafka client, it is found that for consumer, 
> topicExpiry is disable in Metadata, which leads to that the even the topic 
> deleted, the client still have this topic info in the metadata's topic list 
> and keep fetching from servers.
> Is there any good method to avoid this annoying warning logs without modify 
> the kafka's source code? (We still need the 'Real' Unknown topic exception, 
> which means not the outdated topic, in logs)
> The following code can be used to reproduce this problem (if you create 
> multiple topics such as "test1", "test2", "test3"..."testn" in kafka cluster 
> and then delete any of one while running).
> {code:java}
> public static void main(String [] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092\n");
>         props.put("group.id", "test10");
>         props.put("enable.auto.commit", "true");
>         props.put("auto.commit.interval.ms", "1000");
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("metadata.max.age.ms", "60000");
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, 
> String>(props);
>         class PartitionOffsetAssignerListener implements 
> ConsumerRebalanceListener {
>             private KafkaConsumer<String, String> consumer;
>             public PartitionOffsetAssignerListener(KafkaConsumer 
> kafkaConsumer) {
>                 this.consumer = kafkaConsumer;
>             }
>             public void onPartitionsRevoked(Collection<TopicPartition> 
> partitions) {
>             }
>             public void onPartitionsAssigned(Collection<TopicPartition> 
> partitions) {
>                 //reading all partitions from the beginning
>                 consumer.seekToBeginning(partitions);
>             }
>         }
>         consumer.subscribe(Pattern.compile("^test.*$"), new 
> PartitionOffsetAssignerListener(consumer));
>         while (true) {
>             ConsumerRecords<String, String> records = consumer.poll(100);
>             for (ConsumerRecord<String, String> record : records) {
>                 System.out.printf("offset = %d, key = %s, value = %s%n", 
> record.offset(), record.key(), record.value());
>             }
>         }
> }
> {code}

