[ https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798695#comment-16798695 ]
Shengnan YU commented on KAFKA-8100:
------------------------------------

Thanks for replying. However, if the topic is permanently deleted, the warning log will keep flushing. Why not make it configurable so that topic metadata can expire after a period?

> If a subscribed topic is deleted, the kafka consumer will keep flushing unknown_topic warnings in the log
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8100
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8100
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.1.1, 2.1.1
>            Reporter: Shengnan YU
>            Priority: Major
>
> Recently we used Flink to consume Kafka topics with a regex pattern. We found that after we deleted some unused topics, the logs kept flushing UNKNOWN_TOPIC_OR_PARTITION warnings.
> Studying the Kafka client source code, I found that topic expiry is disabled in the consumer's Metadata. As a result, even after a topic is deleted, the client still has it in the metadata's topic list and keeps fetching its metadata from the servers.
> Is there any good way to avoid these annoying warning logs without modifying Kafka's source code? (We still need the 'real' unknown-topic warnings in the logs, i.e. for topics that genuinely do not exist, not for recently deleted ones.)
> The following code can be used to reproduce the problem (create multiple topics such as "test1", "test2", "test3", ..., "testn" in the Kafka cluster, then delete any one of them while the program is running):
> {code:java}
> import java.util.Collection;
> import java.util.Properties;
> import java.util.regex.Pattern;
>
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
>
> public class UnknownTopicRepro {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "test10");
>         props.put("enable.auto.commit", "true");
>         props.put("auto.commit.interval.ms", "1000");
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         // Force a metadata refresh every minute; deleted topics nevertheless stay in the topic list.
>         props.put("metadata.max.age.ms", "60000");
>         final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>
>         class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>             private final KafkaConsumer<String, String> consumer;
>
>             public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>                 this.consumer = kafkaConsumer;
>             }
>
>             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>             }
>
>             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>                 // Read all assigned partitions from the beginning.
>                 consumer.seekToBeginning(partitions);
>             }
>         }
>
>         consumer.subscribe(Pattern.compile("^test.*$"), new PartitionOffsetAssignerListener(consumer));
>         while (true) {
>             ConsumerRecords<String, String> records = consumer.poll(100);
>             for (ConsumerRecord<String, String> record : records) {
>                 System.out.printf("offset = %d, key = %s, value = %s%n",
>                         record.offset(), record.key(), record.value());
>             }
>         }
>     }
> }
> {code}
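In the meantime, one possible stop-gap that does not require modifying Kafka's source code, and that keeps the 'real' unknown-topic warnings, is to replace the pattern subscription with an explicit topic list refreshed periodically via the consumer's listTopics() API: a deleted topic then drops out of the subscription, and out of the consumer's metadata, on the next refresh. The sketch below is illustrative only; the class name, the 60-second refresh interval, and the change-detection logic are assumptions, not anything from this ticket.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RefreshingTopicListConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test10");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Pattern pattern = Pattern.compile("^test.*$");
        long refreshIntervalMs = 60000L; // illustrative refresh period, not a Kafka setting
        long lastRefreshMs = 0L;
        List<String> subscribed = new ArrayList<String>();

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        while (true) {
            long now = System.currentTimeMillis();
            if (now - lastRefreshMs >= refreshIntervalMs) {
                // listTopics() queries the brokers for the live topic list,
                // so topics deleted on the cluster no longer show up here.
                List<String> current = new ArrayList<String>();
                for (String topic : consumer.listTopics().keySet()) {
                    if (pattern.matcher(topic).matches()) {
                        current.add(topic);
                    }
                }
                Collections.sort(current);
                // Re-subscribe only when the set actually changed, so a refresh
                // does not trigger a needless rebalance.
                if (!current.equals(subscribed)) {
                    consumer.subscribe(current);
                    subscribed = current;
                }
                lastRefreshMs = now;
            }
            if (subscribed.isEmpty()) {
                // Nothing matches the pattern yet; wait for the next refresh.
                Thread.sleep(1000);
                continue;
            }
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
{code}

The trade-off versus pattern subscription is that newly created matching topics are only picked up at the refresh interval, although the built-in pattern subscription also discovers new topics only on a metadata refresh (metadata.max.age.ms), so the lag is comparable. Alternatively, if hiding the 'real' warnings too is acceptable, the logging configuration can raise the level for org.apache.kafka.clients.NetworkClient (the logger that emits these warnings) from WARN to ERROR.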