bob-barrett commented on a change in pull request #9902: URL: https://github.com/apache/kafka/pull/9902#discussion_r567112712
########## File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ########## @@ -907,6 +924,140 @@ public void testCorrelationId() { ids.forEach(id -> assertTrue(id < SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID)); } + @Test + public void testReconnectAfterAddressChange() { + AddressChangeHostResolver mockHostResolver = new AddressChangeHostResolver(); + AtomicInteger initialAddressConns = new AtomicInteger(); + AtomicInteger newAddressConns = new AtomicInteger(); + MockSelector selector = new MockSelector(this.time, inetSocketAddress -> { + InetAddress inetAddress = inetSocketAddress.getAddress(); + if (initialAddresses.contains(inetAddress)) { + initialAddressConns.incrementAndGet(); + } else if (newAddresses.contains(inetAddress)) { + newAddressConns.incrementAndGet(); + } + return (mockHostResolver.getUseNewAddresses() && newAddresses.contains(inetAddress)) || + (!mockHostResolver.getUseNewAddresses() && initialAddresses.contains(inetAddress)); + }); + NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, + reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, + defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, + ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new ApiVersions(), null, new LogContext(), mockHostResolver); + + // Connect to one the initial addresses, then change the addresses and disconnect + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertTrue(client.isReady(node, time.milliseconds())); + + mockHostResolver.setUseNewAddresses(); + selector.serverDisconnect(node.idString()); + client.poll(0, time.milliseconds()); + assertFalse(client.isReady(node, time.milliseconds())); + + time.sleep(reconnectBackoffMaxMsTest); + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertTrue(client.isReady(node, time.milliseconds())); + + // We should have tried to connect to one initial address and one new address + assertEquals(1, initialAddressConns.get()); + assertEquals(1, newAddressConns.get()); + } + + @Test + public void testFailedConnectionToFirstAddress() { + AddressChangeHostResolver mockHostResolver = new AddressChangeHostResolver(); + AtomicInteger initialAddressConns = new AtomicInteger(); + AtomicInteger newAddressConns = new AtomicInteger(); + MockSelector selector = new MockSelector(this.time, inetSocketAddress -> { + InetAddress inetAddress = inetSocketAddress.getAddress(); + if (initialAddresses.contains(inetAddress)) { + initialAddressConns.incrementAndGet(); + } else if (newAddresses.contains(inetAddress)) { + newAddressConns.incrementAndGet(); + } + // Refuse first connection attempt + return initialAddressConns.get() > 1; + }); + NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, + reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, + defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, + ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new ApiVersions(), null, new LogContext(), mockHostResolver); + + // First connection attempt should fail + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertFalse(client.isReady(node, time.milliseconds())); + + // Second connection attempt should succeed + time.sleep(reconnectBackoffMaxMsTest); + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertTrue(client.isReady(node, time.milliseconds())); + + // We should have tried to connect to two of the initial addresses, none of the new address, and should + // only have performed one DNS resolution + assertEquals(2, initialAddressConns.get()); + assertEquals(0, newAddressConns.get()); + assertEquals(1, mockHostResolver.resolutionCount()); + } + + @Test + public void testFailedConnectionToFirstAddressAfterReconnect() { + AddressChangeHostResolver mockHostResolver = new AddressChangeHostResolver(); + AtomicInteger initialAddressConns = new AtomicInteger(); + AtomicInteger newAddressConns = new AtomicInteger(); + MockSelector selector = new MockSelector(this.time, inetSocketAddress -> { + InetAddress inetAddress = inetSocketAddress.getAddress(); + if (initialAddresses.contains(inetAddress)) { + initialAddressConns.incrementAndGet(); + } else if (newAddresses.contains(inetAddress)) { + newAddressConns.incrementAndGet(); + } + // Refuse first connection attempt to the new addresses + return initialAddresses.contains(inetAddress) || newAddressConns.get() > 1; + }); + NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE, + reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, + defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, + ClientDnsLookup.USE_ALL_DNS_IPS, time, false, new ApiVersions(), null, new LogContext(), mockHostResolver); + + // Connect to one the initial addresses, then change the addresses and disconnect + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertTrue(client.isReady(node, time.milliseconds())); + + mockHostResolver.setUseNewAddresses(); + selector.serverDisconnect(node.idString()); + client.poll(0, time.milliseconds()); + assertFalse(client.isReady(node, time.milliseconds())); + + // First connection attempt to new addresses should fail + time.sleep(reconnectBackoffMaxMsTest); + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertFalse(client.isReady(node, time.milliseconds())); + + // Second connection attempt to new addresses should succeed + time.sleep(reconnectBackoffMaxMsTest); + client.ready(node, time.milliseconds()); + time.sleep(connectionSetupTimeoutMaxMsTest); + client.poll(0, time.milliseconds()); + assertTrue(client.isReady(node, time.milliseconds())); + + // We should have tried to connect to two of the initial addresses, none of the new address, and should + // only have performed one DNS resolution Review comment: Yep, thanks for catching that ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org