rpuch commented on code in PR #4801:
URL: https://github.com/apache/ignite-3/pull/4801#discussion_r1863429697


##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferChannelTypeModule.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import com.google.auto.service.AutoService;
+import org.apache.ignite.internal.network.ChannelTypeModule;
+import org.apache.ignite.internal.network.ChannelTypeRegisterer;
+
+/** {@link ChannelTypeModule} for code-deployment module. */

Review Comment:
   ```suggestion
   /** {@link ChannelTypeModule} for file-transfer module. */
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.thread.StripedExecutor;
+import org.apache.ignite.internal.worker.CriticalWorker;
+import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
+
+/**
+ * Collection of {@link StripedExecutor executors} for the network based on 
{@link ChannelType#id()}.
+ *
+ * <p>Executors are created once in the constructor, so it is important that 
all {@link ChannelType} are registered at the time the
+ * constructor is called. This was done intentionally to optimize and get rid 
of contention.</p>
+ */
+class CriticalStripedExecutors implements ManuallyCloseable {
+    private final CriticalWorkerRegistry workerRegistry;
+
+    private final StripedExecutorByChannelTypeId executorByChannelTypeId;
+
+    private final List<CriticalWorker> registeredWorkers = new 
CopyOnWriteArrayList<>();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    CriticalStripedExecutors(
+            String nodeName,
+            String poolNamePrefix,
+            CriticalWorkerRegistry workerRegistry,
+            ChannelTypeRegistry channelTypeRegistry,
+            IgniteLogger log
+    ) {
+        this.workerRegistry = workerRegistry;
+
+        var factory = new CriticalStripedThreadPoolExecutorFactory(nodeName, 
poolNamePrefix, log, workerRegistry, registeredWorkers);
+
+        executorByChannelTypeId = 
StripedExecutorByChannelTypeId.of(channelTypeRegistry, factory);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        registeredWorkers.forEach(workerRegistry::unregister);
+
+        closeAll(
+                executorByChannelTypeId.stream()
+                        .parallel()
+                        .map(executor -> () -> 
shutdownAndAwaitTermination(executor, 10, SECONDS))
+        );
+    }
+
+    /**
+     * Returns executor to executes a command on a stripe with the given 
index.Ø

Review Comment:
   ```suggestion
        * Returns executor to execute a command on a stripe with the given 
index.
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.thread.StripedExecutor;
+import org.apache.ignite.internal.worker.CriticalWorker;
+import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
+
+/**
+ * Collection of {@link StripedExecutor executors} for the network based on 
{@link ChannelType#id()}.
+ *
+ * <p>Executors are created once in the constructor, so it is important that 
all {@link ChannelType} are registered at the time the

Review Comment:
   ```suggestion
    * <p>Executors are created once in the constructor, so it is important that 
all {@link ChannelType}s are registered at the time the
   ```



##########
modules/network-api/src/main/java/org/apache/ignite/internal/network/ChannelTypeModule.java:
##########
@@ -17,17 +17,12 @@
 
 package org.apache.ignite.internal.network;
 
+import java.util.ServiceLoader;
+
 /**
- * Throws when register channel with already used identifier.
+ * Registerer of {@link ChannelType} for each module that should be loaded 
into the via the {@link ServiceLoader} and then received

Review Comment:
   ```suggestion
    * Registrar of {@link ChannelType} for each module that should be loaded 
into the via the {@link ServiceLoader} and then received
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/ArrayStripedExecutorByChannelTypeId.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.thread.StripedExecutor;
+
+/** Map-based implementation for increased performance. */

Review Comment:
   ```suggestion
   /** Array-based implementation for increased performance. */
   ```



##########
modules/network-api/src/main/java/org/apache/ignite/internal/network/ChannelTypeRegisterer.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** {@link ChannelType} registerer. */
+public class ChannelTypeRegisterer {

Review Comment:
   ```suggestion
   /** {@link ChannelType} registrar. */
   public class ChannelTypeRegistrar {
   ```



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java:
##########
@@ -394,7 +395,8 @@ private PartialNode startPartialNode(
                 new VaultStaleIds(vault),
                 clusterIdService,
                 workerRegistry,
-                failureProcessor
+                failureProcessor,
+                defaultChannelTypeRegistry()

Review Comment:
   How about creating an overload of `createClusterService()` with the old set 
of parameters that would just do `defaultChannelTypeRegistry()` itself, to 
avoid changing all these tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to