rpuch commented on code in PR #5345:
URL: https://github.com/apache/ignite-3/pull/5345#discussion_r1982819493


##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes

Review Comment:
   'TCP discovery' looks like a term from AI2, we don't tend to use it in AI3. 
I'd suggest to rephrase this



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes

Review Comment:
   ```suggestion
    * <p>When TCP discovery starts, this finder sends multicast request and 
waits for some time when other nodes
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes
+ * reply to this request with messages containing their addresses.
+ */
+public class MulticastNodeFinder implements NodeFinder {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MulticastNodeFinder.class);
+
+    /** Discovery request message. */
+    private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8);
+
+    /** Buffer size for receiving responses. */
+    private static final int RECEIVE_BUFFER_SIZE = 1024;
+
+    private static final int REQ_ATTEMPTS = 2;
+
+    private final InetSocketAddress multicastSocketAddress;
+    private final int multicastPort;
+    private final int resultWaitMillis;
+    private final int ttl;
+
+    private final InetSocketAddress localAddress;
+    private final ExecutorService threadPool;
+
+    /** Flag to control running state of listener tasks. */
+    private volatile boolean stopped = false;
+
+    /** Listener tasks for each eligible interface. */
+    private final List<CompletableFuture<Void>> listenerFutures = new 
ArrayList<>();
+
+    /**
+     * Constructs a new multicast node finder.
+     *
+     * @param multicastGroup Multicast group.
+     * @param multicastPort Multicast port.
+     * @param resultWaitMillis Wait time for responses.
+     * @param ttl Time-to-live for multicast packets.
+     * @param localAddress Local node address.
+     */
+    public MulticastNodeFinder(
+            String multicastGroup,
+            int multicastPort,
+            int resultWaitMillis,
+            int ttl,
+            InetSocketAddress localAddress
+    ) {
+        this.multicastSocketAddress = new InetSocketAddress(multicastGroup, 
multicastPort);
+        this.multicastPort = multicastPort;
+        this.resultWaitMillis = resultWaitMillis;
+        this.ttl = ttl;
+        this.localAddress = localAddress;
+        this.threadPool = Executors.newFixedThreadPool(4);
+    }
+
+    @Override
+    public Collection<NetworkAddress> findNodes() {
+        Set<NetworkAddress> result = new HashSet<>();
+        List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = 
new ArrayList<>();
+
+        for (NetworkInterface networkInterface : 
getEligibleNetworkInterfaces()) {
+            discoveryFutures.add(supplyAsync(() -> 
discoverOnInterface(networkInterface), threadPool));
+        }
+
+        for (CompletableFuture<Collection<NetworkAddress>> future : 
discoveryFutures) {
+            try {
+                result.addAll(future.join());
+            } catch (Exception e) {
+                LOG.error("Error during node discovery", e);
+            }
+        }
+
+        if (result.isEmpty()) {
+            LOG.warn("No nodes discovered on interfaces, using unbound 
multicast socket");
+            result.addAll(discoverOnInterface(null));
+        }
+
+        LOG.info("Discovered nodes: {}", result);
+
+        return result;
+    }
+
+    private Collection<NetworkAddress> discoverOnInterface(@Nullable 
NetworkInterface networkInterface) {

Review Comment:
   ```suggestion
       private Collection<NetworkAddress> findOnInterface(@Nullable 
NetworkInterface networkInterface) {
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes
+ * reply to this request with messages containing their addresses.
+ */
+public class MulticastNodeFinder implements NodeFinder {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MulticastNodeFinder.class);
+
+    /** Discovery request message. */
+    private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8);
+
+    /** Buffer size for receiving responses. */
+    private static final int RECEIVE_BUFFER_SIZE = 1024;
+
+    private static final int REQ_ATTEMPTS = 2;
+
+    private final InetSocketAddress multicastSocketAddress;
+    private final int multicastPort;
+    private final int resultWaitMillis;
+    private final int ttl;
+
+    private final InetSocketAddress localAddress;
+    private final ExecutorService threadPool;
+
+    /** Flag to control running state of listener tasks. */
+    private volatile boolean stopped = false;
+
+    /** Listener tasks for each eligible interface. */
+    private final List<CompletableFuture<Void>> listenerFutures = new 
ArrayList<>();
+
+    /**
+     * Constructs a new multicast node finder.
+     *
+     * @param multicastGroup Multicast group.
+     * @param multicastPort Multicast port.
+     * @param resultWaitMillis Wait time for responses.
+     * @param ttl Time-to-live for multicast packets.
+     * @param localAddress Local node address.
+     */
+    public MulticastNodeFinder(
+            String multicastGroup,
+            int multicastPort,
+            int resultWaitMillis,
+            int ttl,
+            InetSocketAddress localAddress
+    ) {
+        this.multicastSocketAddress = new InetSocketAddress(multicastGroup, 
multicastPort);
+        this.multicastPort = multicastPort;
+        this.resultWaitMillis = resultWaitMillis;
+        this.ttl = ttl;
+        this.localAddress = localAddress;
+        this.threadPool = Executors.newFixedThreadPool(4);
+    }
+
+    @Override
+    public Collection<NetworkAddress> findNodes() {
+        Set<NetworkAddress> result = new HashSet<>();
+        List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = 
new ArrayList<>();
+
+        for (NetworkInterface networkInterface : 
getEligibleNetworkInterfaces()) {
+            discoveryFutures.add(supplyAsync(() -> 
discoverOnInterface(networkInterface), threadPool));
+        }
+
+        for (CompletableFuture<Collection<NetworkAddress>> future : 
discoveryFutures) {
+            try {
+                result.addAll(future.join());
+            } catch (Exception e) {
+                LOG.error("Error during node discovery", e);
+            }
+        }
+
+        if (result.isEmpty()) {
+            LOG.warn("No nodes discovered on interfaces, using unbound 
multicast socket");
+            result.addAll(discoverOnInterface(null));
+        }
+
+        LOG.info("Discovered nodes: {}", result);
+
+        return result;
+    }
+
+    private Collection<NetworkAddress> discoverOnInterface(@Nullable 
NetworkInterface networkInterface) {
+        Set<NetworkAddress> discovered = new HashSet<>();
+        byte[] responseBuffer = new byte[RECEIVE_BUFFER_SIZE];
+
+        try (MulticastSocket socket = new MulticastSocket(0)) {
+            configureSocket(socket, networkInterface);
+
+            for (int i = 0; i < REQ_ATTEMPTS; i++) {
+                DatagramPacket requestPacket = new 
DatagramPacket(REQUEST_MESSAGE, REQUEST_MESSAGE.length);
+                requestPacket.setSocketAddress(multicastSocketAddress);
+                socket.send(requestPacket);
+
+                waitForResponses(responseBuffer, socket, discovered);
+            }
+        } catch (Exception e) {
+            LOG.error("Error during discovery on interface: " + 
networkInterface, e);
+        }
+
+        return discovered;
+    }
+
+    private void waitForResponses(byte[] responseBuffer, MulticastSocket 
socket, Set<NetworkAddress> discovered) throws IOException {
+        long endTime = coarseCurrentTimeMillis() + resultWaitMillis;

Review Comment:
   Let's use normal `System.currentTimeMillis()`



##########
modules/network/src/main/java/org/apache/ignite/internal/network/NodeFinderFactory.java:
##########
@@ -51,10 +53,22 @@ public static NodeFinder createNodeFinder(NodeFinderView 
nodeFinderConfiguration
                 return Arrays.stream(nodeFinderConfiguration.netClusterNodes())
                         .map(NetworkAddress::from)
                         .collect(collectingAndThen(toUnmodifiableList(), 
StaticNodeFinder::new));
+            case MULTICAST:
+                MulticastView multicastConfig = 
nodeFinderConfiguration.multicast();
 
+                MulticastNodeFinder multicastNodeFinder = new 
MulticastNodeFinder(
+                        multicastConfig.group(),
+                        multicastConfig.port(),
+                        multicastConfig.discoveryResultWaitMillis(),
+                        multicastConfig.ttl(),
+                        localAddress
+                );
+
+                multicastNodeFinder.start();

Review Comment:
   This method is just about creating a `NodeFinder`. Let's add `start()` 
method on `NodeFinder` and invoke it not here, but in the calling method after 
the `NodeFinder` has been created



##########
modules/network/src/main/java/org/apache/ignite/internal/network/configuration/NodeFinderConfigurationSchema.java:
##########
@@ -30,4 +31,7 @@ public class NodeFinderConfigurationSchema {
     /** Addresses of nodes in the cluster in a host:port format. This is a 
part of StaticNodeFinder configuration. */
     @Value(hasDefault = true)
     public final String[] netClusterNodes = new String[0];
+
+    @ConfigValue
+    public MulticastConfigurationSchema multicast;

Review Comment:
   Javadoc is missing



##########
modules/network/src/main/java/org/apache/ignite/internal/network/configuration/NodeFinderType.java:
##########
@@ -20,5 +20,7 @@
 /** NodeFinder type. */
 public enum NodeFinderType {
     /** Node finder with a preconfigured list of ip addresses. */
-    STATIC;
+    STATIC,
+    /** Uses multicast to discover nodes. */

Review Comment:
   ```suggestion
       /** Uses multicast to find nodes. */
   ```



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItMulticastNodeFinderTest.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.scalecube;
+
+import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
+import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
+import org.apache.ignite.internal.network.MulticastNodeFinder;
+import org.apache.ignite.internal.network.NodeFinder;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@ExtendWith(ExecutorServiceExtension.class)
+class ItMulticastNodeFinderTest extends IgniteAbstractTest {
+    private static final int INIT_PORT = 3344;
+
+    private static final String MULTICAST_GROUP = "224.0.0.1";
+
+    private final ClusterIdSupplier clusterIdSupplier = new 
ConstantClusterIdSupplier(UUID.randomUUID());
+
+    /** Created {@link ClusterService}s. Needed for resource management. */
+    private List<ClusterService> services;
+
+    private TestInfo testInfo;
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) {
+        this.testInfo = testInfo;
+    }
+
+    @AfterEach
+    void tearDown() {
+        assertThat(stopAsync(new ComponentContext(), services), 
willCompleteSuccessfully());
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {-1, 255})
+    void testFindNodes(int ttl) throws InterruptedException {
+        List<NetworkAddress> addresses = findLocalAddresses(INIT_PORT + 1, 
INIT_PORT + 6);
+
+        services = addresses.stream()
+                .map(addr -> startNetwork(testInfo, addr, 
startMulticastNodeFinder(addr, ttl)))

Review Comment:
   Does the started `NodeFinder` ever get stopped?



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItMulticastNodeFinderTest.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.scalecube;
+
+import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
+import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
+import org.apache.ignite.internal.network.MulticastNodeFinder;
+import org.apache.ignite.internal.network.NodeFinder;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@ExtendWith(ExecutorServiceExtension.class)
+class ItMulticastNodeFinderTest extends IgniteAbstractTest {
+    private static final int INIT_PORT = 3344;
+
+    private static final String MULTICAST_GROUP = "224.0.0.1";
+
+    private final ClusterIdSupplier clusterIdSupplier = new 
ConstantClusterIdSupplier(UUID.randomUUID());
+
+    /** Created {@link ClusterService}s. Needed for resource management. */
+    private List<ClusterService> services;
+
+    private TestInfo testInfo;
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) {
+        this.testInfo = testInfo;
+    }
+
+    @AfterEach
+    void tearDown() {
+        assertThat(stopAsync(new ComponentContext(), services), 
willCompleteSuccessfully());
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {-1, 255})
+    void testFindNodes(int ttl) throws InterruptedException {
+        List<NetworkAddress> addresses = findLocalAddresses(INIT_PORT + 1, 
INIT_PORT + 6);
+
+        services = addresses.stream()
+                .map(addr -> startNetwork(testInfo, addr, 
startMulticastNodeFinder(addr, ttl)))
+                .collect(Collectors.toCollection(ArrayList::new)); // ensure 
mutability
+
+        for (ClusterService service : services) {
+            assertTrue(waitForTopology(service, 5, 5_000), 
service.topologyService().localMember().toString()
+                    + ", topSize=" + 
service.topologyService().allMembers().size());
+        }
+
+        int idx0 = 0;
+        int idx1 = 2;
+
+        assertThat(services.get(idx0).stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
+
+        assertThat(services.get(idx1).stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
+
+        ClusterService svc0 = startNetwork(testInfo, addresses.get(idx0), 
startMulticastNodeFinder(addresses.get(idx0), ttl));
+        services.set(idx0, svc0);
+
+        ClusterService svc2 = startNetwork(testInfo, addresses.get(idx1), 
startMulticastNodeFinder(addresses.get(idx1), ttl));
+        services.set(idx1, svc2);
+
+        for (ClusterService service : services) {
+            assertTrue(waitForTopology(service, 5, 10_000), 
service.topologyService().localMember().toString()
+                    + ", topSize=" + 
service.topologyService().allMembers().size());
+        }
+    }
+
+    private static NodeFinder startMulticastNodeFinder(NetworkAddress addr, 
int ttl) {
+        MulticastNodeFinder finder = new MulticastNodeFinder(
+                MULTICAST_GROUP,
+                INIT_PORT,
+                500,
+                ttl,
+                new InetSocketAddress(addr.host(), addr.port())
+        );
+
+        finder.start();
+
+        return finder;
+    }
+
+    /**
+     * Creates a {@link ClusterService} using the given local address and the 
node finder.
+     *
+     * @param testInfo   Test info.
+     * @param addr       Node address.
+     * @param nodeFinder Node finder.
+     * @return Created Cluster Service.
+     */
+    private ClusterService startNetwork(TestInfo testInfo, NetworkAddress 
addr, NodeFinder nodeFinder) {

Review Comment:
   Do we really need to pass `TestInfo` via parameter? It's already accessible 
via field



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes
+ * reply to this request with messages containing their addresses.
+ */
+public class MulticastNodeFinder implements NodeFinder {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MulticastNodeFinder.class);
+
+    /** Discovery request message. */
+    private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8);
+
+    /** Buffer size for receiving responses. */
+    private static final int RECEIVE_BUFFER_SIZE = 1024;
+
+    private static final int REQ_ATTEMPTS = 2;
+
+    private final InetSocketAddress multicastSocketAddress;
+    private final int multicastPort;
+    private final int resultWaitMillis;
+    private final int ttl;
+
+    private final InetSocketAddress localAddress;
+    private final ExecutorService threadPool;
+
+    /** Flag to control running state of listener tasks. */
+    private volatile boolean stopped = false;
+
+    /** Listener tasks for each eligible interface. */
+    private final List<CompletableFuture<Void>> listenerFutures = new 
ArrayList<>();
+
+    /**
+     * Constructs a new multicast node finder.
+     *
+     * @param multicastGroup Multicast group.
+     * @param multicastPort Multicast port.
+     * @param resultWaitMillis Wait time for responses.
+     * @param ttl Time-to-live for multicast packets.
+     * @param localAddress Local node address.
+     */
+    public MulticastNodeFinder(
+            String multicastGroup,
+            int multicastPort,
+            int resultWaitMillis,
+            int ttl,
+            InetSocketAddress localAddress
+    ) {
+        this.multicastSocketAddress = new InetSocketAddress(multicastGroup, 
multicastPort);
+        this.multicastPort = multicastPort;
+        this.resultWaitMillis = resultWaitMillis;
+        this.ttl = ttl;
+        this.localAddress = localAddress;
+        this.threadPool = Executors.newFixedThreadPool(4);

Review Comment:
   Let's use a thread factory (`NamedThreadFactory`) when creating the pool



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes
+ * reply to this request with messages containing their addresses.
+ */
+public class MulticastNodeFinder implements NodeFinder {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MulticastNodeFinder.class);
+
+    /** Discovery request message. */
+    private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8);

Review Comment:
   Does the message payload match its AI2 counterpart? If yes, let's change it 
a bit ("IGNT"?) to avoid confusion between nodes of AI2 and AI3 if they end up 
to be in the same network for some reason



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes
+ * reply to this request with messages containing their addresses.
+ */
+public class MulticastNodeFinder implements NodeFinder {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MulticastNodeFinder.class);
+
+    /** Discovery request message. */
+    private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8);
+
+    /** Buffer size for receiving responses. */
+    private static final int RECEIVE_BUFFER_SIZE = 1024;
+
+    private static final int REQ_ATTEMPTS = 2;
+
+    private final InetSocketAddress multicastSocketAddress;
+    private final int multicastPort;
+    private final int resultWaitMillis;
+    private final int ttl;
+
+    private final InetSocketAddress localAddress;
+    private final ExecutorService threadPool;
+
+    /** Flag to control running state of listener tasks. */
+    private volatile boolean stopped = false;
+
+    /** Listener tasks for each eligible interface. */
+    private final List<CompletableFuture<Void>> listenerFutures = new 
ArrayList<>();
+
+    /**
+     * Constructs a new multicast node finder.
+     *
+     * @param multicastGroup Multicast group.
+     * @param multicastPort Multicast port.
+     * @param resultWaitMillis Wait time for responses.
+     * @param ttl Time-to-live for multicast packets.
+     * @param localAddress Local node address.
+     */
+    public MulticastNodeFinder(
+            String multicastGroup,
+            int multicastPort,
+            int resultWaitMillis,
+            int ttl,
+            InetSocketAddress localAddress
+    ) {
+        this.multicastSocketAddress = new InetSocketAddress(multicastGroup, 
multicastPort);
+        this.multicastPort = multicastPort;
+        this.resultWaitMillis = resultWaitMillis;
+        this.ttl = ttl;
+        this.localAddress = localAddress;
+        this.threadPool = Executors.newFixedThreadPool(4);
+    }
+
+    @Override
+    public Collection<NetworkAddress> findNodes() {
+        Set<NetworkAddress> result = new HashSet<>();
+        List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = 
new ArrayList<>();
+
+        for (NetworkInterface networkInterface : 
getEligibleNetworkInterfaces()) {
+            discoveryFutures.add(supplyAsync(() -> 
discoverOnInterface(networkInterface), threadPool));
+        }
+
+        for (CompletableFuture<Collection<NetworkAddress>> future : 
discoveryFutures) {

Review Comment:
   1. `join()` is uninterruptible; are you sure we need it and not `get()` 
which can be interrupted?
   2. I would suggest to use a timed version of `get()` to make sure we don't 
wait forever (as there is some kind of timeout already defined for obtaining 
node addresses)
   3. Are we expected to swallow the exception? If it's ok, then it should not 
be logged as an ERROR (probably not even as a WARN)



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes
+ * reply to this request with messages containing their addresses.
+ */
+public class MulticastNodeFinder implements NodeFinder {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MulticastNodeFinder.class);
+
+    /** Discovery request message. */
+    private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8);
+
+    /** Buffer size for receiving responses. */
+    private static final int RECEIVE_BUFFER_SIZE = 1024;
+
+    private static final int REQ_ATTEMPTS = 2;
+
+    private final InetSocketAddress multicastSocketAddress;
+    private final int multicastPort;
+    private final int resultWaitMillis;
+    private final int ttl;
+
+    private final InetSocketAddress localAddress;
+    private final ExecutorService threadPool;
+
+    /** Flag to control running state of listener tasks. */
+    private volatile boolean stopped = false;
+
+    /** Listener tasks for each eligible interface. */

Review Comment:
   These are futures, not tasks



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes
+ * reply to this request with messages containing their addresses.
+ */
+public class MulticastNodeFinder implements NodeFinder {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MulticastNodeFinder.class);
+
+    /** Discovery request message. */
+    private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8);
+
+    /** Buffer size for receiving responses. */
+    private static final int RECEIVE_BUFFER_SIZE = 1024;
+
+    private static final int REQ_ATTEMPTS = 2;
+
+    private final InetSocketAddress multicastSocketAddress;
+    private final int multicastPort;
+    private final int resultWaitMillis;
+    private final int ttl;
+
+    private final InetSocketAddress localAddress;
+    private final ExecutorService threadPool;
+
+    /** Flag to control running state of listener tasks. */
+    private volatile boolean stopped = false;
+
+    /** Listener tasks for each eligible interface. */
+    private final List<CompletableFuture<Void>> listenerFutures = new 
ArrayList<>();
+
+    /**
+     * Constructs a new multicast node finder.
+     *
+     * @param multicastGroup Multicast group.
+     * @param multicastPort Multicast port.
+     * @param resultWaitMillis Wait time for responses.
+     * @param ttl Time-to-live for multicast packets.
+     * @param localAddress Local node address.
+     */
+    public MulticastNodeFinder(
+            String multicastGroup,
+            int multicastPort,
+            int resultWaitMillis,
+            int ttl,
+            InetSocketAddress localAddress
+    ) {
+        this.multicastSocketAddress = new InetSocketAddress(multicastGroup, 
multicastPort);
+        this.multicastPort = multicastPort;
+        this.resultWaitMillis = resultWaitMillis;
+        this.ttl = ttl;
+        this.localAddress = localAddress;
+        this.threadPool = Executors.newFixedThreadPool(4);
+    }
+
+    @Override
+    public Collection<NetworkAddress> findNodes() {
+        Set<NetworkAddress> result = new HashSet<>();
+        List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = 
new ArrayList<>();
+
+        for (NetworkInterface networkInterface : 
getEligibleNetworkInterfaces()) {
+            discoveryFutures.add(supplyAsync(() -> 
discoverOnInterface(networkInterface), threadPool));
+        }
+
+        for (CompletableFuture<Collection<NetworkAddress>> future : 
discoveryFutures) {
+            try {
+                result.addAll(future.join());
+            } catch (Exception e) {
+                LOG.error("Error during node discovery", e);
+            }
+        }
+
+        if (result.isEmpty()) {
+            LOG.warn("No nodes discovered on interfaces, using unbound 
multicast socket");
+            result.addAll(discoverOnInterface(null));
+        }
+
+        LOG.info("Discovered nodes: {}", result);
+
+        return result;
+    }
+
+    private Collection<NetworkAddress> discoverOnInterface(@Nullable 
NetworkInterface networkInterface) {
+        Set<NetworkAddress> discovered = new HashSet<>();
+        byte[] responseBuffer = new byte[RECEIVE_BUFFER_SIZE];
+
+        try (MulticastSocket socket = new MulticastSocket(0)) {
+            configureSocket(socket, networkInterface);
+
+            for (int i = 0; i < REQ_ATTEMPTS; i++) {
+                DatagramPacket requestPacket = new 
DatagramPacket(REQUEST_MESSAGE, REQUEST_MESSAGE.length);
+                requestPacket.setSocketAddress(multicastSocketAddress);
+                socket.send(requestPacket);
+
+                waitForResponses(responseBuffer, socket, discovered);
+            }
+        } catch (Exception e) {
+            LOG.error("Error during discovery on interface: " + 
networkInterface, e);

Review Comment:
   Why is it swallowed? If it should be, why is it logged as an ERROR?



##########
modules/network-api/src/main/java/org/apache/ignite/internal/network/NodeFinder.java:
##########
@@ -17,17 +17,23 @@
 
 package org.apache.ignite.internal.network;
 
-import java.util.List;
+import java.util.Collection;
+import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.network.NetworkAddress;
 
 /**
  * Interface for services responsible for discovering the initial set of 
network cluster members.
  */
-public interface NodeFinder {
+public interface NodeFinder extends ManuallyCloseable {
     /**
      * Discovers the initial set of cluster members and returns their network 
addresses.
      *
      * @return addresses of initial cluster members.
      */
-    List<NetworkAddress> findNodes();
+    Collection<NetworkAddress> findNodes();

Review Comment:
   Why is this change needed?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes
+ * reply to this request with messages containing their addresses.
+ */
+public class MulticastNodeFinder implements NodeFinder {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MulticastNodeFinder.class);
+
+    /** Discovery request message. */
+    private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8);
+
+    /** Buffer size for receiving responses. */
+    private static final int RECEIVE_BUFFER_SIZE = 1024;
+
+    private static final int REQ_ATTEMPTS = 2;
+
+    private final InetSocketAddress multicastSocketAddress;
+    private final int multicastPort;
+    private final int resultWaitMillis;
+    private final int ttl;
+
+    private final InetSocketAddress localAddress;
+    private final ExecutorService threadPool;
+
+    /** Flag to control running state of listener tasks. */
+    private volatile boolean stopped = false;
+
+    /** Listener tasks for each eligible interface. */
+    private final List<CompletableFuture<Void>> listenerFutures = new 
ArrayList<>();
+
+    /**
+     * Constructs a new multicast node finder.
+     *
+     * @param multicastGroup Multicast group.
+     * @param multicastPort Multicast port.
+     * @param resultWaitMillis Wait time for responses.
+     * @param ttl Time-to-live for multicast packets.
+     * @param localAddress Local node address.
+     */
+    public MulticastNodeFinder(
+            String multicastGroup,
+            int multicastPort,
+            int resultWaitMillis,
+            int ttl,
+            InetSocketAddress localAddress
+    ) {
+        this.multicastSocketAddress = new InetSocketAddress(multicastGroup, 
multicastPort);
+        this.multicastPort = multicastPort;
+        this.resultWaitMillis = resultWaitMillis;
+        this.ttl = ttl;
+        this.localAddress = localAddress;
+        this.threadPool = Executors.newFixedThreadPool(4);
+    }
+
+    @Override
+    public Collection<NetworkAddress> findNodes() {
+        Set<NetworkAddress> result = new HashSet<>();
+        List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = 
new ArrayList<>();
+
+        for (NetworkInterface networkInterface : 
getEligibleNetworkInterfaces()) {
+            discoveryFutures.add(supplyAsync(() -> 
discoverOnInterface(networkInterface), threadPool));
+        }
+
+        for (CompletableFuture<Collection<NetworkAddress>> future : 
discoveryFutures) {
+            try {
+                result.addAll(future.join());
+            } catch (Exception e) {
+                LOG.error("Error during node discovery", e);
+            }
+        }
+
+        if (result.isEmpty()) {
+            LOG.warn("No nodes discovered on interfaces, using unbound 
multicast socket");
+            result.addAll(discoverOnInterface(null));
+        }
+
+        LOG.info("Discovered nodes: {}", result);
+
+        return result;
+    }
+
+    private Collection<NetworkAddress> discoverOnInterface(@Nullable 
NetworkInterface networkInterface) {
+        Set<NetworkAddress> discovered = new HashSet<>();
+        byte[] responseBuffer = new byte[RECEIVE_BUFFER_SIZE];
+
+        try (MulticastSocket socket = new MulticastSocket(0)) {
+            configureSocket(socket, networkInterface);
+
+            for (int i = 0; i < REQ_ATTEMPTS; i++) {
+                DatagramPacket requestPacket = new 
DatagramPacket(REQUEST_MESSAGE, REQUEST_MESSAGE.length);
+                requestPacket.setSocketAddress(multicastSocketAddress);
+                socket.send(requestPacket);
+
+                waitForResponses(responseBuffer, socket, discovered);
+            }
+        } catch (Exception e) {
+            LOG.error("Error during discovery on interface: " + 
networkInterface, e);
+        }
+
+        return discovered;
+    }
+
+    private void waitForResponses(byte[] responseBuffer, MulticastSocket 
socket, Set<NetworkAddress> discovered) throws IOException {
+        long endTime = coarseCurrentTimeMillis() + resultWaitMillis;
+        while (coarseCurrentTimeMillis() < endTime) {
+            DatagramPacket responsePacket = new DatagramPacket(responseBuffer, 
responseBuffer.length);
+
+            try {
+                socket.receive(responsePacket);
+                byte[] data = Arrays.copyOfRange(
+                        responsePacket.getData(),
+                        responsePacket.getOffset(),
+                        responsePacket.getOffset() + responsePacket.getLength()
+                );
+
+                InetSocketAddress address = ByteUtils.fromBytes(data);
+                if (!address.equals(localAddress)) {
+                    discovered.add(NetworkAddress.from(address));
+                }
+            } catch (SocketTimeoutException ignored) {
+                // No-op.
+            }
+        }
+    }
+
+    private void configureSocket(MulticastSocket socket, @Nullable 
NetworkInterface networkInterface) throws IOException {
+        socket.setOption(StandardSocketOptions.IP_MULTICAST_LOOP, true);
+
+        if (networkInterface != null) {
+            socket.setNetworkInterface(networkInterface);
+        }
+
+        socket.setSoTimeout(resultWaitMillis);
+
+        if (ttl != -1) {
+            socket.setTimeToLive(ttl);
+        }
+    }
+
+    /**
+     * Listens on a given network interface for multicast discovery requests 
and responds with this node's address.
+     *
+     * @param networkInterface The network interface to listen on.
+     */
+    private void listenOnInterface(NetworkInterface networkInterface) {
+        try (MulticastSocket socket = new MulticastSocket(multicastPort)) {
+            configureSocket(socket, networkInterface);
+            socket.joinGroup(multicastSocketAddress, networkInterface);
+
+            byte[] responseData = ByteUtils.toBytes(localAddress);
+            byte[] requestBuffer = new byte[REQUEST_MESSAGE.length];
+
+            while (!stopped) {
+                DatagramPacket requestPacket = new 
DatagramPacket(requestBuffer, requestBuffer.length);
+                try {
+                    socket.receive(requestPacket);
+
+                    byte[] received = Arrays.copyOfRange(
+                            requestPacket.getData(),
+                            requestPacket.getOffset(),
+                            requestPacket.getOffset() + 
requestPacket.getLength()
+                    );
+
+                    if (!Arrays.equals(received, REQUEST_MESSAGE)) {
+                        LOG.error("Got unexpected request on multicast 
socket");
+                        continue;
+                    }
+
+                    DatagramPacket responsePacket = new DatagramPacket(
+                            responseData,
+                            responseData.length,
+                            requestPacket.getAddress(),
+                            requestPacket.getPort()
+                    );
+
+                    socket.send(responsePacket);
+                } catch (SocketTimeoutException ignored) {
+                    // Timeout to check the running flag.
+                }
+            }
+        } catch (Exception e) {
+            if (!stopped) {
+                LOG.error("Error in multicast listener on interface: " + 
networkInterface, e);
+            } else {
+                LOG.info("Multicast listener shutting down on interface: " + 
networkInterface);
+            }
+        }
+    }
+
+    /**
+     * Returns a collection of eligible network interfaces that are up, 
non‑loopback, and support multicast.
+     *
+     * @return Collection of eligible network interfaces.
+     */
+    private static Collection<NetworkInterface> getEligibleNetworkInterfaces() 
{
+        Set<NetworkInterface> eligible = new HashSet<>();
+        try {
+            Enumeration<NetworkInterface> interfaces = 
NetworkInterface.getNetworkInterfaces();
+            while (interfaces.hasMoreElements()) {
+                NetworkInterface networkInterface = interfaces.nextElement();
+                if (networkInterface.isUp() && !networkInterface.isLoopback() 
&& networkInterface.supportsMulticast()) {
+                    eligible.add(networkInterface);
+                }
+            }
+        } catch (Exception e) {

Review Comment:
   What kind of an exception do we anticipate here? Why not just let it fly to 
the caller?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes
+ * reply to this request with messages containing their addresses.
+ */
+public class MulticastNodeFinder implements NodeFinder {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MulticastNodeFinder.class);
+
+    /** Discovery request message. */
+    private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8);
+
+    /** Buffer size for receiving responses. */
+    private static final int RECEIVE_BUFFER_SIZE = 1024;
+
+    private static final int REQ_ATTEMPTS = 2;
+
+    private final InetSocketAddress multicastSocketAddress;
+    private final int multicastPort;
+    private final int resultWaitMillis;
+    private final int ttl;
+
+    private final InetSocketAddress localAddress;
+    private final ExecutorService threadPool;
+
+    /** Flag to control running state of listener tasks. */
+    private volatile boolean stopped = false;
+
+    /** Listener tasks for each eligible interface. */
+    private final List<CompletableFuture<Void>> listenerFutures = new 
ArrayList<>();
+
+    /**
+     * Constructs a new multicast node finder.
+     *
+     * @param multicastGroup Multicast group.
+     * @param multicastPort Multicast port.
+     * @param resultWaitMillis Wait time for responses.
+     * @param ttl Time-to-live for multicast packets.
+     * @param localAddress Local node address.
+     */
+    public MulticastNodeFinder(
+            String multicastGroup,
+            int multicastPort,
+            int resultWaitMillis,
+            int ttl,
+            InetSocketAddress localAddress
+    ) {
+        this.multicastSocketAddress = new InetSocketAddress(multicastGroup, 
multicastPort);
+        this.multicastPort = multicastPort;
+        this.resultWaitMillis = resultWaitMillis;
+        this.ttl = ttl;
+        this.localAddress = localAddress;
+        this.threadPool = Executors.newFixedThreadPool(4);
+    }
+
+    @Override
+    public Collection<NetworkAddress> findNodes() {
+        Set<NetworkAddress> result = new HashSet<>();
+        List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = 
new ArrayList<>();
+
+        for (NetworkInterface networkInterface : 
getEligibleNetworkInterfaces()) {
+            discoveryFutures.add(supplyAsync(() -> 
discoverOnInterface(networkInterface), threadPool));
+        }
+
+        for (CompletableFuture<Collection<NetworkAddress>> future : 
discoveryFutures) {
+            try {
+                result.addAll(future.join());
+            } catch (Exception e) {
+                LOG.error("Error during node discovery", e);
+            }
+        }
+
+        if (result.isEmpty()) {
+            LOG.warn("No nodes discovered on interfaces, using unbound 
multicast socket");
+            result.addAll(discoverOnInterface(null));
+        }
+
+        LOG.info("Discovered nodes: {}", result);

Review Comment:
   ```suggestion
           LOG.info("Found nodes: {}", result);
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.configuration;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/** Configuration for multicast node finder. */
+@Config
+public class MulticastConfigurationSchema {
+    @Value(hasDefault = true)
+    @MulticastAddress
+    public final String group = "228.1.2.4";
+
+    @Value(hasDefault = true)
+    @Range(min = 0, max = 65535)
+    public final int port = 47400;
+
+    @Value(hasDefault = true)
+    @Range(min = 0)
+    public final int discoveryResultWaitMillis = 500;

Review Comment:
   Maybe we should discuss whether we should use the term 'discovery'



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes
+ * reply to this request with messages containing their addresses.
+ */
+public class MulticastNodeFinder implements NodeFinder {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MulticastNodeFinder.class);
+
+    /** Discovery request message. */
+    private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8);
+
+    /** Buffer size for receiving responses. */
+    private static final int RECEIVE_BUFFER_SIZE = 1024;
+
+    private static final int REQ_ATTEMPTS = 2;
+
+    private final InetSocketAddress multicastSocketAddress;
+    private final int multicastPort;
+    private final int resultWaitMillis;
+    private final int ttl;
+
+    private final InetSocketAddress localAddress;
+    private final ExecutorService threadPool;
+
+    /** Flag to control running state of listener tasks. */
+    private volatile boolean stopped = false;
+
+    /** Listener tasks for each eligible interface. */
+    private final List<CompletableFuture<Void>> listenerFutures = new 
ArrayList<>();
+
+    /**
+     * Constructs a new multicast node finder.
+     *
+     * @param multicastGroup Multicast group.
+     * @param multicastPort Multicast port.
+     * @param resultWaitMillis Wait time for responses.
+     * @param ttl Time-to-live for multicast packets.
+     * @param localAddress Local node address.
+     */
+    public MulticastNodeFinder(
+            String multicastGroup,
+            int multicastPort,
+            int resultWaitMillis,
+            int ttl,
+            InetSocketAddress localAddress
+    ) {
+        this.multicastSocketAddress = new InetSocketAddress(multicastGroup, 
multicastPort);
+        this.multicastPort = multicastPort;
+        this.resultWaitMillis = resultWaitMillis;
+        this.ttl = ttl;
+        this.localAddress = localAddress;
+        this.threadPool = Executors.newFixedThreadPool(4);
+    }
+
+    @Override
+    public Collection<NetworkAddress> findNodes() {
+        Set<NetworkAddress> result = new HashSet<>();
+        List<CompletableFuture<Collection<NetworkAddress>>> discoveryFutures = 
new ArrayList<>();
+
+        for (NetworkInterface networkInterface : 
getEligibleNetworkInterfaces()) {
+            discoveryFutures.add(supplyAsync(() -> 
discoverOnInterface(networkInterface), threadPool));
+        }
+
+        for (CompletableFuture<Collection<NetworkAddress>> future : 
discoveryFutures) {
+            try {
+                result.addAll(future.join());
+            } catch (Exception e) {
+                LOG.error("Error during node discovery", e);
+            }
+        }
+
+        if (result.isEmpty()) {
+            LOG.warn("No nodes discovered on interfaces, using unbound 
multicast socket");
+            result.addAll(discoverOnInterface(null));
+        }
+
+        LOG.info("Discovered nodes: {}", result);
+
+        return result;
+    }
+
+    private Collection<NetworkAddress> discoverOnInterface(@Nullable 
NetworkInterface networkInterface) {
+        Set<NetworkAddress> discovered = new HashSet<>();
+        byte[] responseBuffer = new byte[RECEIVE_BUFFER_SIZE];
+
+        try (MulticastSocket socket = new MulticastSocket(0)) {
+            configureSocket(socket, networkInterface);
+
+            for (int i = 0; i < REQ_ATTEMPTS; i++) {
+                DatagramPacket requestPacket = new 
DatagramPacket(REQUEST_MESSAGE, REQUEST_MESSAGE.length);
+                requestPacket.setSocketAddress(multicastSocketAddress);
+                socket.send(requestPacket);
+
+                waitForResponses(responseBuffer, socket, discovered);
+            }
+        } catch (Exception e) {
+            LOG.error("Error during discovery on interface: " + 
networkInterface, e);
+        }
+
+        return discovered;
+    }
+
+    private void waitForResponses(byte[] responseBuffer, MulticastSocket 
socket, Set<NetworkAddress> discovered) throws IOException {
+        long endTime = coarseCurrentTimeMillis() + resultWaitMillis;
+        while (coarseCurrentTimeMillis() < endTime) {
+            DatagramPacket responsePacket = new DatagramPacket(responseBuffer, 
responseBuffer.length);
+
+            try {
+                socket.receive(responsePacket);
+                byte[] data = Arrays.copyOfRange(
+                        responsePacket.getData(),
+                        responsePacket.getOffset(),
+                        responsePacket.getOffset() + responsePacket.getLength()
+                );
+
+                InetSocketAddress address = ByteUtils.fromBytes(data);
+                if (!address.equals(localAddress)) {
+                    discovered.add(NetworkAddress.from(address));
+                }
+            } catch (SocketTimeoutException ignored) {
+                // No-op.
+            }
+        }
+    }
+
+    private void configureSocket(MulticastSocket socket, @Nullable 
NetworkInterface networkInterface) throws IOException {
+        socket.setOption(StandardSocketOptions.IP_MULTICAST_LOOP, true);
+
+        if (networkInterface != null) {
+            socket.setNetworkInterface(networkInterface);
+        }
+
+        socket.setSoTimeout(resultWaitMillis);
+
+        if (ttl != -1) {

Review Comment:
   Please extract -1 as a constant with a name like `UNBOUNDED_TTL`



##########
modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.configuration;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/** Configuration for multicast node finder. */
+@Config
+public class MulticastConfigurationSchema {
+    @Value(hasDefault = true)
+    @MulticastAddress
+    public final String group = "228.1.2.4";
+
+    @Value(hasDefault = true)
+    @Range(min = 0, max = 65535)
+    public final int port = 47400;

Review Comment:
   How was this default chosen? If it's same as in AI2, shouldn't we choose a 
different default to avoid clashes?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multicast-based IP finder.
+ *
+ * <p>When TCP discovery starts this finder sends multicast request and waits 
for some time when other nodes
+ * reply to this request with messages containing their addresses.
+ */
+public class MulticastNodeFinder implements NodeFinder {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MulticastNodeFinder.class);
+
+    /** Discovery request message. */
+    private static final byte[] REQUEST_MESSAGE = "IGNI".getBytes(UTF_8);
+
+    /** Buffer size for receiving responses. */
+    private static final int RECEIVE_BUFFER_SIZE = 1024;
+
+    private static final int REQ_ATTEMPTS = 2;
+
+    private final InetSocketAddress multicastSocketAddress;
+    private final int multicastPort;
+    private final int resultWaitMillis;
+    private final int ttl;
+
+    private final InetSocketAddress localAddress;

Review Comment:
   ```suggestion
       private final InetSocketAddress localAddressToAdvertise;
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.configuration;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/** Configuration for multicast node finder. */
+@Config
+public class MulticastConfigurationSchema {

Review Comment:
   Please add javadocs to all config properties defined here



##########
modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.configuration;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/** Configuration for multicast node finder. */
+@Config
+public class MulticastConfigurationSchema {
+    @Value(hasDefault = true)
+    @MulticastAddress
+    public final String group = "228.1.2.4";

Review Comment:
   How was this default chosen? It seems to be the default multicast address 
for AI2, but should we reuse it in AI3?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.configuration;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/** Configuration for multicast node finder. */
+@Config
+public class MulticastConfigurationSchema {
+    @Value(hasDefault = true)
+    @MulticastAddress
+    public final String group = "228.1.2.4";
+
+    @Value(hasDefault = true)
+    @Range(min = 0, max = 65535)

Review Comment:
   0 is probably not a valid port number here



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItMulticastNodeFinderTest.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.scalecube;
+
+import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
+import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
+import org.apache.ignite.internal.network.MulticastNodeFinder;
+import org.apache.ignite.internal.network.NodeFinder;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@ExtendWith(ExecutorServiceExtension.class)
+class ItMulticastNodeFinderTest extends IgniteAbstractTest {
+    private static final int INIT_PORT = 3344;
+
+    private static final String MULTICAST_GROUP = "224.0.0.1";
+
+    private final ClusterIdSupplier clusterIdSupplier = new 
ConstantClusterIdSupplier(UUID.randomUUID());
+
+    /** Created {@link ClusterService}s. Needed for resource management. */
+    private List<ClusterService> services;
+
+    private TestInfo testInfo;
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) {
+        this.testInfo = testInfo;
+    }
+
+    @AfterEach
+    void tearDown() {
+        assertThat(stopAsync(new ComponentContext(), services), 
willCompleteSuccessfully());
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {-1, 255})
+    void testFindNodes(int ttl) throws InterruptedException {
+        List<NetworkAddress> addresses = findLocalAddresses(INIT_PORT + 1, 
INIT_PORT + 6);
+
+        services = addresses.stream()
+                .map(addr -> startNetwork(testInfo, addr, 
startMulticastNodeFinder(addr, ttl)))
+                .collect(Collectors.toCollection(ArrayList::new)); // ensure 
mutability

Review Comment:
   Let's import `toCollection()` statically



##########
modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.configuration;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/** Configuration for multicast node finder. */
+@Config
+public class MulticastConfigurationSchema {
+    @Value(hasDefault = true)
+    @MulticastAddress
+    public final String group = "228.1.2.4";
+
+    @Value(hasDefault = true)
+    @Range(min = 0, max = 65535)
+    public final int port = 47400;
+
+    @Value(hasDefault = true)
+    @Range(min = 0)
+    public final int discoveryResultWaitMillis = 500;
+
+    @Value(hasDefault = true)
+    @Range(min = -1, max = 255)
+    public final int ttl = -1;

Review Comment:
   Please use the constant for -1 (defined elsewhere)



##########
modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastAddressValidator.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.configuration;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import org.apache.ignite.configuration.validation.ValidationContext;
+import org.apache.ignite.configuration.validation.ValidationIssue;
+import org.apache.ignite.configuration.validation.Validator;
+
+/** Validates that a value is a correct multicast address. */
+public class MulticastAddressValidator implements Validator<MulticastAddress, 
String> {

Review Comment:
   Please add a (unit?) test for this validator



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItMulticastNodeFinderTest.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.scalecube;
+
+import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
+import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
+import org.apache.ignite.internal.network.MulticastNodeFinder;
+import org.apache.ignite.internal.network.NodeFinder;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@ExtendWith(ExecutorServiceExtension.class)
+class ItMulticastNodeFinderTest extends IgniteAbstractTest {
+    private static final int INIT_PORT = 3344;

Review Comment:
   This port is used both as a multicast port and as a base for 'normal' 
server-to-server protocol port, which causes confusion. Let's introduce a 
separate constant `MULTICAST_PORT` with a distinct value



##########
modules/network/src/main/java/org/apache/ignite/internal/network/configuration/MulticastConfigurationSchema.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.configuration;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Range;
+
+/** Configuration for multicast node finder. */
+@Config
+public class MulticastConfigurationSchema {
+    @Value(hasDefault = true)
+    @MulticastAddress
+    public final String group = "228.1.2.4";
+
+    @Value(hasDefault = true)
+    @Range(min = 0, max = 65535)
+    public final int port = 47400;
+
+    @Value(hasDefault = true)
+    @Range(min = 0)
+    public final int discoveryResultWaitMillis = 500;

Review Comment:
   Another thing is that using `Millis` suffix in configuration settings does 
not agree with the idea we had to support time units being specified in the 
configuration



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to