Hi Arvid,

Thanks for your reply.
Yes, the warning is thrown by kafka-clients. Here is the warning log after I 
deleted the topic that the Kafka consumer is listening to:
18:46:27,297 WARN  org.apache.kafka.clients.NetworkClient  - [Consumer 
clientId=consumer-2, groupId=osstest] Error while fetching metadata with 
correlation id 30288 : {oss-consumer-test-1=UNKNOWN_TOPIC_OR_PARTITION, 

I have a follow-up question.
“What you could do is try to config Kafka consumer to fail hard when topic 
metadata cannot be retrieved with a small timeout.”
May I ask how to configure the Flink Kafka consumer to fail when the metadata 
for one of its topics cannot be retrieved?
For example, suppose a FlinkKafkaConsumer is listening to 3 different Kafka 
topics. If one of the topics is deleted and the FlinkKafkaConsumer cannot 
retrieve the corresponding topic metadata, we would like the 
FlinkKafkaConsumer to fail hard and stop consuming the other two live topics.
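
To make the behavior we are after concrete, here is a minimal sketch in plain 
Java. It is not tied to any Flink or Kafka API: the class TopicGate and its 
methods are hypothetical names, and the collection of available topics is 
assumed to be obtained elsewhere (for example from Kafka's AdminClient 
listTopics(), which is not wired in here).

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper: decides whether every required topic is present.
final class TopicGate {

    // Returns the required topics that are absent from the available ones,
    // preserving the order in which the required topics were given.
    static Set<String> missingTopics(Collection<String> required,
                                     Collection<String> available) {
        Set<String> missing = new LinkedHashSet<>(required);
        missing.removeAll(available);
        return missing;
    }

    // Throws if any required topic is missing, so the caller can fail hard
    // instead of continuing to consume the topics that are still live.
    static void requireAll(Collection<String> required,
                           Collection<String> available) {
        Set<String> missing = missingTopics(required, available);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Missing Kafka topics: " + missing);
        }
    }
}
```

With three subscribed topics of which one has been deleted, requireAll would 
throw and name the deleted topic; with all three present it returns normally.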
Thank you very much!

Thanks, Yan

From: Arvid Heise <ar...@apache.org>
Date: Thursday, September 2, 2021 at 1:27 PM
To: Yan Wang <y.yan.w.w...@oracle.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: [External] : Re: Use FlinkKafkaConsumer to synchronize multiple Kafka 
Hi Yan,

Afaik this is not directly supported and would be surprising to other users 
since it's a rather specific requirement.
In fact, Flink delegates reading the topics to Kafka consumer API and I suspect 
that the warning you received is also coming from Kafka consumer (I have not 
found a respective warning in Flink's code base but you could also show the 
exact log statement so I can recheck).

What you could do is try to config Kafka consumer to fail hard when topic 
metadata cannot be retrieved with a small timeout.
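
A rough, untested sketch of the kind of settings this refers to. The keys 
below are existing Kafka consumer configuration names, but whether they turn 
the repeated warning into a hard failure is an assumption that would need to 
be checked against the kafka-clients version in use. The class name, the 
timeout values, and the broker address in the usage note are illustrative 
only.

```java
import java.util.Properties;

// Hypothetical helper that builds consumer properties with short timeouts.
final class FailFastConsumerProps {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Default upper bound for blocking consumer API calls (illustrative value).
        props.setProperty("default.api.timeout.ms", "10000");
        // How long the client waits for a response to a single request.
        props.setProperty("request.timeout.ms", "5000");
        // Avoid re-creating a deleted topic as a side effect of metadata requests.
        props.setProperty("allow.auto.create.topics", "false");
        return props;
    }
}
```

The resulting Properties object, e.g. FailFastConsumerProps.build("localhost:9092", "osstest"), 
could then be passed as the properties argument of the FlinkKafkaConsumer 
constructor.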

Note that I'm a bit confused by the terms "dead" topic and "rebooted" topic. 
Afaik you can only have dead brokers and rebooted brokers and maybe deleted 
topics. But I have yet to understand a use case where you would delete a topic 
while the consumer is running.

On Thu, Sep 2, 2021 at 4:58 AM Yan Wang <y.yan.w.w...@oracle.com> wrote:

We are currently using a single FlinkKafkaConsumer to consume multiple Kafka 
topics. However, we find that if one of the Kafka topics goes down at run 
time (for example, when one of the topics is rebooted), the 
FlinkKafkaConsumer keeps logging warning messages about the dead Kafka topic 
while continuing to consume the other live Kafka topics.
What we want instead is that, if one of the topics goes down, the 
FlinkKafkaConsumer waits and stops consuming the other live topics until the 
dead topic is live again.

Code example:
List<String> kafkaTopicsList = new ArrayList<>(Arrays.asList("KafkaTopic1", "KafkaTopic2"));
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
    kafkaTopicsList, new SimpleStringSchema(), properties);

As shown in the code example, kafkaTopicsList contains two Kafka topics, and 
flinkKafkaConsumer consumes both of them. We hope that if KafkaTopic1 goes 
down at run time (we may reboot KafkaTopic1 at run time), flinkKafkaConsumer 
will wait and stop consuming KafkaTopic2 until KafkaTopic1 is live again.

Is it possible to achieve this with the current Flink API? Do we need to 
change a configuration setting somewhere, or do we have to subclass 
FlinkKafkaConsumer and override its behavior? Thank you very much!

Thanks, Yan
