[ https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15049441#comment-15049441 ]
Jason Gustafson commented on KAFKA-1894:
----------------------------------------

There's been a ton of movement on the new consumer since this issue was first posted, so here's an update of the current blocking calls:

1. poll(timeout) blocks indefinitely for a) finding the coordinator, b) joining the group, and c) fetching/resetting offsets. The last of these may require an OffsetFetch to get the last committed position or a ListOffset to reset the position to the earliest or latest offset. Obviously we depend on the coordinator being available to join the group, but we also depend on partition leaders being available if we need to call ListOffset.
2. commitSync() blocks indefinitely until the commit succeeds. This may involve finding a new coordinator if the old one has failed.
3. position() blocks to set the position (if it needs to be set). This is similar to case c) in poll() above.
4. committed() blocks to fetch the last committed position if the consumer has no cached commit.
5. partitionsFor()/listTopics() blocks to send a TopicMetadataRequest to any of the brokers (if the request cannot be served from the cache).
6. close() blocks in a call to commitSync() if auto-commit is enabled.

In all of these cases, we're fairly careful to propagate unrecoverable errors to the user. For example, commitSync() will not retry a commit if it receives an ILLEGAL_GENERATION, since there is no way the commit can succeed after that error. However, there are still some situations where the blocking can be prolonged. In the most extreme case, if the consumer cannot connect to any of the brokers it knows about, it will retry indefinitely until it can. Other than that, the main cases that come to mind are blocking in ListOffsets when the partition leader is not available, and blocking in coordinator discovery when the coordinator cannot be found (e.g. if there is no leader for the corresponding partition of __consumer_offsets).

Going forward, it would be ideal to have poll() enforce the timeout parameter in any situation. This is complicated mainly by the fact that we may have to leave an active rebalance in progress, which will surely require additional state tracking. There are some subtle implications as well. For example, if we return to the user with a JoinGroup on the wire, it could actually return during a later blocking call and have its handler callback invoked. We'd have to be careful that this doesn't cause any surprises for the user (e.g. partitions getting revoked while a call to position() is active). We also have limited options when it comes to handling the rebalance callback, which could itself call another blocking method such as commitSync(). Since we have only one thread to work with, there doesn't seem to be much we can do in this case.

The other blocking calls are more straightforward: we can just raise a TimeoutException after a configurable amount of time has passed. The producer has a setting "max.block.ms" which we could borrow for this purpose (I guess we would need a KIP for this now). But as with poll(), we'll have to be careful about any state we leave behind when the exceptions are thrown (in particular, requests left on the wire).

An open question for the consumer is what its behavior should be if a partition leader cannot be found. Once the initial offset has been found, we generally handle leader failures gracefully by requesting metadata updates in the background and continuing to fetch from the other partitions. But if the leader failure occurs before we've fetched the initial offset, we will not send any fetches until we've found the new leader. This case is probably rare in practice, but it would seem more desirable (and more consistent) to let fetching continue on other partitions. This will require decoupling the offset state of individual partitions, which may be tricky.
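For reference, here is a minimal sketch (placeholder topic, group id and bootstrap servers; the current 0.9.x-style API where poll() takes a long timeout) of where each of the six calls above can block from the application's side:

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class BlockingCallsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "example-group");            // placeholder
        props.put("enable.auto.commit", "true");           // so close() will commit
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("example-topic"));

        // 5. may block sending a TopicMetadataRequest if the metadata cache is cold
        consumer.partitionsFor("example-topic");

        // 1. the timeout only bounds fetching; coordinator discovery, the group join
        //    and the initial offset lookup can block past it
        ConsumerRecords<String, String> records = consumer.poll(1000);

        for (ConsumerRecord<String, String> record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            consumer.position(tp);   // 3. blocks if the position still needs to be set
            consumer.committed(tp);  // 4. blocks if there is no cached committed offset
        }

        consumer.commitSync();       // 2. retries until the commit succeeds or fails fatally
        consumer.close();            // 6. blocks in commitSync() since auto-commit is enabled
    }
}
{code}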
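And a rough sketch of the deadline-bounded retry loop described above, raising TimeoutException once a configurable max.block.ms-style budget is used up. The waitFor/condition/pollOnce names are hypothetical stand-ins for the internal NetworkClient loop, not actual consumer code:

{code}
import java.util.function.BooleanSupplier;

import org.apache.kafka.common.errors.TimeoutException;

public class BoundedBlockingSketch {

    // Retry until 'condition' holds, giving up once 'maxBlockMs' has elapsed.
    static void waitFor(BooleanSupplier condition, Runnable pollOnce, long maxBlockMs) {
        long deadline = System.currentTimeMillis() + maxBlockMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline)
                throw new TimeoutException("Condition not satisfied within " + maxBlockMs + " ms");
            pollOnce.run(); // one pass of the network event loop
        }
    }
}
{code}

The point is just that the deadline bounds the whole retry loop rather than a single poll, so a missing coordinator or partition leader can't stall the caller indefinitely; any request still on the wire when the exception is thrown has to be accounted for, as noted above.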
> Avoid long or infinite blocking in the consumer
> -----------------------------------------------
>
>                 Key: KAFKA-1894
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1894
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Jay Kreps
>            Assignee: Jason Gustafson
>             Fix For: 0.10.0.0
>
>
> The new consumer has a lot of loops that look something like
> {code}
> while(!isThingComplete())
>     client.poll();
> {code}
> This occurs both in KafkaConsumer and in NetworkClient.completeAll. These retry loops are actually mostly the behavior we want, but there are several cases where they may cause problems:
> - In the case of a hard failure we may hang for a long time or indefinitely before realizing the connection is lost.
> - In the case where the cluster is malfunctioning or down we may retry forever.
> It would probably be better to give a timeout to these. The proposed approach would be to add something like retry.time.ms=60000 and only continue retrying for that period of time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)