Vincent Fumo created KAFKA-3957:
-----------------------------------

             Summary: consumer timeout not being respected when kafka broker is not available
                 Key: KAFKA-3957
                 URL: https://issues.apache.org/jira/browse/KAFKA-3957
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.9.0.1
            Reporter: Vincent Fumo
            Priority: Minor

KafkaConsumer v0.9::

I have a consumer set up with session.timeout.ms set to 30s. I make a call like

    consumer.poll(10000)

but if the kafka broker is down, that call will hang indefinitely.

Digging into the code it seems that the timeout isn't respected:

KafkaConsumer calls out to pollOnce() as seen below::

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
        coordinator.ensureCoordinatorKnown();

        // ensure we have partitions assigned if we expect to
        if (subscriptions.partitionsAutoAssigned())
            coordinator.ensurePartitionAssignment();

        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // init any new fetches (won't resend pending fetches)
        Cluster cluster = this.metadata.fetch();
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

        // if data is available already, e.g. from a previous network client poll() call to commit,
        // then just return it immediately
        if (!records.isEmpty()) {
            return records;
        }

        fetcher.initFetches(cluster);
        client.poll(timeout);
        return fetcher.fetchedRecords();
    }

and we see that we stick on the call to coordinator.ensureCoordinatorKnown();

AbstractCoordinator ::

    public void ensureCoordinatorKnown() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = sendGroupMetadataRequest();
            client.poll(future);

            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            }
        }
    }

In this case the Future fails (since the broker is down) and then a call to client.awaitMetadataUpdate() is made, which in the case of the ConsumerNetworkClient will block forever:

    public void awaitMetadataUpdate() {
        int version = this.metadata.requestUpdate();
        do {
            poll(Long.MAX_VALUE);
        } while (this.metadata.version() == version);
    }

I feel that this is a bug. When you set a timeout on a call to a blocking method, that timeout should be respected and an exception should be thrown.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
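
For illustration only, the behaviour requested above (a blocking retry loop that honours the caller's timeout and throws once it is exhausted) could look roughly like the sketch below. It does not use the Kafka client at all: the class BoundedCoordinatorLookup, the method ensureKnown, and the attempt callback are hypothetical names invented here, with attempt standing in for "send the group metadata request, poll, and check the future". It is not the fix that was applied to Kafka, only a minimal deadline-based loop under those assumptions.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: none of these names exist in the Kafka client.
final class BoundedCoordinatorLookup {

    /**
     * Retries {@code attempt} until it returns true or {@code timeoutMs} has elapsed.
     * {@code attempt} is an assumed stand-in for one coordinator lookup round trip.
     */
    static void ensureKnown(BooleanSupplier attempt, long timeoutMs, long retryBackoffMs)
            throws TimeoutException, InterruptedException {
        // Fix the deadline once, so every retry shares the same time budget.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);

        while (!attempt.getAsBoolean()) {
            long remainingNs = deadline - System.nanoTime();
            if (remainingNs <= 0)
                throw new TimeoutException("coordinator not found within " + timeoutMs + " ms");

            // Wait at most the remaining budget, never Long.MAX_VALUE.
            long sleepMs = Math.min(retryBackoffMs, TimeUnit.NANOSECONDS.toMillis(remainingNs));
            Thread.sleep(Math.max(1, sleepMs));
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulate a broker that never becomes available.
        try {
            ensureKnown(() -> false, 300, 50);
            System.out.println("coordinator found");
        } catch (TimeoutException e) {
            System.out.println("gave up: " + e.getMessage());
        }
    }
}
```

The point of the sketch is that the deadline is computed once and each wait is capped by the time remaining, in contrast to the poll(Long.MAX_VALUE) call inside awaitMetadataUpdate() quoted above.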