[
https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michal Turek updated KAFKA-3450:
--------------------------------
Priority: Critical (was: Minor)
Priority updated from minor to critical. I have just realized that
{{producer.send()}} is defined to be asynchronous but it may block for really
long time. It's asynchronous only for {{max.block.ms = 0}}, but it isn't
documented and the default value is 60 seconds. The implementation doesn't
match contract.
{noformat}
/**
* Asynchronously send a record to a topic and invoke the provided callback
when the send has been acknowledged.
* <p>
* The send is asynchronous and this method will return immediately once
the record has been stored in the buffer of
* records waiting to be sent. This allows sending many records in parallel
without blocking to wait for the
* response after each one.
* ...
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback
callback) {
{noformat}
> Producer blocks on send to topic that doesn't exist if auto create is disabled
> ------------------------------------------------------------------------------
>
> Key: KAFKA-3450
> URL: https://issues.apache.org/jira/browse/KAFKA-3450
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.9.0.1
> Reporter: Michal Turek
> Assignee: Jun Rao
> Priority: Critical
>
> {{producer.send()}} is blocked for {{max.block.ms}} (default 60 seconds) if
> the destination topic doesn't exist and if their automatic creation is
> disabled. Warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is
> logged every 100 ms in a loop until the 60 seconds timeout expires, but the
> operation is not recoverable.
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients.
> Example minimalist code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> /**
> * Test of sending to a topic that does not exist while automatic creation of
> topics is disabled in Kafka (auto.create.topics.enable=false).
> */
> public class NoSuchTopicTest {
> private static final Logger LOGGER =
> LoggerFactory.getLogger(NoSuchTopicTest.class);
> public static void main(String[] args) {
> Properties properties = new Properties();
> properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG,
> NoSuchTopicTest.class.getSimpleName());
> properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
> // Default is 60 seconds
> try (Producer<String, String> producer = new
> KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
> LOGGER.info("Sending message");
> producer.send(new ProducerRecord<>("ThisTopicDoesNotExist",
> "key", "value"), (metadata, exception) -> {
> if (exception != null) {
> LOGGER.error("Send failed: {}", exception.toString());
> } else {
> LOGGER.info("Send successful: {}-{}/{}",
> metadata.topic(), metadata.partition(), metadata.offset());
> }
> });
> LOGGER.info("Sending message");
> producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo",
> "key", "value"), (metadata, exception) -> {
> if (exception != null) {
> LOGGER.error("Send failed: {}", exception.toString());
> } else {
> LOGGER.info("Send successful: {}-{}/{}",
> metadata.topic(), metadata.partition(), metadata.offset());
> }
> });
> }
> }
> }
> {noformat}
> Related output
> {noformat}
> 2016-03-23 12:44:37.725 INFO c.a.k.o.nosuchtopic.NoSuchTopicTest [main]:
> Sending message (NoSuchTopicTest.java:26)
> 2016-03-23 12:44:37.830 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 0 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:37.928 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 1 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.028 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 2 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.130 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 3 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.231 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 4 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.332 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 5 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.433 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 6 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.534 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 7 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.635 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 8 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.736 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 9 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.772 ERROR c.a.k.o.nosuchtopic.NoSuchTopicTest [main]:
> Send failed: org.apache.kafka.common.errors.TimeoutException: Failed to
> update metadata after 35 ms. (NoSuchTopicTest.java:29)
> 2016-03-23 12:44:38.773 INFO c.a.k.o.nosuchtopic.NoSuchTopicTest [main]:
> Sending message (NoSuchTopicTest.java:35)
> 2016-03-23 12:44:38.837 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 10 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION,
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.938 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 11 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION,
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.039 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 12 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION,
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.140 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 13 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION,
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.242 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 14 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION,
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.345 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 15 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION,
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.447 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 16 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION,
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.549 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 17 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION,
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.651 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 18 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION,
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.752 WARN o.a.kafka.clients.NetworkClient
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching
> metadata with correlation id 19 :
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION,
> ThisTopicDoesNotExistToo=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:39.774 ERROR c.a.k.o.nosuchtopic.NoSuchTopicTest [main]:
> Send failed: org.apache.kafka.common.errors.TimeoutException: Failed to
> update metadata after 21 ms. (NoSuchTopicTest.java:38)
> 2016-03-23 12:44:39.774 INFO o.a.k.c.producer.KafkaProducer [main]:
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> (KafkaProducer.java:613)
> {noformat}
> Known workaround
> - Configure {{max.block.ms = 0}} in producer to prevent blocking and return
> from send() immediately. But be careful, I'm not sure if is it safe and can't
> cause something even worse ;-)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)