[ https://issues.apache.org/jira/browse/FLINK-11848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-11848:
-----------------------------------
    Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned stale-major)
    Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTIION
> ------------------------------------------------------------
>
>                 Key: FLINK-11848
>                 URL: https://issues.apache.org/jira/browse/FLINK-11848
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.4
>            Reporter: Shengnan YU
>            Priority: Minor
>              Labels: auto-deprioritized-major, auto-unassigned
>
> We run streaming jobs with Apache Flink. There are multiple Kafka topics
> named in the format xxxxxx_yy-mm-dd, and we use a topic regex pattern so that
> one consumer reads all of them. However, when we delete some of the older
> topics, the consumer's metadata does not seem to update properly: it still
> keeps the deleted topics in its topic list, which leads to
> *UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to recover. The
> same seems to occur in the producer as well. Any idea how to solve this
> problem? Thank you very much!
>
> Example to reproduce the problem:
> There are multiple Kafka topics, for instance "test20190310", "test20190311",
> and "test20190312". I run the job and everything is OK. Then, if I delete
> topic "test20190310", the consumer does not notice that the topic has been
> deleted and keeps fetching metadata for it. Unknown errors then appear in the
> TaskManager's log.
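> {code:java}
> import java.util.Properties;
> import java.util.regex.Pattern;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
>
> // The enclosing class was not shown in the report; the name is illustrative.
> public class ReproduceJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "test10");
>         props.put("enable.auto.commit", "true");
>         props.put("auto.commit.interval.ms", "1000");
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         // Re-run topic/partition discovery every 1,200,000 ms (20 minutes).
>         props.setProperty(
>                 FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
>                 "1200000");
>
>         // Subscribe to every topic whose name starts with "test".
>         Pattern topics = Pattern.compile("^test.*$");
>         FlinkKafkaConsumer011<String> consumer =
>                 new FlinkKafkaConsumer011<>(topics, new SimpleStringSchema(), props);
>
>         DataStream<String> stream = env.addSource(consumer);
>         stream.writeToSocket("localhost", 44444, new SimpleStringSchema());
>         env.execute("test");
>     }
> }
> {code}
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)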
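To make the reported symptom concrete, the following is a minimal sketch of the bookkeeping that appears to be missing: comparing the set of topics a consumer already tracks against the latest broker listing, filtered by the same `^test.*$` pattern. The class `TopicDiff` and its methods are hypothetical names introduced for illustration only; they are not part of Flink or the Kafka client, and this is not how the connector is implemented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not a Flink or Kafka API. */
public class TopicDiff {

    /** Returns the topics from a broker listing that match the subscription pattern. */
    static Set<String> matching(Pattern pattern, List<String> brokerTopics) {
        Set<String> result = new TreeSet<>();
        for (String topic : brokerTopics) {
            if (pattern.matcher(topic).matches()) {
                result.add(topic);
            }
        }
        return result;
    }

    /** Returns the topics that were tracked before but are absent from the latest listing. */
    static Set<String> removed(Set<String> tracked, Set<String> current) {
        Set<String> gone = new TreeSet<>(tracked);
        gone.removeAll(current);
        return gone;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("^test.*$");

        // Topics seen at job start (names taken from the report).
        Set<String> tracked = matching(pattern,
                Arrays.asList("test20190310", "test20190311", "test20190312"));

        // Listing after "test20190310" has been deleted; "other" does not match.
        Set<String> current = matching(pattern,
                Arrays.asList("test20190311", "test20190312", "other"));

        // prints "removed: [test20190310]"
        System.out.println("removed: " + removed(tracked, current));
    }
}
```

Under these assumptions, a consumer that dropped the `removed` set from its subscription on each discovery pass would stop requesting metadata for deleted topics. Whether and where such a step could fit into the connector is a question for the issue discussion.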