Hmm, as a newbie to Kafka v0.9 who has tested this scenario, here is what is happening:
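For reference, this is roughly the kind of test I ran, as a minimal sketch; the broker address, topic name, serializer choices, and the 5000 ms value are placeholders of mine, not anything from your setup:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerTimeoutRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Only bootstrap.servers and the serializers are set; every timeout is left at its default.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "unreachable-host:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Uncomment to fail fast instead of blocking for the default 60 seconds:
        // props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000L);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");

        // With no broker reachable, send() blocks waiting for metadata for up to max.block.ms,
        // then the callback is invoked with
        // org.apache.kafka.common.errors.TimeoutException: Failed to update metadata.
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("send failed: " + exception);
                }
            }
        });
        producer.close();
    }
}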
You can reproduce the failure by setting just BOOTSTRAP_SERVERS_CONFIG and not passing any custom timeouts. Your callback in producer.send(record, new ProducerCallback(record)) should then be invoked with "org.apache.kafka.common.errors.TimeoutException: Failed to update metadata" after 60000 ms, based on the default: MAX_BLOCK_MS_CONFIG is 60s.

The logic is probably inside the send() method, which waits for metadata based on maxBlockTimeMs:

    long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);

waitOnMetadata() waits for cluster metadata, including partitions for the given topic, to be available.

If you set the deprecated BLOCK_ON_BUFFER_FULL_CONFIG to any value, it can set maxBlockTimeMs to Long.MAX_VALUE, waiting forever to make a connection:

    if (BLOCK_ON_BUFFER_FULL is set to true) {
        maxBlockTimeMs = Long.MAX_VALUE;
    } else if (METADATA_FETCH_TIMEOUT_CONFIG is set) {
        maxBlockTimeMs = METADATA_FETCH_TIMEOUT_CONFIG;
    } else {
        maxBlockTimeMs = MAX_BLOCK_MS_CONFIG;
    }

If nothing is passed, it defaults to 60s:

    public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
    private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} and {@link KafkaProducer#partitionsFor} will block."
            + "These methods can be blocked either because the buffer is full or metadata unavailable."
            + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";

Other default timeouts:

    .define(MAX_BLOCK_MS_CONFIG, Type.LONG, 60 * 1000, atLeast(0), Importance.MEDIUM, MAX_BLOCK_MS_DOC)
    .define(REQUEST_TIMEOUT_MS_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, REQUEST_TIMEOUT_MS_DOC)
    .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
    // default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time
    .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)

    // @deprecated This config will be removed in a future release. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG}
    public static final String TIMEOUT_CONFIG = "timeout.ms";
    private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to "
            + "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the "
            + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout "
            + "is measured on the server side and does not include the network latency of the request.";

On Mon, Jul 11, 2016 at 3:26 AM, Hari Sekhon <hpsek...@gmail.com> wrote:
> Hi kafka folks,
>
> I've written some code to test Kafka brokers using the Kafka 0.9 API (I
> need Kerberos support).
>
> It appears the 0.9 API is trapping broker connection refused exception at a
> lower level and retrying in a loop (visible only at debug log level), when
> I actually want it to fail and raise the exception to my client code so I
> can report the result.
>
> Is there any way to get the Kafka 0.9 API to preferably raise the broker
> connection refused exception straight away up to my client code or to set a
> short timeout for how long it retries the connection in order to force the
> tester to fail fast?
>
> Thanks
>
> Hari

--
Radha Krishna, Proddaturi
253-234-5657