rpuch commented on code in PR #5345:
URL: https://github.com/apache/ignite-3/pull/5345#discussion_r1986733439


##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -271,17 +232,67 @@ private static Collection<NetworkInterface> getEligibleNetworkInterfaces() {
     public void close() {
         stopped = true;
 
-        for (CompletableFuture<Void> future : listenerFutures) {
-            future.cancel(true);
-        }
-
-        shutdownAndAwaitTermination(threadPool, 10, TimeUnit.SECONDS);
+        shutdownAndAwaitTermination(listenerThreadPool, 10, TimeUnit.SECONDS);
     }
 
     @Override
     public void start() {
-        for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) {
-            listenerFutures.add(runAsync(() -> listenOnInterface(networkInterface), threadPool));
-        }
+        listenerThreadPool.submit(() -> {
+            List<MulticastSocket> sockets = new ArrayList<>();
+
+            try {
+                for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) {
+                    MulticastSocket socket = new MulticastSocket(multicastPort);
+                    configureSocket(socket, networkInterface, POLLING_TIMEOUT_MILLIS);
+                    socket.joinGroup(multicastSocketAddress, networkInterface);
+
+                    sockets.add(socket);
+                }
+
+                if (sockets.isEmpty()) {
+                    LOG.warn("No interfaces eligible for multicast found; listener not started.");
+                    return;
+                }
+
+                byte[] responseData = ByteUtils.toBytes(localAddressToAdvertise);
+                byte[] requestBuffer = new byte[REQUEST_MESSAGE.length];
+                while (!stopped) {
+                    for (MulticastSocket socket : sockets) {
+                        DatagramPacket requestPacket = new DatagramPacket(requestBuffer, requestBuffer.length);
+                        try {
+                            socket.receive(requestPacket);
+
+                            byte[] received = Arrays.copyOfRange(
+                                    requestPacket.getData(),
+                                    requestPacket.getOffset(),
+                                    requestPacket.getOffset() + requestPacket.getLength()
+                            );
+
+                            if (!Arrays.equals(received, REQUEST_MESSAGE)) {
+                                LOG.error("Received unexpected request on multicast socket");
+                                continue;
+                            }
+
+                            DatagramPacket responsePacket = new DatagramPacket(
+                                    responseData,
+                                    responseData.length,
+                                    requestPacket.getAddress(),
+                                    requestPacket.getPort()
+                            );
+
+                            socket.send(responsePacket);
+                        } catch (SocketTimeoutException ignored) {
+                            // Timeout to check another socket.
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                if (!stopped) {
+                    throw new IgniteInternalException(INTERNAL_ERR, "Error in multicast listener", e);
+                }
+            } finally {
+                sockets.forEach(MulticastSocket::close);

Review Comment:
   Let's close using `IgniteUtils#closeAll()`



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -271,17 +232,67 @@ private static Collection<NetworkInterface> getEligibleNetworkInterfaces() {
     public void close() {
         stopped = true;
 
-        for (CompletableFuture<Void> future : listenerFutures) {
-            future.cancel(true);
-        }
-
-        shutdownAndAwaitTermination(threadPool, 10, TimeUnit.SECONDS);
+        shutdownAndAwaitTermination(listenerThreadPool, 10, TimeUnit.SECONDS);
     }
 
     @Override
     public void start() {
-        for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) {
-            listenerFutures.add(runAsync(() -> listenOnInterface(networkInterface), threadPool));
-        }
+        listenerThreadPool.submit(() -> {
+            List<MulticastSocket> sockets = new ArrayList<>();
+
+            try {
+                for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) {
+                    MulticastSocket socket = new MulticastSocket(multicastPort);
+                    configureSocket(socket, networkInterface, POLLING_TIMEOUT_MILLIS);

Review Comment:
   Is 10ms enough for any network? How was it chosen?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to