rpuch commented on code in PR #4801: URL: https://github.com/apache/ignite-3/pull/4801#discussion_r1863429697
########## modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferChannelTypeModule.java: ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.file; + +import com.google.auto.service.AutoService; +import org.apache.ignite.internal.network.ChannelTypeModule; +import org.apache.ignite.internal.network.ChannelTypeRegisterer; + +/** {@link ChannelTypeModule} for code-deployment module. */ Review Comment: ```suggestion /** {@link ChannelTypeModule} for file-transfer module. */ ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.internal.close.ManuallyCloseable; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.thread.StripedExecutor; +import org.apache.ignite.internal.worker.CriticalWorker; +import org.apache.ignite.internal.worker.CriticalWorkerRegistry; + +/** + * Collection of {@link StripedExecutor executors} for the network based on {@link ChannelType#id()}. + * + * <p>Executors are created once in the constructor, so it is important that all {@link ChannelType} are registered at the time the + * constructor is called. This was done intentionally to optimize and get rid of contention.</p> + */ +class CriticalStripedExecutors implements ManuallyCloseable { + private final CriticalWorkerRegistry workerRegistry; + + private final StripedExecutorByChannelTypeId executorByChannelTypeId; + + private final List<CriticalWorker> registeredWorkers = new CopyOnWriteArrayList<>(); + + private final AtomicBoolean closeGuard = new AtomicBoolean(); + + CriticalStripedExecutors( + String nodeName, + String poolNamePrefix, + CriticalWorkerRegistry workerRegistry, + ChannelTypeRegistry channelTypeRegistry, + IgniteLogger log + ) { + this.workerRegistry = workerRegistry; + + var factory = new CriticalStripedThreadPoolExecutorFactory(nodeName, poolNamePrefix, log, workerRegistry, registeredWorkers); + + executorByChannelTypeId = StripedExecutorByChannelTypeId.of(channelTypeRegistry, factory); + } + + @Override + public void close() throws Exception { + if (!closeGuard.compareAndSet(false, true)) { + return; + } + + registeredWorkers.forEach(workerRegistry::unregister); + + closeAll( + executorByChannelTypeId.stream() + .parallel() + .map(executor -> () -> shutdownAndAwaitTermination(executor, 10, SECONDS)) + ); + } + + /** + * Returns executor to executes a command on a stripe with the given index.Ø Review Comment: ```suggestion * Returns executor to execute a command on a stripe with the given index. ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; +import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.internal.close.ManuallyCloseable; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.thread.StripedExecutor; +import org.apache.ignite.internal.worker.CriticalWorker; +import org.apache.ignite.internal.worker.CriticalWorkerRegistry; + +/** + * Collection of {@link StripedExecutor executors} for the network based on {@link ChannelType#id()}. + * + * <p>Executors are created once in the constructor, so it is important that all {@link ChannelType} are registered at the time the Review Comment: ```suggestion * <p>Executors are created once in the constructor, so it is important that all {@link ChannelType}s are registered at the time the ``` ########## modules/network-api/src/main/java/org/apache/ignite/internal/network/ChannelTypeModule.java: ########## @@ -17,17 +17,12 @@ package org.apache.ignite.internal.network; +import java.util.ServiceLoader; + /** - * Throws when register channel with already used identifier. + * Registerer of {@link ChannelType} for each module that should be loaded into the via the {@link ServiceLoader} and then received Review Comment: ```suggestion * Registrar of {@link ChannelType} for each module that should be loaded into the via the {@link ServiceLoader} and then received ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/ArrayStripedExecutorByChannelTypeId.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import java.util.Arrays; +import java.util.stream.Stream; +import org.apache.ignite.internal.thread.StripedExecutor; + +/** Map-based implementation for increased performance. */ Review Comment: ```suggestion /** Array-based implementation for increased performance. */ ``` ########## modules/network-api/src/main/java/org/apache/ignite/internal/network/ChannelTypeRegisterer.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** {@link ChannelType} registerer. */ +public class ChannelTypeRegisterer { Review Comment: ```suggestion /** {@link ChannelType} registrar. */ public class ChannelTypeRegistrar { ``` ########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java: ########## @@ -394,7 +395,8 @@ private PartialNode startPartialNode( new VaultStaleIds(vault), clusterIdService, workerRegistry, - failureProcessor + failureProcessor, + defaultChannelTypeRegistry() Review Comment: How about creating an overload of `createClusterService()` with the old set of parameters that would just do `defaultChannelTypeRegistry()` itself, to avoid changing all these tests? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org