rpuch commented on code in PR #5345: URL: https://github.com/apache/ignite-3/pull/5345#discussion_r1982819493
########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes Review Comment: 'TCP discovery' looks like a term from AI2, we don't tend to use it in AI3. I'd suggest to rephrase this ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes Review Comment: ```suggestion * <p>When TCP discovery starts, this finder sends multicast request and waits for some time when other nodes ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes + * reply to this request with messages containing their addresses. + */ +public class MulticastNodeFinder implements NodeFinder { + private static final IgniteLogger LOG = Loggers.forClass(MulticastNodeFinder.class); + + /** Discovery request message. */ + private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8); + + /** Buffer size for receiving responses. */ + private static final int RECEIVE_BUFFER_SIZE = 1024; + + private static final int REQ_ATTEMPTS = 2; + + private final InetSocketAddress multicastSocketAddress; + private final int multicastPort; + private final int resultWaitMillis; + private final int ttl; + + private final InetSocketAddress localAddress; + private final ExecutorService threadPool; + + /** Flag to control running state of listener tasks. */ + private volatile boolean stopped = false; + + /** Listener tasks for each eligible interface. */ + private final List<CompletableFuture<Void>> listenerFutures = new ArrayList<>(); + + /** + * Constructs a new multicast node finder. + * + * @param multicastGroup Multicast group. + * @param multicastPort Multicast port. + * @param resultWaitMillis Wait time for responses. + * @param ttl Time-to-live for multicast packets. + * @param localAddress Local node address. + */ + public MulticastNodeFinder( + String multicastGroup, + int multicastPort, + int resultWaitMillis, + int ttl, + InetSocketAddress localAddress + ) { + this.multicastSocketAddress = new InetSocketAddress(multicastGroup, multicastPort); + this.multicastPort = multicastPort; + this.resultWaitMillis = resultWaitMillis; + this.ttl = ttl; + this.localAddress = localAddress; + this.threadPool = Executors.newFixedThreadPool(4); + } + + @Override + public Collection<NetworkAddress> findNodes() { + Set<NetworkAddress> result = new HashSet<>(); + List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = new ArrayList<>(); + + for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) { + discoveryFutures.add(supplyAsync(() -> discoverOnInterface(networkInterface), threadPool)); + } + + for (CompletableFuture<Collection<NetworkAddress>> future : discoveryFutures) { + try { + result.addAll(future.join()); + } catch (Exception e) { + LOG.error("Error during node discovery", e); + } + } + + if (result.isEmpty()) { + LOG.warn("No nodes discovered on interfaces, using unbound multicast socket"); + result.addAll(discoverOnInterface(null)); + } + + LOG.info("Discovered nodes: {}", result); + + return result; + } + + private Collection<NetworkAddress> discoverOnInterface(@Nullable NetworkInterface networkInterface) { Review Comment: ```suggestion private Collection<NetworkAddress> findOnInterface(@Nullable NetworkInterface networkInterface) { ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes + * reply to this request with messages containing their addresses. + */ +public class MulticastNodeFinder implements NodeFinder { + private static final IgniteLogger LOG = Loggers.forClass(MulticastNodeFinder.class); + + /** Discovery request message. */ + private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8); + + /** Buffer size for receiving responses. */ + private static final int RECEIVE_BUFFER_SIZE = 1024; + + private static final int REQ_ATTEMPTS = 2; + + private final InetSocketAddress multicastSocketAddress; + private final int multicastPort; + private final int resultWaitMillis; + private final int ttl; + + private final InetSocketAddress localAddress; + private final ExecutorService threadPool; + + /** Flag to control running state of listener tasks. */ + private volatile boolean stopped = false; + + /** Listener tasks for each eligible interface. */ + private final List<CompletableFuture<Void>> listenerFutures = new ArrayList<>(); + + /** + * Constructs a new multicast node finder. + * + * @param multicastGroup Multicast group. + * @param multicastPort Multicast port. + * @param resultWaitMillis Wait time for responses. + * @param ttl Time-to-live for multicast packets. + * @param localAddress Local node address. + */ + public MulticastNodeFinder( + String multicastGroup, + int multicastPort, + int resultWaitMillis, + int ttl, + InetSocketAddress localAddress + ) { + this.multicastSocketAddress = new InetSocketAddress(multicastGroup, multicastPort); + this.multicastPort = multicastPort; + this.resultWaitMillis = resultWaitMillis; + this.ttl = ttl; + this.localAddress = localAddress; + this.threadPool = Executors.newFixedThreadPool(4); + } + + @Override + public Collection<NetworkAddress> findNodes() { + Set<NetworkAddress> result = new HashSet<>(); + List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = new ArrayList<>(); + + for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) { + discoveryFutures.add(supplyAsync(() -> discoverOnInterface(networkInterface), threadPool)); + } + + for (CompletableFuture<Collection<NetworkAddress>> future : discoveryFutures) { + try { + result.addAll(future.join()); + } catch (Exception e) { + LOG.error("Error during node discovery", e); + } + } + + if (result.isEmpty()) { + LOG.warn("No nodes discovered on interfaces, using unbound multicast socket"); + result.addAll(discoverOnInterface(null)); + } + + LOG.info("Discovered nodes: {}", result); + + return result; + } + + private Collection<NetworkAddress> discoverOnInterface(@Nullable NetworkInterface networkInterface) { + Set<NetworkAddress> discovered = new HashSet<>(); + byte[] responseBuffer = new byte[RECEIVE_BUFFER_SIZE]; + + try (MulticastSocket socket = new MulticastSocket(0)) { + configureSocket(socket, networkInterface); + + for (int i = 0; i < REQ_ATTEMPTS; i++) { + DatagramPacket requestPacket = new DatagramPacket(REQUEST_MESSAGE, REQUEST_MESSAGE.length); + requestPacket.setSocketAddress(multicastSocketAddress); + socket.send(requestPacket); + + waitForResponses(responseBuffer, socket, discovered); + } + } catch (Exception e) { + LOG.error("Error during discovery on interface: " + networkInterface, e); + } + + return discovered; + } + + private void waitForResponses(byte[] responseBuffer, MulticastSocket socket, Set<NetworkAddress> discovered) throws IOException { + long endTime = coarseCurrentTimeMillis() + resultWaitMillis; Review Comment: Let's use normal `System.currentTimeMillis()` ########## modules/network/src/main/java/org/apache/ignite/internal/network/NodeFinderFactory.java: ########## @@ -51,10 +53,22 @@ public static NodeFinder createNodeFinder(NodeFinderView nodeFinderConfiguration return Arrays.stream(nodeFinderConfiguration.netClusterNodes()) .map(NetworkAddress::from) .collect(collectingAndThen(toUnmodifiableList(), StaticNodeFinder::new)); + case MULTICAST: + MulticastView multicastConfig = nodeFinderConfiguration.multicast(); + MulticastNodeFinder multicastNodeFinder = new MulticastNodeFinder( + multicastConfig.group(), + multicastConfig.port(), + multicastConfig.discoveryResultWaitMillis(), + multicastConfig.ttl(), + localAddress + ); + + multicastNodeFinder.start(); Review Comment: This method is just about creating a `NodeFinder`. Let's add `start()` method on `NodeFinder` and invoke it not here, but in the calling method after the `NodeFinder` has been created ########## modules/network/src/main/java/org/apache/ignite/internal/network/configuration/NodeFinderConfigurationSchema.java: ########## @@ -30,4 +31,7 @@ public class NodeFinderConfigurationSchema { /** Addresses of nodes in the cluster in a host:port format. This is a part of StaticNodeFinder configuration. */ @Value(hasDefault = true) public final String[] netClusterNodes = new String[0]; + + @ConfigValue + public MulticastConfigurationSchema multicast; Review Comment: Javadoc is missing ########## modules/network/src/main/java/org/apache/ignite/internal/network/configuration/NodeFinderType.java: ########## @@ -20,5 +20,7 @@ /** NodeFinder type. */ public enum NodeFinderType { /** Node finder with a preconfigured list of ip addresses. */ - STATIC; + STATIC, + /** Uses multicast to discover nodes. */ Review Comment: ```suggestion /** Uses multicast to find nodes. */ ``` ########## modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItMulticastNodeFinderTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.scalecube; + +import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService; +import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.IgniteUtils.stopAsync; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.internal.manager.ComponentContext; +import org.apache.ignite.internal.network.ClusterIdSupplier; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.ConstantClusterIdSupplier; +import org.apache.ignite.internal.network.MulticastNodeFinder; +import org.apache.ignite.internal.network.NodeFinder; +import org.apache.ignite.internal.network.recovery.InMemoryStaleIds; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.network.NetworkAddress; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@ExtendWith(ExecutorServiceExtension.class) +class ItMulticastNodeFinderTest extends IgniteAbstractTest { + private static final int INIT_PORT = 3344; + + private static final String MULTICAST_GROUP = "224.0.0.1"; + + private final ClusterIdSupplier clusterIdSupplier = new ConstantClusterIdSupplier(UUID.randomUUID()); + + /** Created {@link ClusterService}s. Needed for resource management. */ + private List<ClusterService> services; + + private TestInfo testInfo; + + @BeforeEach + void setUp(TestInfo testInfo) { + this.testInfo = testInfo; + } + + @AfterEach + void tearDown() { + assertThat(stopAsync(new ComponentContext(), services), willCompleteSuccessfully()); + } + + @ParameterizedTest + @ValueSource(ints = {-1, 255}) + void testFindNodes(int ttl) throws InterruptedException { + List<NetworkAddress> addresses = findLocalAddresses(INIT_PORT + 1, INIT_PORT + 6); + + services = addresses.stream() + .map(addr -> startNetwork(testInfo, addr, startMulticastNodeFinder(addr, ttl))) Review Comment: Does the started `NodeFinder` ever get stopped? ########## modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItMulticastNodeFinderTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.scalecube; + +import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService; +import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.IgniteUtils.stopAsync; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.internal.manager.ComponentContext; +import org.apache.ignite.internal.network.ClusterIdSupplier; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.ConstantClusterIdSupplier; +import org.apache.ignite.internal.network.MulticastNodeFinder; +import org.apache.ignite.internal.network.NodeFinder; +import org.apache.ignite.internal.network.recovery.InMemoryStaleIds; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.network.NetworkAddress; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@ExtendWith(ExecutorServiceExtension.class) +class ItMulticastNodeFinderTest extends IgniteAbstractTest { + private static final int INIT_PORT = 3344; + + private static final String MULTICAST_GROUP = "224.0.0.1"; + + private final ClusterIdSupplier clusterIdSupplier = new ConstantClusterIdSupplier(UUID.randomUUID()); + + /** Created {@link ClusterService}s. Needed for resource management. */ + private List<ClusterService> services; + + private TestInfo testInfo; + + @BeforeEach + void setUp(TestInfo testInfo) { + this.testInfo = testInfo; + } + + @AfterEach + void tearDown() { + assertThat(stopAsync(new ComponentContext(), services), willCompleteSuccessfully()); + } + + @ParameterizedTest + @ValueSource(ints = {-1, 255}) + void testFindNodes(int ttl) throws InterruptedException { + List<NetworkAddress> addresses = findLocalAddresses(INIT_PORT + 1, INIT_PORT + 6); + + services = addresses.stream() + .map(addr -> startNetwork(testInfo, addr, startMulticastNodeFinder(addr, ttl))) + .collect(Collectors.toCollection(ArrayList::new)); // ensure mutability + + for (ClusterService service : services) { + assertTrue(waitForTopology(service, 5, 5_000), service.topologyService().localMember().toString() + + ", topSize=" + service.topologyService().allMembers().size()); + } + + int idx0 = 0; + int idx1 = 2; + + assertThat(services.get(idx0).stopAsync(new ComponentContext()), willCompleteSuccessfully()); + + assertThat(services.get(idx1).stopAsync(new ComponentContext()), willCompleteSuccessfully()); + + ClusterService svc0 = startNetwork(testInfo, addresses.get(idx0), startMulticastNodeFinder(addresses.get(idx0), ttl)); + services.set(idx0, svc0); + + ClusterService svc2 = startNetwork(testInfo, addresses.get(idx1), startMulticastNodeFinder(addresses.get(idx1), ttl)); + services.set(idx1, svc2); + + for (ClusterService service : services) { + assertTrue(waitForTopology(service, 5, 10_000), service.topologyService().localMember().toString() + + ", topSize=" + service.topologyService().allMembers().size()); + } + } + + private static NodeFinder startMulticastNodeFinder(NetworkAddress addr, int ttl) { + MulticastNodeFinder finder = new MulticastNodeFinder( + MULTICAST_GROUP, + INIT_PORT, + 500, + ttl, + new InetSocketAddress(addr.host(), addr.port()) + ); + + finder.start(); + + return finder; + } + + /** + * Creates a {@link ClusterService} using the given local address and the node finder. + * + * @param testInfo Test info. + * @param addr Node address. + * @param nodeFinder Node finder. + * @return Created Cluster Service. + */ + private ClusterService startNetwork(TestInfo testInfo, NetworkAddress addr, NodeFinder nodeFinder) { Review Comment: Do we really need to pass `TestInfo` via parameter? It's already accessible via field ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes + * reply to this request with messages containing their addresses. + */ +public class MulticastNodeFinder implements NodeFinder { + private static final IgniteLogger LOG = Loggers.forClass(MulticastNodeFinder.class); + + /** Discovery request message. */ + private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8); + + /** Buffer size for receiving responses. */ + private static final int RECEIVE_BUFFER_SIZE = 1024; + + private static final int REQ_ATTEMPTS = 2; + + private final InetSocketAddress multicastSocketAddress; + private final int multicastPort; + private final int resultWaitMillis; + private final int ttl; + + private final InetSocketAddress localAddress; + private final ExecutorService threadPool; + + /** Flag to control running state of listener tasks. */ + private volatile boolean stopped = false; + + /** Listener tasks for each eligible interface. */ + private final List<CompletableFuture<Void>> listenerFutures = new ArrayList<>(); + + /** + * Constructs a new multicast node finder. + * + * @param multicastGroup Multicast group. + * @param multicastPort Multicast port. + * @param resultWaitMillis Wait time for responses. + * @param ttl Time-to-live for multicast packets. + * @param localAddress Local node address. + */ + public MulticastNodeFinder( + String multicastGroup, + int multicastPort, + int resultWaitMillis, + int ttl, + InetSocketAddress localAddress + ) { + this.multicastSocketAddress = new InetSocketAddress(multicastGroup, multicastPort); + this.multicastPort = multicastPort; + this.resultWaitMillis = resultWaitMillis; + this.ttl = ttl; + this.localAddress = localAddress; + this.threadPool = Executors.newFixedThreadPool(4); Review Comment: Let's use a thread factory (`NamedThreadFactory`) when creating the pool ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes + * reply to this request with messages containing their addresses. + */ +public class MulticastNodeFinder implements NodeFinder { + private static final IgniteLogger LOG = Loggers.forClass(MulticastNodeFinder.class); + + /** Discovery request message. */ + private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8); Review Comment: Does the message payload match its AI2 counterpart? If yes, let's change it a bit ("IGNT"?) to avoid confusion between nodes of AI2 and AI3 if they end up to be in the same network for some reason ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes + * reply to this request with messages containing their addresses. + */ +public class MulticastNodeFinder implements NodeFinder { + private static final IgniteLogger LOG = Loggers.forClass(MulticastNodeFinder.class); + + /** Discovery request message. */ + private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8); + + /** Buffer size for receiving responses. */ + private static final int RECEIVE_BUFFER_SIZE = 1024; + + private static final int REQ_ATTEMPTS = 2; + + private final InetSocketAddress multicastSocketAddress; + private final int multicastPort; + private final int resultWaitMillis; + private final int ttl; + + private final InetSocketAddress localAddress; + private final ExecutorService threadPool; + + /** Flag to control running state of listener tasks. */ + private volatile boolean stopped = false; + + /** Listener tasks for each eligible interface. */ + private final List<CompletableFuture<Void>> listenerFutures = new ArrayList<>(); + + /** + * Constructs a new multicast node finder. + * + * @param multicastGroup Multicast group. + * @param multicastPort Multicast port. + * @param resultWaitMillis Wait time for responses. + * @param ttl Time-to-live for multicast packets. + * @param localAddress Local node address. + */ + public MulticastNodeFinder( + String multicastGroup, + int multicastPort, + int resultWaitMillis, + int ttl, + InetSocketAddress localAddress + ) { + this.multicastSocketAddress = new InetSocketAddress(multicastGroup, multicastPort); + this.multicastPort = multicastPort; + this.resultWaitMillis = resultWaitMillis; + this.ttl = ttl; + this.localAddress = localAddress; + this.threadPool = Executors.newFixedThreadPool(4); + } + + @Override + public Collection<NetworkAddress> findNodes() { + Set<NetworkAddress> result = new HashSet<>(); + List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = new ArrayList<>(); + + for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) { + discoveryFutures.add(supplyAsync(() -> discoverOnInterface(networkInterface), threadPool)); + } + + for (CompletableFuture<Collection<NetworkAddress>> future : discoveryFutures) { Review Comment: 1. `join()` is uninterruptible; are you sure we need it and not `get()` which can be interrupted? 2. I would suggest to use a timed version of `get()` to make sure we don't wait forever (as there is some kind of timeout already defined for obtaining node addresses) 3. Are we expected to swallow the exception? If it's ok, then it should not be logged as an ERROR (probably not even as a WARN) ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes + * reply to this request with messages containing their addresses. + */ +public class MulticastNodeFinder implements NodeFinder { + private static final IgniteLogger LOG = Loggers.forClass(MulticastNodeFinder.class); + + /** Discovery request message. */ + private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8); + + /** Buffer size for receiving responses. */ + private static final int RECEIVE_BUFFER_SIZE = 1024; + + private static final int REQ_ATTEMPTS = 2; + + private final InetSocketAddress multicastSocketAddress; + private final int multicastPort; + private final int resultWaitMillis; + private final int ttl; + + private final InetSocketAddress localAddress; + private final ExecutorService threadPool; + + /** Flag to control running state of listener tasks. */ + private volatile boolean stopped = false; + + /** Listener tasks for each eligible interface. */ Review Comment: These are futures, not tasks ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes + * reply to this request with messages containing their addresses. + */ +public class MulticastNodeFinder implements NodeFinder { + private static final IgniteLogger LOG = Loggers.forClass(MulticastNodeFinder.class); + + /** Discovery request message. */ + private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8); + + /** Buffer size for receiving responses. */ + private static final int RECEIVE_BUFFER_SIZE = 1024; + + private static final int REQ_ATTEMPTS = 2; + + private final InetSocketAddress multicastSocketAddress; + private final int multicastPort; + private final int resultWaitMillis; + private final int ttl; + + private final InetSocketAddress localAddress; + private final ExecutorService threadPool; + + /** Flag to control running state of listener tasks. */ + private volatile boolean stopped = false; + + /** Listener tasks for each eligible interface. */ + private final List<CompletableFuture<Void>> listenerFutures = new ArrayList<>(); + + /** + * Constructs a new multicast node finder. + * + * @param multicastGroup Multicast group. + * @param multicastPort Multicast port. + * @param resultWaitMillis Wait time for responses. + * @param ttl Time-to-live for multicast packets. + * @param localAddress Local node address. + */ + public MulticastNodeFinder( + String multicastGroup, + int multicastPort, + int resultWaitMillis, + int ttl, + InetSocketAddress localAddress + ) { + this.multicastSocketAddress = new InetSocketAddress(multicastGroup, multicastPort); + this.multicastPort = multicastPort; + this.resultWaitMillis = resultWaitMillis; + this.ttl = ttl; + this.localAddress = localAddress; + this.threadPool = Executors.newFixedThreadPool(4); + } + + @Override + public Collection<NetworkAddress> findNodes() { + Set<NetworkAddress> result = new HashSet<>(); + List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = new ArrayList<>(); + + for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) { + discoveryFutures.add(supplyAsync(() -> discoverOnInterface(networkInterface), threadPool)); + } + + for (CompletableFuture<Collection<NetworkAddress>> future : discoveryFutures) { + try { + result.addAll(future.join()); + } catch (Exception e) { + LOG.error("Error during node discovery", e); + } + } + + if (result.isEmpty()) { + LOG.warn("No nodes discovered on interfaces, using unbound multicast socket"); + result.addAll(discoverOnInterface(null)); + } + + LOG.info("Discovered nodes: {}", result); + + return result; + } + + private Collection<NetworkAddress> discoverOnInterface(@Nullable NetworkInterface networkInterface) { + Set<NetworkAddress> discovered = new HashSet<>(); + byte[] responseBuffer = new byte[RECEIVE_BUFFER_SIZE]; + + try (MulticastSocket socket = new MulticastSocket(0)) { + configureSocket(socket, networkInterface); + + for (int i = 0; i < REQ_ATTEMPTS; i++) { + DatagramPacket requestPacket = new DatagramPacket(REQUEST_MESSAGE, REQUEST_MESSAGE.length); + requestPacket.setSocketAddress(multicastSocketAddress); + socket.send(requestPacket); + + waitForResponses(responseBuffer, socket, discovered); + } + } catch (Exception e) { + LOG.error("Error during discovery on interface: " + networkInterface, e); Review Comment: Why is it swallowed? If it should be, why is it logged as an ERROR? ########## modules/network-api/src/main/java/org/apache/ignite/internal/network/NodeFinder.java: ########## @@ -17,17 +17,23 @@ package org.apache.ignite.internal.network; -import java.util.List; +import java.util.Collection; +import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.network.NetworkAddress; /** * Interface for services responsible for discovering the initial set of network cluster members. */ -public interface NodeFinder { +public interface NodeFinder extends ManuallyCloseable { /** * Discovers the initial set of cluster members and returns their network addresses. * * @return addresses of initial cluster members. */ - List<NetworkAddress> findNodes(); + Collection<NetworkAddress> findNodes(); Review Comment: Why is this change needed? ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes + * reply to this request with messages containing their addresses. + */ +public class MulticastNodeFinder implements NodeFinder { + private static final IgniteLogger LOG = Loggers.forClass(MulticastNodeFinder.class); + + /** Discovery request message. */ + private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8); + + /** Buffer size for receiving responses. */ + private static final int RECEIVE_BUFFER_SIZE = 1024; + + private static final int REQ_ATTEMPTS = 2; + + private final InetSocketAddress multicastSocketAddress; + private final int multicastPort; + private final int resultWaitMillis; + private final int ttl; + + private final InetSocketAddress localAddress; + private final ExecutorService threadPool; + + /** Flag to control running state of listener tasks. */ + private volatile boolean stopped = false; + + /** Listener tasks for each eligible interface. */ + private final List<CompletableFuture<Void>> listenerFutures = new ArrayList<>(); + + /** + * Constructs a new multicast node finder. + * + * @param multicastGroup Multicast group. + * @param multicastPort Multicast port. + * @param resultWaitMillis Wait time for responses. + * @param ttl Time-to-live for multicast packets. + * @param localAddress Local node address. + */ + public MulticastNodeFinder( + String multicastGroup, + int multicastPort, + int resultWaitMillis, + int ttl, + InetSocketAddress localAddress + ) { + this.multicastSocketAddress = new InetSocketAddress(multicastGroup, multicastPort); + this.multicastPort = multicastPort; + this.resultWaitMillis = resultWaitMillis; + this.ttl = ttl; + this.localAddress = localAddress; + this.threadPool = Executors.newFixedThreadPool(4); + } + + @Override + public Collection<NetworkAddress> findNodes() { + Set<NetworkAddress> result = new HashSet<>(); + List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = new ArrayList<>(); + + for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) { + discoveryFutures.add(supplyAsync(() -> discoverOnInterface(networkInterface), threadPool)); + } + + for (CompletableFuture<Collection<NetworkAddress>> future : discoveryFutures) { + try { + result.addAll(future.join()); + } catch (Exception e) { + LOG.error("Error during node discovery", e); + } + } + + if (result.isEmpty()) { + LOG.warn("No nodes discovered on interfaces, using unbound multicast socket"); + result.addAll(discoverOnInterface(null)); + } + + LOG.info("Discovered nodes: {}", result); + + return result; + } + + private Collection<NetworkAddress> discoverOnInterface(@Nullable NetworkInterface networkInterface) { + Set<NetworkAddress> discovered = new HashSet<>(); + byte[] responseBuffer = new byte[RECEIVE_BUFFER_SIZE]; + + try (MulticastSocket socket = new MulticastSocket(0)) { + configureSocket(socket, networkInterface); + + for (int i = 0; i < REQ_ATTEMPTS; i++) { + DatagramPacket requestPacket = new DatagramPacket(REQUEST_MESSAGE, REQUEST_MESSAGE.length); + requestPacket.setSocketAddress(multicastSocketAddress); + socket.send(requestPacket); + + waitForResponses(responseBuffer, socket, discovered); + } + } catch (Exception e) { + LOG.error("Error during discovery on interface: " + networkInterface, e); + } + + return discovered; + } + + private void waitForResponses(byte[] responseBuffer, MulticastSocket socket, Set<NetworkAddress> discovered) throws IOException { + long endTime = coarseCurrentTimeMillis() + resultWaitMillis; + while (coarseCurrentTimeMillis() < endTime) { + DatagramPacket responsePacket = new DatagramPacket(responseBuffer, responseBuffer.length); + + try { + socket.receive(responsePacket); + byte[] data = Arrays.copyOfRange( + responsePacket.getData(), + responsePacket.getOffset(), + responsePacket.getOffset() + responsePacket.getLength() + ); + + InetSocketAddress address = ByteUtils.fromBytes(data); + if (!address.equals(localAddress)) { + discovered.add(NetworkAddress.from(address)); + } + } catch (SocketTimeoutException ignored) { + // No-op. + } + } + } + + private void configureSocket(MulticastSocket socket, @Nullable NetworkInterface networkInterface) throws IOException { + socket.setOption(StandardSocketOptions.IP_MULTICAST_LOOP, true); + + if (networkInterface != null) { + socket.setNetworkInterface(networkInterface); + } + + socket.setSoTimeout(resultWaitMillis); + + if (ttl != -1) { + socket.setTimeToLive(ttl); + } + } + + /** + * Listens on a given network interface for multicast discovery requests and responds with this node's address. + * + * @param networkInterface The network interface to listen on. + */ + private void listenOnInterface(NetworkInterface networkInterface) { + try (MulticastSocket socket = new MulticastSocket(multicastPort)) { + configureSocket(socket, networkInterface); + socket.joinGroup(multicastSocketAddress, networkInterface); + + byte[] responseData = ByteUtils.toBytes(localAddress); + byte[] requestBuffer = new byte[REQUEST_MESSAGE.length]; + + while (!stopped) { + DatagramPacket requestPacket = new DatagramPacket(requestBuffer, requestBuffer.length); + try { + socket.receive(requestPacket); + + byte[] received = Arrays.copyOfRange( + requestPacket.getData(), + requestPacket.getOffset(), + requestPacket.getOffset() + requestPacket.getLength() + ); + + if (!Arrays.equals(received, REQUEST_MESSAGE)) { + LOG.error("Got unexpected request on multicast socket"); + continue; + } + + DatagramPacket responsePacket = new DatagramPacket( + responseData, + responseData.length, + requestPacket.getAddress(), + requestPacket.getPort() + ); + + socket.send(responsePacket); + } catch (SocketTimeoutException ignored) { + // Timeout to check the running flag. + } + } + } catch (Exception e) { + if (!stopped) { + LOG.error("Error in multicast listener on interface: " + networkInterface, e); + } else { + LOG.info("Multicast listener shutting down on interface: " + networkInterface); + } + } + } + + /** + * Returns a collection of eligible network interfaces that are up, non‑loopback, and support multicast. + * + * @return Collection of eligible network interfaces. + */ + private static Collection<NetworkInterface> getEligibleNetworkInterfaces() { + Set<NetworkInterface> eligible = new HashSet<>(); + try { + Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); + while (interfaces.hasMoreElements()) { + NetworkInterface networkInterface = interfaces.nextElement(); + if (networkInterface.isUp() && !networkInterface.isLoopback() && networkInterface.supportsMulticast()) { + eligible.add(networkInterface); + } + } + } catch (Exception e) { Review Comment: What kind of an exception do we anticipate here? Why not just let it fly to the caller? ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes + * reply to this request with messages containing their addresses. + */ +public class MulticastNodeFinder implements NodeFinder { + private static final IgniteLogger LOG = Loggers.forClass(MulticastNodeFinder.class); + + /** Discovery request message. */ + private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8); + + /** Buffer size for receiving responses. */ + private static final int RECEIVE_BUFFER_SIZE = 1024; + + private static final int REQ_ATTEMPTS = 2; + + private final InetSocketAddress multicastSocketAddress; + private final int multicastPort; + private final int resultWaitMillis; + private final int ttl; + + private final InetSocketAddress localAddress; + private final ExecutorService threadPool; + + /** Flag to control running state of listener tasks. */ + private volatile boolean stopped = false; + + /** Listener tasks for each eligible interface. */ + private final List<CompletableFuture<Void>> listenerFutures = new ArrayList<>(); + + /** + * Constructs a new multicast node finder. + * + * @param multicastGroup Multicast group. + * @param multicastPort Multicast port. + * @param resultWaitMillis Wait time for responses. + * @param ttl Time-to-live for multicast packets. + * @param localAddress Local node address. + */ + public MulticastNodeFinder( + String multicastGroup, + int multicastPort, + int resultWaitMillis, + int ttl, + InetSocketAddress localAddress + ) { + this.multicastSocketAddress = new InetSocketAddress(multicastGroup, multicastPort); + this.multicastPort = multicastPort; + this.resultWaitMillis = resultWaitMillis; + this.ttl = ttl; + this.localAddress = localAddress; + this.threadPool = Executors.newFixedThreadPool(4); + } + + @Override + public Collection<NetworkAddress> findNodes() { + Set<NetworkAddress> result = new HashSet<>(); + List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = new ArrayList<>(); + + for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) { + discoveryFutures.add(supplyAsync(() -> discoverOnInterface(networkInterface), threadPool)); + } + + for (CompletableFuture<Collection<NetworkAddress>> future : discoveryFutures) { + try { + result.addAll(future.join()); + } catch (Exception e) { + LOG.error("Error during node discovery", e); + } + } + + if (result.isEmpty()) { + LOG.warn("No nodes discovered on interfaces, using unbound multicast socket"); + result.addAll(discoverOnInterface(null)); + } + + LOG.info("Discovered nodes: {}", result); Review Comment: ```suggestion LOG.info("Found nodes: {}", result); ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.configuration; + +import org.apache.ignite.configuration.annotation.Config; +import org.apache.ignite.configuration.annotation.Value; +import org.apache.ignite.configuration.validation.Range; + +/** Configuration for multicast node finder. */ +@Config +public class MulticastConfigurationSchema { + @Value(hasDefault = true) + @MulticastAddress + public final String group = "228.1.2.4"; + + @Value(hasDefault = true) + @Range(min = 0, max = 65535) + public final int port = 47400; + + @Value(hasDefault = true) + @Range(min = 0) + public final int discoveryResultWaitMillis = 500; Review Comment: Maybe we should discuss whether we should use the term 'discovery' ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes + * reply to this request with messages containing their addresses. + */ +public class MulticastNodeFinder implements NodeFinder { + private static final IgniteLogger LOG = Loggers.forClass(MulticastNodeFinder.class); + + /** Discovery request message. */ + private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8); + + /** Buffer size for receiving responses. */ + private static final int RECEIVE_BUFFER_SIZE = 1024; + + private static final int REQ_ATTEMPTS = 2; + + private final InetSocketAddress multicastSocketAddress; + private final int multicastPort; + private final int resultWaitMillis; + private final int ttl; + + private final InetSocketAddress localAddress; + private final ExecutorService threadPool; + + /** Flag to control running state of listener tasks. */ + private volatile boolean stopped = false; + + /** Listener tasks for each eligible interface. */ + private final List<CompletableFuture<Void>> listenerFutures = new ArrayList<>(); + + /** + * Constructs a new multicast node finder. + * + * @param multicastGroup Multicast group. + * @param multicastPort Multicast port. + * @param resultWaitMillis Wait time for responses. + * @param ttl Time-to-live for multicast packets. + * @param localAddress Local node address. + */ + public MulticastNodeFinder( + String multicastGroup, + int multicastPort, + int resultWaitMillis, + int ttl, + InetSocketAddress localAddress + ) { + this.multicastSocketAddress = new InetSocketAddress(multicastGroup, multicastPort); + this.multicastPort = multicastPort; + this.resultWaitMillis = resultWaitMillis; + this.ttl = ttl; + this.localAddress = localAddress; + this.threadPool = Executors.newFixedThreadPool(4); + } + + @Override + public Collection<NetworkAddress> findNodes() { + Set<NetworkAddress> result = new HashSet<>(); + List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = new ArrayList<>(); + + for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) { + discoveryFutures.add(supplyAsync(() -> discoverOnInterface(networkInterface), threadPool)); + } + + for (CompletableFuture<Collection<NetworkAddress>> future : discoveryFutures) { + try { + result.addAll(future.join()); + } catch (Exception e) { + LOG.error("Error during node discovery", e); + } + } + + if (result.isEmpty()) { + LOG.warn("No nodes discovered on interfaces, using unbound multicast socket"); + result.addAll(discoverOnInterface(null)); + } + + LOG.info("Discovered nodes: {}", result); + + return result; + } + + private Collection<NetworkAddress> discoverOnInterface(@Nullable NetworkInterface networkInterface) { + Set<NetworkAddress> discovered = new HashSet<>(); + byte[] responseBuffer = new byte[RECEIVE_BUFFER_SIZE]; + + try (MulticastSocket socket = new MulticastSocket(0)) { + configureSocket(socket, networkInterface); + + for (int i = 0; i < REQ_ATTEMPTS; i++) { + DatagramPacket requestPacket = new DatagramPacket(REQUEST_MESSAGE, REQUEST_MESSAGE.length); + requestPacket.setSocketAddress(multicastSocketAddress); + socket.send(requestPacket); + + waitForResponses(responseBuffer, socket, discovered); + } + } catch (Exception e) { + LOG.error("Error during discovery on interface: " + networkInterface, e); + } + + return discovered; + } + + private void waitForResponses(byte[] responseBuffer, MulticastSocket socket, Set<NetworkAddress> discovered) throws IOException { + long endTime = coarseCurrentTimeMillis() + resultWaitMillis; + while (coarseCurrentTimeMillis() < endTime) { + DatagramPacket responsePacket = new DatagramPacket(responseBuffer, responseBuffer.length); + + try { + socket.receive(responsePacket); + byte[] data = Arrays.copyOfRange( + responsePacket.getData(), + responsePacket.getOffset(), + responsePacket.getOffset() + responsePacket.getLength() + ); + + InetSocketAddress address = ByteUtils.fromBytes(data); + if (!address.equals(localAddress)) { + discovered.add(NetworkAddress.from(address)); + } + } catch (SocketTimeoutException ignored) { + // No-op. + } + } + } + + private void configureSocket(MulticastSocket socket, @Nullable NetworkInterface networkInterface) throws IOException { + socket.setOption(StandardSocketOptions.IP_MULTICAST_LOOP, true); + + if (networkInterface != null) { + socket.setNetworkInterface(networkInterface); + } + + socket.setSoTimeout(resultWaitMillis); + + if (ttl != -1) { Review Comment: Please extract -1 as a constant with a name like `UNBOUNDED_TTL` ########## modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.configuration; + +import org.apache.ignite.configuration.annotation.Config; +import org.apache.ignite.configuration.annotation.Value; +import org.apache.ignite.configuration.validation.Range; + +/** Configuration for multicast node finder. */ +@Config +public class MulticastConfigurationSchema { + @Value(hasDefault = true) + @MulticastAddress + public final String group = "228.1.2.4"; + + @Value(hasDefault = true) + @Range(min = 0, max = 65535) + public final int port = 47400; Review Comment: How was this default chosen? If it's same as in AI2, shouldn't we choose a different default to avoid clashes? ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.net.StandardSocketOptions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.NetworkAddress; +import org.jetbrains.annotations.Nullable; + +/** + * Multicast-based IP finder. + * + * <p>When TCP discovery starts this finder sends multicast request and waits for some time when other nodes + * reply to this request with messages containing their addresses. + */ +public class MulticastNodeFinder implements NodeFinder { + private static final IgniteLogger LOG = Loggers.forClass(MulticastNodeFinder.class); + + /** Discovery request message. */ + private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8); + + /** Buffer size for receiving responses. */ + private static final int RECEIVE_BUFFER_SIZE = 1024; + + private static final int REQ_ATTEMPTS = 2; + + private final InetSocketAddress multicastSocketAddress; + private final int multicastPort; + private final int resultWaitMillis; + private final int ttl; + + private final InetSocketAddress localAddress; Review Comment: ```suggestion private final InetSocketAddress localAddressToAdvertise; ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.configuration; + +import org.apache.ignite.configuration.annotation.Config; +import org.apache.ignite.configuration.annotation.Value; +import org.apache.ignite.configuration.validation.Range; + +/** Configuration for multicast node finder. */ +@Config +public class MulticastConfigurationSchema { Review Comment: Please add javadocs to all config properties defined here ########## modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.configuration; + +import org.apache.ignite.configuration.annotation.Config; +import org.apache.ignite.configuration.annotation.Value; +import org.apache.ignite.configuration.validation.Range; + +/** Configuration for multicast node finder. */ +@Config +public class MulticastConfigurationSchema { + @Value(hasDefault = true) + @MulticastAddress + public final String group = "228.1.2.4"; Review Comment: How was this default chosen? It seems to be the default multicast address for AI2, but should we reuse it in AI3? ########## modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.configuration; + +import org.apache.ignite.configuration.annotation.Config; +import org.apache.ignite.configuration.annotation.Value; +import org.apache.ignite.configuration.validation.Range; + +/** Configuration for multicast node finder. */ +@Config +public class MulticastConfigurationSchema { + @Value(hasDefault = true) + @MulticastAddress + public final String group = "228.1.2.4"; + + @Value(hasDefault = true) + @Range(min = 0, max = 65535) Review Comment: 0 is probably not a valid port number here ########## modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItMulticastNodeFinderTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.scalecube; + +import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService; +import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.IgniteUtils.stopAsync; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.internal.manager.ComponentContext; +import org.apache.ignite.internal.network.ClusterIdSupplier; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.ConstantClusterIdSupplier; +import org.apache.ignite.internal.network.MulticastNodeFinder; +import org.apache.ignite.internal.network.NodeFinder; +import org.apache.ignite.internal.network.recovery.InMemoryStaleIds; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.network.NetworkAddress; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@ExtendWith(ExecutorServiceExtension.class) +class ItMulticastNodeFinderTest extends IgniteAbstractTest { + private static final int INIT_PORT = 3344; + + private static final String MULTICAST_GROUP = "224.0.0.1"; + + private final ClusterIdSupplier clusterIdSupplier = new ConstantClusterIdSupplier(UUID.randomUUID()); + + /** Created {@link ClusterService}s. Needed for resource management. */ + private List<ClusterService> services; + + private TestInfo testInfo; + + @BeforeEach + void setUp(TestInfo testInfo) { + this.testInfo = testInfo; + } + + @AfterEach + void tearDown() { + assertThat(stopAsync(new ComponentContext(), services), willCompleteSuccessfully()); + } + + @ParameterizedTest + @ValueSource(ints = {-1, 255}) + void testFindNodes(int ttl) throws InterruptedException { + List<NetworkAddress> addresses = findLocalAddresses(INIT_PORT + 1, INIT_PORT + 6); + + services = addresses.stream() + .map(addr -> startNetwork(testInfo, addr, startMulticastNodeFinder(addr, ttl))) + .collect(Collectors.toCollection(ArrayList::new)); // ensure mutability Review Comment: Let's import `toCollection()` statically ########## modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.configuration; + +import org.apache.ignite.configuration.annotation.Config; +import org.apache.ignite.configuration.annotation.Value; +import org.apache.ignite.configuration.validation.Range; + +/** Configuration for multicast node finder. */ +@Config +public class MulticastConfigurationSchema { + @Value(hasDefault = true) + @MulticastAddress + public final String group = "228.1.2.4"; + + @Value(hasDefault = true) + @Range(min = 0, max = 65535) + public final int port = 47400; + + @Value(hasDefault = true) + @Range(min = 0) + public final int discoveryResultWaitMillis = 500; + + @Value(hasDefault = true) + @Range(min = -1, max = 255) + public final int ttl = -1; Review Comment: Please use the constant for -1 (defined elsewhere) ########## modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastAddressValidator.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.configuration; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import org.apache.ignite.configuration.validation.ValidationContext; +import org.apache.ignite.configuration.validation.ValidationIssue; +import org.apache.ignite.configuration.validation.Validator; + +/** Validates that a value is a correct multicast address. */ +public class MulticastAddressValidator implements Validator<MulticastAddress, String> { Review Comment: Please add a (unit?) test for this validator ########## modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItMulticastNodeFinderTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.scalecube; + +import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService; +import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.IgniteUtils.stopAsync; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.internal.manager.ComponentContext; +import org.apache.ignite.internal.network.ClusterIdSupplier; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.ConstantClusterIdSupplier; +import org.apache.ignite.internal.network.MulticastNodeFinder; +import org.apache.ignite.internal.network.NodeFinder; +import org.apache.ignite.internal.network.recovery.InMemoryStaleIds; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.network.NetworkAddress; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@ExtendWith(ExecutorServiceExtension.class) +class ItMulticastNodeFinderTest extends IgniteAbstractTest { + private static final int INIT_PORT = 3344; Review Comment: This port is used both as a multicast port and as a base for 'normal' server-to-server protocol port, which causes confusion. Let's introduce a separate constant `MULTICAST_PORT` with a distinct value ########## modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.configuration; + +import org.apache.ignite.configuration.annotation.Config; +import org.apache.ignite.configuration.annotation.Value; +import org.apache.ignite.configuration.validation.Range; + +/** Configuration for multicast node finder. */ +@Config +public class MulticastConfigurationSchema { + @Value(hasDefault = true) + @MulticastAddress + public final String group = "228.1.2.4"; + + @Value(hasDefault = true) + @Range(min = 0, max = 65535) + public final int port = 47400; + + @Value(hasDefault = true) + @Range(min = 0) + public final int discoveryResultWaitMillis = 500; Review Comment: Another thing is that using `Millis` suffix in configuration settings does not agree with the idea we had to support time units being specified in the configuration -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org