bob-barrett commented on a change in pull request #9902:
URL: https://github.com/apache/kafka/pull/9902#discussion_r567112712



##########
File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
##########
@@ -907,6 +924,140 @@ public void testCorrelationId() {
         ids.forEach(id -> assertTrue(id < 
SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID));
     }
 
+    @Test
+    public void testReconnectAfterAddressChange() {
+        AddressChangeHostResolver mockHostResolver = new 
AddressChangeHostResolver();
+        AtomicInteger initialAddressConns = new AtomicInteger();
+        AtomicInteger newAddressConns = new AtomicInteger();
+        MockSelector selector = new MockSelector(this.time, inetSocketAddress 
-> {
+            InetAddress inetAddress = inetSocketAddress.getAddress();
+            if (initialAddresses.contains(inetAddress)) {
+                initialAddressConns.incrementAndGet();
+            } else if (newAddresses.contains(inetAddress)) {
+                newAddressConns.incrementAndGet();
+            }
+            return (mockHostResolver.getUseNewAddresses() && 
newAddresses.contains(inetAddress)) ||
+                   (!mockHostResolver.getUseNewAddresses() && 
initialAddresses.contains(inetAddress));
+        });
+        NetworkClient client = new NetworkClient(metadataUpdater, null, 
selector, "mock", Integer.MAX_VALUE,
+                reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 
64 * 1024,
+                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 
connectionSetupTimeoutMaxMsTest,
+                ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new 
ApiVersions(), null, new LogContext(), mockHostResolver);
+
+        // Connect to one of the initial addresses, then change the addresses and 
disconnect
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        mockHostResolver.setUseNewAddresses();
+        selector.serverDisconnect(node.idString());
+        client.poll(0, time.milliseconds());
+        assertFalse(client.isReady(node, time.milliseconds()));
+
+        time.sleep(reconnectBackoffMaxMsTest);
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        // We should have tried to connect to one initial address and one new 
address
+        assertEquals(1, initialAddressConns.get());
+        assertEquals(1, newAddressConns.get());
+    }
+
+    @Test
+    public void testFailedConnectionToFirstAddress() {
+        AddressChangeHostResolver mockHostResolver = new 
AddressChangeHostResolver();
+        AtomicInteger initialAddressConns = new AtomicInteger();
+        AtomicInteger newAddressConns = new AtomicInteger();
+        MockSelector selector = new MockSelector(this.time, inetSocketAddress 
-> {
+            InetAddress inetAddress = inetSocketAddress.getAddress();
+            if (initialAddresses.contains(inetAddress)) {
+                initialAddressConns.incrementAndGet();
+            } else if (newAddresses.contains(inetAddress)) {
+                newAddressConns.incrementAndGet();
+            }
+            // Refuse first connection attempt
+            return initialAddressConns.get() > 1;
+        });
+        NetworkClient client = new NetworkClient(metadataUpdater, null, 
selector, "mock", Integer.MAX_VALUE,
+                reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 
64 * 1024,
+                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 
connectionSetupTimeoutMaxMsTest,
+                ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new 
ApiVersions(), null, new LogContext(), mockHostResolver);
+
+        // First connection attempt should fail
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertFalse(client.isReady(node, time.milliseconds()));
+
+        // Second connection attempt should succeed
+        time.sleep(reconnectBackoffMaxMsTest);
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        // We should have tried to connect to two of the initial addresses, 
none of the new addresses, and should
+        // only have performed one DNS resolution
+        assertEquals(2, initialAddressConns.get());
+        assertEquals(0, newAddressConns.get());
+        assertEquals(1, mockHostResolver.resolutionCount());
+    }
+
+    @Test
+    public void testFailedConnectionToFirstAddressAfterReconnect() {
+        AddressChangeHostResolver mockHostResolver = new 
AddressChangeHostResolver();
+        AtomicInteger initialAddressConns = new AtomicInteger();
+        AtomicInteger newAddressConns = new AtomicInteger();
+        MockSelector selector = new MockSelector(this.time, inetSocketAddress 
-> {
+            InetAddress inetAddress = inetSocketAddress.getAddress();
+            if (initialAddresses.contains(inetAddress)) {
+                initialAddressConns.incrementAndGet();
+            } else if (newAddresses.contains(inetAddress)) {
+                newAddressConns.incrementAndGet();
+            }
+            // Refuse first connection attempt to the new addresses
+            return initialAddresses.contains(inetAddress) || 
newAddressConns.get() > 1;
+        });
+        NetworkClient client = new NetworkClient(metadataUpdater, null, 
selector, "mock", Integer.MAX_VALUE,
+                reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 
64 * 1024,
+                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 
connectionSetupTimeoutMaxMsTest,
+                ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new 
ApiVersions(), null, new LogContext(), mockHostResolver);
+
+        // Connect to one of the initial addresses, then change the addresses and 
disconnect
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        mockHostResolver.setUseNewAddresses();
+        selector.serverDisconnect(node.idString());
+        client.poll(0, time.milliseconds());
+        assertFalse(client.isReady(node, time.milliseconds()));
+
+        // First connection attempt to new addresses should fail
+        time.sleep(reconnectBackoffMaxMsTest);
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertFalse(client.isReady(node, time.milliseconds()));
+
+        // Second connection attempt to new addresses should succeed
+        time.sleep(reconnectBackoffMaxMsTest);
+        client.ready(node, time.milliseconds());
+        time.sleep(connectionSetupTimeoutMaxMsTest);
+        client.poll(0, time.milliseconds());
+        assertTrue(client.isReady(node, time.milliseconds()));
+
+        // We should have tried to connect to one of the initial addresses and 
two of the new addresses (the first
+        // new-address attempt is refused), so DNS must have been resolved again after the address change

Review comment:
       Yep, thanks for catching that




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to