[ https://issues.apache.org/jira/browse/KAFKA-1843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517659#comment-14517659 ]
Ewen Cheslack-Postava commented on KAFKA-1843:
----------------------------------------------

A closely related and easily reproduced problem was also reported by [~omkreddy] on the mailing list:

We are testing new producer on a 2 node cluster. Under some node failure scenarios, producer is not able to update metadata.

{quote}
Steps to reproduce
1. form a 2 node cluster (K1, K2)
2. create a topic with single partition, replication factor = 2
3. start producing data (producer metadata : K1,K2)
4. Kill leader node (say K1)
5. K2 becomes the leader (producer metadata : K2)
6. Bring back K1 and Kill K2 before metadata.max.age.ms
7. K1 becomes the Leader (producer metadata still contains : K2)

After this point, producer is not able to update the metadata. producer continuously trying to connect with dead node (K2).
{quote}

> Metadata fetch/refresh in new producer should handle all node connection
> states gracefully
> ------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1843
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1843
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer
>    Affects Versions: 0.8.2.0
>            Reporter: Ewen Cheslack-Postava
>
> KAFKA-1642 resolved some issues with the handling of broker connection states
> to avoid high CPU usage, but made the minimal fix rather than the ideal one.
> The code for handling the metadata fetch is difficult to get right because it
> has to handle a lot of possible connectivity states and failure modes across
> all the known nodes. It also needs to correctly integrate with the
> surrounding event loop, providing correct poll() timeouts to both avoid busy
> looping and make sure it wakes up and tries new nodes in the face of both
> connection and request failures.
> A patch here should address a few issues:
> 1. Make sure connection timeouts, as implemented in KAFKA-1842, are cleanly
> integrated.
> This mostly means that when a connecting node is selected to fetch metadata
> from, the code notices that and sets the next timeout based on the
> connection timeout rather than some other backoff.
> 2. Rethink the logic and naming of NetworkClient.leastLoadedNode. That method
> actually takes into account a) the current connectivity of each node, b)
> whether the node had a recent connection failure, c) the "load" in terms of
> in flight requests. It also needs to ensure that different clients don't use
> the same ordering across multiple calls (which is already addressed in the
> current code by nodeIndexOffset) and that we always eventually try all nodes
> in the face of connection failures (which isn't currently handled by
> leastLoadedNode and probably cannot be without tracking additional state).
> This method also has to work for new consumer use cases even though it is
> currently only used by the new producer's metadata fetch. Finally, it has to
> properly handle when other code calls initiateConnect() since the normal path
> for sending messages also initiates connections.
> We can already say that there is an order of preference given a single call
> (as follows), but making this work across multiple calls when some initial
> choices fail to connect or return metadata *and* connection states may be
> changing is much more difficult.
>  * Connected, zero in flight requests - the request can be sent immediately
>  * Connecting node - it will hopefully be connected very soon and by
> definition has no in flight requests
>  * Disconnected - same reasoning as for a connecting node
>  * Connected, > 0 in flight requests - we consider any # of in flight
> requests as a big enough backlog to delay the request a lot.
> We could use an approach that better accounts for # of in flight requests
> rather than just turning it into a boolean variable, but that probably
> introduces much more complexity than it is worth.
> 3. The most difficult case to handle so far has been when leastLoadedNode
> returns a disconnected node to maybeUpdateMetadata as its best option.
> Properly handling the two resulting cases (initiateConnect fails immediately
> vs. taking some time to possibly establish the connection) is tricky.
> 4. Consider optimizing for the failure cases. The most common cases are when
> you already have an active connection and can immediately get the metadata or
> you need to establish a connection, but the connection and metadata
> request/response happen very quickly. These common cases are infrequent
> enough (default every 5 min) that establishing an extra connection isn't a
> big deal as long as it's eventually cleaned up. The edge cases, like network
> partitions where some subset of nodes become unreachable for a long period,
> are harder to reason about but we should be sure we will always be able to
> gracefully recover from them.
> KAFKA-1642 enumerated the possible outcomes of a single call to
> maybeUpdateMetadata. A good fix for this would consider all of those outcomes
> for repeated calls to



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
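
For illustration only, the single-call order of preference listed under item 2 of the description could be sketched roughly as below. This is not the NetworkClient code: NodePreference, NodeView, ConnState, rank and choose are hypothetical names made up for this example. The sketch also deliberately ignores nodeIndexOffset, recent connection failures and behaviour across repeated calls, which is where the description says the real difficulty lies.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch; none of these types exist in the Kafka client.
final class NodePreference {

    /** Simplified connection states (assumption: the real client tracks more detail). */
    enum ConnState { CONNECTED, CONNECTING, DISCONNECTED }

    /** Hypothetical snapshot of one broker as seen by the client. */
    static final class NodeView {
        final String id;
        final ConnState state;
        final int inFlightRequests;

        NodeView(String id, ConnState state, int inFlightRequests) {
            this.id = id;
            this.state = state;
            this.inFlightRequests = inFlightRequests;
        }
    }

    /** Lower rank is more preferred, following the four bullets in the issue. */
    static int rank(NodeView n) {
        switch (n.state) {
            case CONNECTED:
                // Any number of in flight requests > 0 is treated as a backlog.
                return n.inFlightRequests == 0 ? 0 : 3;
            case CONNECTING:
                return 1;
            default: // DISCONNECTED
                return 2;
        }
    }

    /** Picks the most preferred node for one call; on ties the earlier node wins. */
    static Optional<NodeView> choose(List<NodeView> nodes) {
        NodeView best = null;
        for (NodeView n : nodes) {
            if (best == null || rank(n) < rank(best)) {
                best = n;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

With K1 connected but carrying two in flight requests, K2 disconnected and K3 connecting, choose() would pick K3. An empty node list yields an empty Optional.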