Hmm, as a newbie with Kafka v0.9 who has tested this scenario, here is what
is happening:

You can reproduce the failure by setting just BOOTSTRAP_SERVERS_CONFIG and
not passing any custom timeouts. The producer should then invoke your
callback in producer.send(record, new ProducerCallback(record)) with
"org.apache.kafka.common.errors.TimeoutException: Failed to update
metadata after 60000 ms.", based on the default: MAX_BLOCK_MS_CONFIG is 60s.

The logic is inside the send() method, which waits for metadata based on
maxBlockTimeMs:

    long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);

The javadoc for waitOnMetadata reads: "Wait for cluster metadata including
partitions for the given topic to be available."

If you set the deprecated BLOCK_ON_BUFFER_FULL_CONFIG to true, maxBlockTimeMs
is set to Long.MAX_VALUE, waiting forever to make a connection:

if (BLOCK_ON_BUFFER_FULL_CONFIG is set by the user) {
    if (BLOCK_ON_BUFFER_FULL == true) {
        maxBlockTimeMs = Long.MAX_VALUE;
    } else if (METADATA_FETCH_TIMEOUT_CONFIG is set by the user) {
        maxBlockTimeMs = METADATA_FETCH_TIMEOUT_CONFIG;
    } else {
        maxBlockTimeMs = MAX_BLOCK_MS_CONFIG;
    }
}
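
So to make a tester fail fast, the simplest lever is to lower
MAX_BLOCK_MS_CONFIG. A sketch, continuing the Properties setup above
(5000 ms is just an example value, not a recommendation):

// fail after 5 s instead of 60 s when metadata cannot be fetched
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
// and do NOT set the deprecated block.on.buffer.full=true,
// which would push maxBlockTimeMs to Long.MAX_VALUE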

If nothing is passed, it defaults to 60s
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} and {@link KafkaProducer#partitionsFor} will block."
        + "These methods can be blocked either because the buffer is full or metadata unavailable."
        + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
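
The same TimeoutException also surfaces on the synchronous path, wrapped in
an ExecutionException. A sketch of my own, not from the Kafka docs:

import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.errors.TimeoutException;

try {
    producer.send(record).get(); // the failed future rethrows on get()
} catch (ExecutionException e) {
    if (e.getCause() instanceof TimeoutException)
        System.err.println("metadata not available: " + e.getCause().getMessage());
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}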

Other default timeouts:

.define(MAX_BLOCK_MS_CONFIG,
        Type.LONG,
        60 * 1000,
        atLeast(0),
        Importance.MEDIUM,
        MAX_BLOCK_MS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
        Type.INT,
        30 * 1000,
        atLeast(0),
        Importance.MEDIUM,
        REQUEST_TIMEOUT_MS_DOC)
.define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)

// default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
        Type.LONG,
        9 * 60 * 1000,
        Importance.MEDIUM,
        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
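
All of these can be overridden through the producer Properties in the same
way; the values below are examples only, not recommendations:

props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");       // default 30000
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "60000");         // default 300000 (5 min)
props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "300000"); // default 540000 (9 min)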

// @deprecated This config will be removed in a future release. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG}
public static final String TIMEOUT_CONFIG = "timeout.ms";
private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to "
        + "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the "
        + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout "
        + "is measured on the server side and does not include the network latency of the request.";
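
So if you are still setting timeout.ms, the replacement is
request.timeout.ms, e.g.:

// props.put("timeout.ms", "30000");                          // deprecated in 0.9
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); // use this instead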

On Mon, Jul 11, 2016 at 3:26 AM, Hari Sekhon <hpsek...@gmail.com> wrote:

> Hi kafka folks,
>
> I've written some code to test Kafka brokers using the Kafka 0.9 API (I
> need Kerberos support).
>
> It appears the 0.9 API is trapping broker connection refused exception at a
> lower level and retrying in a loop (visible only at debug log level), when
> I actually want it to fail and raise the exception to my client code so I
> can report the result.
>
> Is there any way to get the Kafka 0.9 API to preferably raise the broker
> connection refused exception straight away up to my client code or to set a
> short timeout for how long it retries the connection in order to force the
> tester to fail fast?
>
> Thanks
>
> Hari
>



-- 
Radha Krishna, Proddaturi
253-234-5657
