[ https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jun Rao updated KAFKA-3450:
---------------------------
    Assignee:     (was: Jun Rao)

> Producer blocks on send to topic that doesn't exist if auto create is disabled
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-3450
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3450
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 0.9.0.1
>            Reporter: Michal Turek
>            Priority: Critical
>
> {{producer.send()}} blocks for {{max.block.ms}} (default 60 seconds) if the
> destination topic doesn't exist and automatic topic creation is disabled.
> A warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is logged
> every 100 ms in a loop until the 60-second timeout expires, although the
> operation is not recoverable.
>
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients.
>
> Minimal example code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> /**
>  * Test of sending to a topic that does not exist while automatic creation of
>  * topics is disabled in Kafka (auto.create.topics.enable=false).
>  */
> public class NoSuchTopicTest {
>     private static final Logger LOGGER = LoggerFactory.getLogger(NoSuchTopicTest.class);
>
>     public static void main(String[] args) {
>         Properties properties = new Properties();
>         properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, NoSuchTopicTest.class.getSimpleName());
>         properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); // Default is 60 seconds
>
>         try (Producer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", "key", "value"), (metadata, exception) -> {
>                 if (exception != null) {
>                     LOGGER.error("Send failed: {}", exception.toString());
>                 } else {
>                     LOGGER.info("Send successful: {}-{}/{}", metadata.topic(), metadata.partition(), metadata.offset());
>                 }
>             });
>
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", "key", "value"), (metadata, exception) -> {
>                 if (exception != null) {
>                     LOGGER.error("Send failed: {}", exception.toString());
>                 } else {
>                     LOGGER.info("Send successful: {}-{}/{}", metadata.topic(), metadata.partition(), metadata.offset());
>                 }
>             });
>         }
>     }
> }
> {noformat}
>
> Related output
> {noformat}
> 2016-03-23 12:44:37.725 INFO c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Sending message (NoSuchTopicTest.java:26)
> 2016-03-23 12:44:37.830 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 0 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:37.928 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 1 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.028 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 2 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.130 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 3 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.231 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 4 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.332 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 5 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.433 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 6 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.534 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 7 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.635 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 8 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.736 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 9 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.772 ERROR c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Send failed: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 35 ms. (NoSuchTopicTest.java:29)
> 2016-03-23 12:44:38.773 INFO c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Sending message (NoSuchTopicTest.java:35)
> 2016-03-23 12:44:38.837 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 10 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.938 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 11 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.039 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 12 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.140 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 13 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.242 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 14 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.345 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 15 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.447 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 16 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.549 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 17 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.651 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 18 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.752 WARN o.a.kafka.clients.NetworkClient [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching metadata with correlation id 19 : {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION, ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.774 ERROR c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: Send failed: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 21 ms. (NoSuchTopicTest.java:38)
> 2016-03-23 12:44:39.774 INFO o.a.k.c.producer.KafkaProducer [main]: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (KafkaProducer.java:613)
> {noformat}
>
> Known workaround
> - Configure {{max.block.ms = 0}} in the producer to prevent blocking and
>   return from {{send()}} immediately.
>   But be careful: I'm not sure whether this is safe or whether it could
>   cause something even worse ;-)
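
The log output above shows ten metadata fetch warnings roughly 100 ms apart before each send fails with {{max.block.ms}} set to 1000. The following is a minimal sketch of that wait loop using a simulated clock. It is a simplified model, not the actual KafkaProducer implementation: the class name MetadataWaitModel, the method attemptsBeforeTimeout, and its parameter names are hypothetical, and the 100 ms interval is taken from the spacing of the log entries.

```java
public class MetadataWaitModel {

    /**
     * Counts how many metadata fetch attempts fit into the blocking budget
     * when every attempt fails and the client waits backoffMs between tries.
     * Time is simulated, so nothing actually sleeps.
     */
    public static int attemptsBeforeTimeout(long maxBlockMs, long backoffMs) {
        if (backoffMs <= 0) {
            throw new IllegalArgumentException("backoffMs must be positive");
        }
        int attempts = 0;
        long elapsedMs = 0;
        while (elapsedMs < maxBlockMs) {
            attempts++;             // one failed fetch (UNKNOWN_TOPIC_OR_PARTITION)
            elapsedMs += backoffMs; // simulated wait before the next try
        }
        return attempts;
    }

    public static void main(String[] args) {
        // Budget used in the test above: 1000 ms, one attempt per 100 ms.
        System.out.println(attemptsBeforeTimeout(1000, 100));  // prints 10
        // Default budget of 60 seconds.
        System.out.println(attemptsBeforeTimeout(60000, 100)); // prints 600
        // Budget of 0 ms, as in the workaround: no attempt is made in this model.
        System.out.println(attemptsBeforeTimeout(0, 100));     // prints 0
    }
}
```

In this model a budget of 1000 ms gives the ten warnings per send seen in the log, and the default of 60 seconds would give about 600 before the caller gets its TimeoutException.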