rpuch commented on code in PR #5345: URL: https://github.com/apache/ignite-3/pull/5345#discussion_r1986733439
########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -271,17 +232,67 @@ private static Collection<NetworkInterface> getEligibleNetworkInterfaces() { public void close() { stopped = true; - for (CompletableFuture<Void> future : listenerFutures) { - future.cancel(true); - } - - shutdownAndAwaitTermination(threadPool, 10, TimeUnit.SECONDS); + shutdownAndAwaitTermination(listenerThreadPool, 10, TimeUnit.SECONDS); } @Override public void start() { - for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) { - listenerFutures.add(runAsync(() -> listenOnInterface(networkInterface), threadPool)); - } + listenerThreadPool.submit(() -> { + List<MulticastSocket> sockets = new ArrayList<>(); + + try { + for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) { + MulticastSocket socket = new MulticastSocket(multicastPort); + configureSocket(socket, networkInterface, POLLING_TIMEOUT_MILLIS); + socket.joinGroup(multicastSocketAddress, networkInterface); + + sockets.add(socket); + } + + if (sockets.isEmpty()) { + LOG.warn("No interfaces eligible for multicast found; listener not started."); + return; + } + + byte[] responseData = ByteUtils.toBytes(localAddressToAdvertise); + byte[] requestBuffer = new byte[REQUEST_MESSAGE.length]; + while (!stopped) { + for (MulticastSocket socket : sockets) { + DatagramPacket requestPacket = new DatagramPacket(requestBuffer, requestBuffer.length); + try { + socket.receive(requestPacket); + + byte[] received = Arrays.copyOfRange( + requestPacket.getData(), + requestPacket.getOffset(), + requestPacket.getOffset() + requestPacket.getLength() + ); + + if (!Arrays.equals(received, REQUEST_MESSAGE)) { + LOG.error("Received unexpected request on multicast socket"); + continue; + } + + DatagramPacket responsePacket = new DatagramPacket( + responseData, + responseData.length, + requestPacket.getAddress(), + requestPacket.getPort() + ); + + socket.send(responsePacket); + } catch (SocketTimeoutException ignored) { + // Timeout to check another socket. + } + } + } + } catch (Exception e) { + if (!stopped) { + throw new IgniteInternalException(INTERNAL_ERR, "Error in multicast listener", e); + } + } finally { + sockets.forEach(MulticastSocket::close); Review Comment: Let's close using `IgniteUtils#closeAll()` ########## modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java: ########## @@ -271,17 +232,67 @@ private static Collection<NetworkInterface> getEligibleNetworkInterfaces() { public void close() { stopped = true; - for (CompletableFuture<Void> future : listenerFutures) { - future.cancel(true); - } - - shutdownAndAwaitTermination(threadPool, 10, TimeUnit.SECONDS); + shutdownAndAwaitTermination(listenerThreadPool, 10, TimeUnit.SECONDS); } @Override public void start() { - for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) { - listenerFutures.add(runAsync(() -> listenOnInterface(networkInterface), threadPool)); - } + listenerThreadPool.submit(() -> { + List<MulticastSocket> sockets = new ArrayList<>(); + + try { + for (NetworkInterface networkInterface : getEligibleNetworkInterfaces()) { + MulticastSocket socket = new MulticastSocket(multicastPort); + configureSocket(socket, networkInterface, POLLING_TIMEOUT_MILLIS); Review Comment: Is 10ms enough for any network? How was it chosen? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org