mimaison commented on a change in pull request #9902:
URL: https://github.com/apache/kafka/pull/9902#discussion_r562011024



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -183,6 +186,10 @@ public void disconnected(String id, long now) {
             connectingNodes.remove(id);
         } else {
             resetConnectionSetupTimeout(nodeState);
+            if (nodeState.state.isConnected()) {
+                // If a connection had previously been established, re-resolve 
DNS because the IPs may have changed
+                nodeState.addresses = Collections.emptyList();

Review comment:
       Slightly weird that we're updating a "private" field here.
   
   Also the comment is a bit misleading. We're not re-resolving DNS here but 
instead clearing state so if we reconnect later, the client will be forced to 
re-resolve then.

##########
File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
##########
@@ -907,6 +924,140 @@ public void testCorrelationId() {
         ids.forEach(id -> assertTrue(id < 
SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID));
     }
 
+    @Test
+    public void testReconnectAfterAddressChange() {
+        AddressChangeHostResolver mockHostResolver = new 
AddressChangeHostResolver();
+        AtomicInteger initialAddressConns = new AtomicInteger();
+        AtomicInteger newAddressConns = new AtomicInteger();
+        MockSelector selector = new MockSelector(this.time, inetSocketAddress 
-> {
+            InetAddress inetAddress = inetSocketAddress.getAddress();
+            if (initialAddresses.contains(inetAddress)) {
+                initialAddressConns.incrementAndGet();
+            } else if (newAddresses.contains(inetAddress)) {
+                newAddressConns.incrementAndGet();
+            }
+            return (mockHostResolver.getUseNewAddresses() && 
newAddresses.contains(inetAddress)) ||

Review comment:
       I was slightly confused until I realized `getUseNewAddresses()` is a 
boolean. Maybe `useNewAddresses()` would be a better name?

##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -239,24 +241,63 @@ public NetworkClient(Selectable selector,
              logContext);
     }
 
-    private NetworkClient(MetadataUpdater metadataUpdater,
-                          Metadata metadata,
-                          Selectable selector,
-                          String clientId,
-                          int maxInFlightRequestsPerConnection,
-                          long reconnectBackoffMs,
-                          long reconnectBackoffMax,
-                          int socketSendBuffer,
-                          int socketReceiveBuffer,
-                          int defaultRequestTimeoutMs,
-                          long connectionSetupTimeoutMs,
-                          long connectionSetupTimeoutMaxMs,
-                          ClientDnsLookup clientDnsLookup,
-                          Time time,
-                          boolean discoverBrokerVersions,
-                          ApiVersions apiVersions,
-                          Sensor throttleTimeSensor,
-                          LogContext logContext) {
+    public NetworkClient(MetadataUpdater metadataUpdater,

Review comment:
       Do we really need this constructor? As far as I can tell, it's only 
called by the other 3 above. These could directly call the real one below 
instead of going through this new one. WDYT?

##########
File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
##########
@@ -907,6 +924,140 @@ public void testCorrelationId() {
         ids.forEach(id -> assertTrue(id < 
SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID));
     }
 
+    @Test
+    public void testReconnectAfterAddressChange() {
+        AddressChangeHostResolver mockHostResolver = new 
AddressChangeHostResolver();
+        AtomicInteger initialAddressConns = new AtomicInteger();
+        AtomicInteger newAddressConns = new AtomicInteger();
+        MockSelector selector = new MockSelector(this.time, inetSocketAddress 
-> {
+            InetAddress inetAddress = inetSocketAddress.getAddress();
+            if (initialAddresses.contains(inetAddress)) {
+                initialAddressConns.incrementAndGet();
+            } else if (newAddresses.contains(inetAddress)) {
+                newAddressConns.incrementAndGet();
+            }
+            return (mockHostResolver.getUseNewAddresses() && 
newAddresses.contains(inetAddress)) ||
+                   (!mockHostResolver.getUseNewAddresses() && 
initialAddresses.contains(inetAddress));
+        });
+        NetworkClient client = new NetworkClient(metadataUpdater, null, 
selector, "mock", Integer.MAX_VALUE,
+                reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 
64 * 1024,
+                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 
connectionSetupTimeoutMaxMsTest,
+                ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new 
ApiVersions(), null, new LogContext(), mockHostResolver);
+
+        // Connect to one the initial addresses, then change the addresses and 
disconnect
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        mockHostResolver.setUseNewAddresses();
+        selector.serverDisconnect(node.idString());
+        client.poll(0, time.milliseconds());
+        assertFalse(client.isReady(node, time.milliseconds()));
+
+        time.sleep(reconnectBackoffMaxMsTest);
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        // We should have tried to connect to one initial address and one new 
address
+        assertEquals(1, initialAddressConns.get());
+        assertEquals(1, newAddressConns.get());
+    }
+
+    @Test
+    public void testFailedConnectionToFirstAddress() {
+        AddressChangeHostResolver mockHostResolver = new 
AddressChangeHostResolver();
+        AtomicInteger initialAddressConns = new AtomicInteger();
+        AtomicInteger newAddressConns = new AtomicInteger();
+        MockSelector selector = new MockSelector(this.time, inetSocketAddress 
-> {
+            InetAddress inetAddress = inetSocketAddress.getAddress();
+            if (initialAddresses.contains(inetAddress)) {
+                initialAddressConns.incrementAndGet();
+            } else if (newAddresses.contains(inetAddress)) {
+                newAddressConns.incrementAndGet();
+            }
+            // Refuse first connection attempt
+            return initialAddressConns.get() > 1;
+        });
+        NetworkClient client = new NetworkClient(metadataUpdater, null, 
selector, "mock", Integer.MAX_VALUE,
+                reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 
64 * 1024,
+                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 
connectionSetupTimeoutMaxMsTest,
+                ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new 
ApiVersions(), null, new LogContext(), mockHostResolver);
+
+        // First connection attempt should fail
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertFalse(client.isReady(node, time.milliseconds()));
+
+        // Second connection attempt should succeed
+        time.sleep(reconnectBackoffMaxMsTest);
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        // We should have tried to connect to two of the initial addresses, 
none of the new address, and should
+        // only have performed one DNS resolution
+        assertEquals(2, initialAddressConns.get());
+        assertEquals(0, newAddressConns.get());
+        assertEquals(1, mockHostResolver.resolutionCount());
+    }
+
+    @Test
+    public void testFailedConnectionToFirstAddressAfterReconnect() {
+        AddressChangeHostResolver mockHostResolver = new 
AddressChangeHostResolver();
+        AtomicInteger initialAddressConns = new AtomicInteger();
+        AtomicInteger newAddressConns = new AtomicInteger();
+        MockSelector selector = new MockSelector(this.time, inetSocketAddress 
-> {
+            InetAddress inetAddress = inetSocketAddress.getAddress();
+            if (initialAddresses.contains(inetAddress)) {
+                initialAddressConns.incrementAndGet();
+            } else if (newAddresses.contains(inetAddress)) {
+                newAddressConns.incrementAndGet();
+            }
+            // Refuse first connection attempt to the new addresses
+            return initialAddresses.contains(inetAddress) || 
newAddressConns.get() > 1;
+        });
+        NetworkClient client = new NetworkClient(metadataUpdater, null, 
selector, "mock", Integer.MAX_VALUE,
+                reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 
64 * 1024,
+                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 
connectionSetupTimeoutMaxMsTest,
+                ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new 
ApiVersions(), null, new LogContext(), mockHostResolver);
+
+        // Connect to one the initial addresses, then change the addresses and 
disconnect
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        mockHostResolver.setUseNewAddresses();
+        selector.serverDisconnect(node.idString());
+        client.poll(0, time.milliseconds());
+        assertFalse(client.isReady(node, time.milliseconds()));
+
+        // First connection attempt to new addresses should fail
+        time.sleep(reconnectBackoffMaxMsTest);
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertFalse(client.isReady(node, time.milliseconds()));
+
+        // Second connection attempt to new addresses should succeed
+        time.sleep(reconnectBackoffMaxMsTest);
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        // We should have tried to connect to two of the initial addresses, 
none of the new address, and should
+        // only have performed one DNS resolution

Review comment:
       This comment needs updating




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to