[ 
https://issues.apache.org/jira/browse/FLINK-11848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11848:
-----------------------------------
      Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
    Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any updates, 
so it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue, or revive the public 
discussion.


> Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTION
> ------------------------------------------------------------
>
>                 Key: FLINK-11848
>                 URL: https://issues.apache.org/jira/browse/FLINK-11848
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.4
>            Reporter: Shengnan YU
>            Priority: Minor
>              Labels: auto-deprioritized-major, auto-unassigned
>
> Recently we have been running some streaming jobs with Apache Flink. There 
> are multiple Kafka topics named with the format xxxxxx_yy-mm-dd. We used a 
> topic regex pattern to let a consumer consume those topics. However, if we 
> delete some of the older topics, the metadata in the consumer does not update 
> properly, so it still keeps those outdated topics in its topic list, which 
> leads to *UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to 
> recover. The same behavior seems to occur in the producer as well. Any idea 
> how to solve this problem? Thank you very much!
>  
> Example to reproduce the problem:
> There are multiple Kafka topics, for instance "test20190310", "test20190311", 
> and "test20190312". I run the job and everything is OK. Then, if I delete 
> topic "test20190310", the consumer does not perceive that the topic has been 
> deleted and will still try to fetch metadata for it. Unknown-topic errors 
> appear in the taskmanager's log.
> {code:java}
> import java.util.Properties;
> import java.util.regex.Pattern;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
>
> public class Test {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "test10");
>         props.put("enable.auto.commit", "true");
>         props.put("auto.commit.interval.ms", "1000");
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         // Enable periodic topic/partition discovery (every 20 minutes).
>         props.setProperty(
>                 FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
>                 "1200000");
>         // Subscribe to all topics matching the pattern.
>         Pattern topics = Pattern.compile("^test.*$");
>         FlinkKafkaConsumer011<String> consumer =
>                 new FlinkKafkaConsumer011<>(topics, new SimpleStringSchema(), props);
>         DataStream<String> stream = env.addSource(consumer);
>         stream.writeToSocket("localhost", 44444, new SimpleStringSchema());
>         env.execute("test");
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
