GEORGE LI created KAFKA-12971:
---------------------------------

             Summary: Kafka 1.1.x clients cache broker hostnames, client stuck when host is swapped for the same broker.id
                 Key: KAFKA-12971
                 URL: https://issues.apache.org/jira/browse/KAFKA-12971
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 1.1.1, 1.1.0, 1.1.2
            Reporter: GEORGE LI
             Fix For: 2.1.2
The kafka-client version was upgraded from 0.11 to 1.1.x to fix a bug in 0.11 that caused too-frequent consumer offset commits. Because of the Flink version in use, the latest 2.x kafka-client cannot be used directly.

{code}
Error sending fetch request (sessionId=178328175, epoch=INITIAL) to node 425: org.apache.kafka.common.errors.DisconnectException.
{code}

Some consumers were stuck logging the message above after the broker with broker.id 425 had a hardware failure and was swapped onto a host with a different hostname.

Comparing {{ClusterConnectionStates.connecting()}} across the 3 versions:

0.11.0.3:
{code}
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
    nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
            this.reconnectBackoffInitMs, host, clientDnsLookup));
}
{code}

1.1.x:
{code}
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
    if (nodeState.containsKey(id)) {
        NodeConnectionState connectionState = nodeState.get(id);
        connectionState.lastConnectAttemptMs = now;
        connectionState.state = ConnectionState.CONNECTING;
        // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
        connectionState.moveToNextAddress();
    } else {
        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
                this.reconnectBackoffInitMs, host, clientDnsLookup));
    }
}
{code}

2.2.x:
{code}
public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
    NodeConnectionState connectionState = nodeState.get(id);
    if (connectionState != null && connectionState.host().equals(host)) {
        connectionState.lastConnectAttemptMs = now;
        connectionState.state = ConnectionState.CONNECTING;
        // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
        connectionState.moveToNextAddress();
        return;
    } else if (connectionState != null) {
        log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
    }

    // Create a new NodeConnectionState if nodeState does not already contain one
    // for the specified id or if the hostname associated with the node id changed.
    nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
            this.reconnectBackoffInitMs, host, clientDnsLookup));
}
{code}

From the above, {{0.11.0.3}} simply puts a fresh entry into the {{nodeState}} HashMap on every connection attempt, so each retry uses the current host.

{{1.1.x}} adds "caching" logic: when {{nodeState.containsKey(id)}} is true, the existing entry is reused and only its state, attempt time and resolved address are updated. However, if the HOSTNAME behind a broker.id is swapped/changed, the code never reaches the {{else}} block, so the {{nodeState}} entry is never updated with the new hostname.

{{2.2.x}} adds an additional check, {{if (connectionState != null && connectionState.host().equals(host))}}. If the hostname changed, it logs the change and calls {{nodeState.put()}} to replace the entry with one for the new host.

So it appears that the 1.1.x caching logic introduced a bug: the host stored in {{nodeState}} is not updated when it changes (e.g. a host fails and is swapped for one with a different hostname that keeps the same broker.id).

-- 
This message was sent by Atlassian Jira
(v8.3.4#803005)
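To make the difference between the 1.1.x and 2.2.x branches concrete, here is a minimal sketch that models only the node id to host part of the cache. {{HostCacheDemo}}, {{CachedNode}}, {{hostAfterSwap}} and the {{broker-*.example.com}} hostnames are hypothetical illustrations, not Kafka classes; the sketch is not the real {{ClusterConnectionStates}} and it omits address resolution, backoff and connection states. With {{checkHost}} off it follows the 1.1.x rule (reuse any existing entry); with it on it follows the 2.2.x rule (reuse only when the hostname matches).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of a per-node connection-state cache.
 * NOT the real ClusterConnectionStates: it only keeps node id -> host to show
 * why reusing a cached entry without comparing hostnames leaves a stale host.
 */
public class HostCacheDemo {

    /** Hypothetical stand-in for NodeConnectionState: host plus last attempt time. */
    static final class CachedNode {
        final String host;
        long lastConnectAttemptMs;

        CachedNode(String host, long now) {
            this.host = host;
            this.lastConnectAttemptMs = now;
        }
    }

    private final Map<String, CachedNode> nodeState = new HashMap<>();
    private final boolean checkHost;

    /** checkHost=false mimics the 1.1.x-style branch, checkHost=true the 2.2.x-style branch. */
    HostCacheDemo(boolean checkHost) {
        this.checkHost = checkHost;
    }

    void connecting(String id, long now, String host) {
        CachedNode cached = nodeState.get(id);
        // Reuse the cached entry if it exists and, when checking, still points at the same host.
        if (cached != null && (!checkHost || cached.host.equals(host))) {
            cached.lastConnectAttemptMs = now;
            return;
        }
        if (cached != null) {
            System.out.printf("Hostname for node %s changed from %s to %s.%n", id, cached.host, host);
        }
        // No entry yet, or the hostname changed: replace the entry with one for the new host.
        nodeState.put(id, new CachedNode(host, now));
    }

    String cachedHost(String id) {
        CachedNode cached = nodeState.get(id);
        return cached == null ? null : cached.host;
    }

    /** Connect to node 425 on its old host, then again after the host is swapped. */
    static String hostAfterSwap(boolean checkHost) {
        HostCacheDemo states = new HostCacheDemo(checkHost);
        states.connecting("425", 1_000L, "broker-old.example.com");
        states.connecting("425", 2_000L, "broker-new.example.com");
        return states.cachedHost("425");
    }

    public static void main(String[] args) {
        System.out.println("without host check: " + hostAfterSwap(false));
        System.out.println("with host check:    " + hostAfterSwap(true));
    }
}
```

Without the host check, the cache still holds {{broker-old.example.com}} after the swap, which matches the stuck-consumer symptom described above; with the check, the entry is replaced and {{broker-new.example.com}} is used.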