rpuch commented on code in PR #2283: URL: https://github.com/apache/ignite-3/pull/2283#discussion_r1260955770
########## modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java: ########## @@ -337,6 +338,41 @@ public <U> CompletableFuture<U> thenComposeToCompletable(Function<? super T, ? e } } + /** + * Adds a mapping function that gets executed as soon as this future gets completed for any reason. The function will accept both result + * and exception and return a future with the result of the function's execution. + * + * @param mapper The function to use to compute the value of the returned CompletionStage. + * @return The new CompletionStage. Review Comment: ```suggestion * @param mapper The function to use to compute the value of the returned OrderingFuture. * @return The new OrderingFuture. ``` ########## modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java: ########## @@ -891,4 +894,40 @@ public boolean matches(Object o) { return predicate.test((DataT) o); } } + + /** + * Creates a matcher that tests if a CompletableFuture has completed successfully. + * + * @param <T> the type of the CompletableFuture + * @return a matcher for a successfully completed CompletableFuture + */ + public static <T> Matcher<CompletableFuture<T>> completedSuccessfully() { Review Comment: How about moving this to `CompletableFutureMatcher`? 
########## modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java: ########## @@ -355,25 +357,44 @@ public void nodeCannotReuseOldId(TestInfo testInfo) throws Exception { } private void knockOutNode(String outcastName) throws InterruptedException { - CountDownLatch disappeared = new CountDownLatch(1); + CountDownLatch disappeared = new CountDownLatch(2); - testCluster.members.get(0).topologyService().addEventHandler(new TopologyEventHandler() { + TopologyEventHandler disappearListener = new TopologyEventHandler() { @Override public void onDisappeared(ClusterNode member) { if (Objects.equals(member.name(), outcastName)) { disappeared.countDown(); } } - }); + }; - testCluster.members.stream() + List<ClusterService> notOutcasts = testCluster.members.stream() .filter(service -> !outcastName.equals(service.nodeName())) - .forEach(service -> { - DefaultMessagingService messagingService = (DefaultMessagingService) service.messagingService(); - messagingService.dropMessages((recipientConsistentId, message) -> outcastName.equals(recipientConsistentId)); - }); + .collect(Collectors.toList()); Review Comment: Let's statically import `toList()` ########## modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java: ########## @@ -355,25 +357,44 @@ public void nodeCannotReuseOldId(TestInfo testInfo) throws Exception { } private void knockOutNode(String outcastName) throws InterruptedException { - CountDownLatch disappeared = new CountDownLatch(1); + CountDownLatch disappeared = new CountDownLatch(2); - testCluster.members.get(0).topologyService().addEventHandler(new TopologyEventHandler() { + TopologyEventHandler disappearListener = new TopologyEventHandler() { @Override public void onDisappeared(ClusterNode member) { if (Objects.equals(member.name(), outcastName)) { disappeared.countDown(); } } - }); + }; - testCluster.members.stream() + List<ClusterService> 
notOutcasts = testCluster.members.stream() .filter(service -> !outcastName.equals(service.nodeName())) - .forEach(service -> { - DefaultMessagingService messagingService = (DefaultMessagingService) service.messagingService(); - messagingService.dropMessages((recipientConsistentId, message) -> outcastName.equals(recipientConsistentId)); - }); + .collect(Collectors.toList()); + notOutcasts.forEach(clusterService -> { + clusterService.topologyService().addEventHandler(disappearListener); + }); + + notOutcasts.forEach(service -> { + DefaultMessagingService messagingService = (DefaultMessagingService) service.messagingService(); + messagingService.dropMessages((recipientConsistentId, message) -> outcastName.equals(recipientConsistentId)); + }); + + // Wait until all nodes see disappearance of the outcast. assertTrue(disappeared.await(10, TimeUnit.SECONDS), "Node did not disappear in time"); + + DefaultMessagingService messagingService = (DefaultMessagingService) testCluster.members.stream() + .filter(service -> outcastName.equals(service.nodeName())) + .findFirst() + .get().messagingService(); + + ConnectionManager cm = messagingService.connectionManager(); + + // Forcefully close channels, so that on reanimation nodes will create new channels. + cm.channels().forEach((stringConnectorKey, nettySender) -> { + nettySender.close().awaitUninterruptibly(); Review Comment: I wonder whether we should do this (close channels) in the production code when we understand that the node has left (or is going to leave)? Otherwise, it looks suspicious: maybe it's a bug? 
########## modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java: ########## @@ -148,24 +170,51 @@ public void onMessage(NetworkMessage message) { if (message instanceof HandshakeStartResponseMessage) { HandshakeStartResponseMessage msg = (HandshakeStartResponseMessage) message; - if (staleIdDetector.isIdStale(msg.launchId().toString())) { + UUID remoteLaunchId = msg.launchId(); + String remoteConsistentId = msg.consistentId(); + long remoteReceivedCount = msg.receivedCount(); + short remoteChannelId = msg.connectionId(); + + if (staleIdDetector.isIdStale(remoteLaunchId.toString())) { handleStaleClientId(msg); return; } - this.remoteLaunchId = msg.launchId(); - this.remoteConsistentId = msg.consistentId(); - this.receivedCount = msg.receivedCount(); - this.remoteChannelId = msg.connectionId(); + this.remoteLaunchId = remoteLaunchId; + this.remoteConsistentId = remoteConsistentId; + this.receivedCount = remoteReceivedCount; + this.remoteChannelId = remoteChannelId; + + RecoveryDescriptor descriptor = recoveryDescriptorProvider.getRecoveryDescriptor( + this.remoteConsistentId, + this.remoteLaunchId, + this.remoteChannelId + ); + + while (!descriptor.acquire(ctx)) { + if (launchId.compareTo(remoteLaunchId) <= 0) { Review Comment: Please add a comment or clarify this by extracting a method ########## modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java: ########## @@ -295,6 +299,99 @@ public void testStopTwice() throws Exception { server.close(); } + /** + * Tests that if two nodes are opening channels to each other, only one channel survives. + * + * @throws Exception If failed. 
+ */ + @RepeatedTest(100) + public void testOneChannelLeftIfConnectToEachOther() throws Exception { + try ( + ConnectionManagerWrapper manager1 = startManager(4000); + ConnectionManagerWrapper manager2 = startManager(4001) + ) { + CompletableFuture<NettySender> fut1 = manager1.openChannelTo(manager2).toCompletableFuture(); + CompletableFuture<NettySender> fut2 = manager2.openChannelTo(manager1).toCompletableFuture(); + + NettySender sender1 = null; + NettySender sender2 = null; + + try { + sender1 = fut1.join(); + } catch (Exception ignored) { + // No-op. Review Comment: Is it correct to assume that exactly one of the `.join()` calls will throw a `ChannelAlreadyExistsException`? If so, let's assert that the exception is the one we expect. ########## modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java: ########## @@ -136,14 +158,35 @@ public void onMessage(NetworkMessage message) { this.remoteLaunchId = msg.launchId(); this.remoteConsistentId = msg.consistentId(); - this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor( + RecoveryDescriptor descriptor = recoveryDescriptorProvider.getRecoveryDescriptor( remoteConsistentId, remoteLaunchId, - connectionId, - false + connectionId ); - handshake(recoveryDescriptor); + while (!descriptor.acquire(ctx)) { + if (launchId.compareTo(remoteLaunchId) > 0) { Review Comment: Could you please add a comment about this comparison (or extract it to a method with a clarifying name)? ########## modules/network/src/main/java/org/apache/ignite/internal/network/handshake/ChannelAlreadyExistsException.java: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.handshake; + +/** + * Exception that notifies of existence of a channel with a specific consistent id during handshake. + */ +public class ChannelAlreadyExistsException extends RuntimeException { + private static final long serialVersionUID = 0L; + + private final String consistentId; + + public ChannelAlreadyExistsException(String consistentId) { + this.consistentId = consistentId; + } + + public String consistentId() { Review Comment: Let's add a javadoc explaining what consistentId is returned here ########## modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java: ########## @@ -124,4 +128,45 @@ public long onReceive() { public String toString() { return S.toString(RecoveryDescriptor.class, this); } + + private final AtomicReference<Channel> ctxHolder = new AtomicReference<>(); Review Comment: Why is it called `ctxHolder` if it holds a Channel? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org