ascherbakoff commented on code in PR #5221:
URL: https://github.com/apache/ignite-3/pull/5221#discussion_r1983167842


##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java:
##########
@@ -88,19 +93,78 @@ public Future<Message> requestVote(final PeerId peerId, 
final RequestVoteRequest
 
     @Override
     public Future<Message> appendEntries(final PeerId peerId, final 
AppendEntriesRequest request,
-        final int timeoutMs, final RpcResponseClosure<AppendEntriesResponse> 
done) {
+            final int timeoutMs, final 
RpcResponseClosure<AppendEntriesResponse> done) {

Review Comment:
   There are formatting issues here and below.



##########
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java:
##########
@@ -198,7 +198,7 @@ void executesWrongJobClassOnRemoteNodesAsync(String 
jobClassName, int errorCode,
                 JobTarget.anyNode(clusterNode(node(1)), clusterNode(node(2))),
                 JobDescriptor.builder(jobClassName).units(units()).build(),
                 null
-        ).get(1, TimeUnit.SECONDS));
+        ).get(10, TimeUnit.SECONDS));

Review Comment:
   Why is this timeout increase (from 1 to 10 seconds) required?



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java:
##########
@@ -88,19 +93,78 @@ public Future<Message> requestVote(final PeerId peerId, 
final RequestVoteRequest
 
     @Override
     public Future<Message> appendEntries(final PeerId peerId, final 
AppendEntriesRequest request,
-        final int timeoutMs, final RpcResponseClosure<AppendEntriesResponse> 
done) {
+            final int timeoutMs, final 
RpcResponseClosure<AppendEntriesResponse> done) {
 
         // Assign an executor in round-robin fasion.
         final Executor executor = 
this.appendEntriesExecutorMap.computeIfAbsent(peerId,
-            k -> nodeOptions.getStripedExecutor().next());
+                k -> nodeOptions.getStripedExecutor().next());
 
         if (connect(peerId)) { // Replicator should be started asynchronously 
by node joined event.
+            if (isHeartbeatRequest(request)) {
+                return sendHeartbeat(peerId, request, timeoutMs, done, 
executor);
+            }
+
             return invokeWithDone(peerId, request, done, timeoutMs, executor);
         }
 
         return onConnectionFail(executor, request, done, peerId);
     }
 
+    /**
+     * Accumulates heartbeat messages to send them into the batch request.
+     *
+     * @param peerId Remote peer id.
+     * @param request Request.
+     * @param timeoutMs Timeout.
+     * @param done Done callback.
+     * @param executor Executor where the done callback is executed.
+     * @return A future with response.
+     */
+    private Future<Message> sendHeartbeat(
+            PeerId peerId,
+            AppendEntriesRequest request,
+            int timeoutMs,
+            RpcResponseClosure<AppendEntriesResponse> done,
+            Executor executor
+    ) {
+        NodeManager nodeManager = this.nodeOptions.getNodeManager();
+
+        return invokeWithDone(
+                peerId,
+                request,
+                null,
+                done,
+                timeoutMs,
+                executor,
+                (peerId1, request1, ctx, callback, timeoutMs1) ->
+                        nodeManager.enqueue(peerId, (Message) 
request1).whenComplete((res, err) -> {
+                            if (err instanceof ExecutionException) {
+                                err = new RemotingException(err);
+                            } else if (err instanceof TimeoutException) // 
Translate timeout exception.
+                            {
+                                err = new InvokeTimeoutException();
+                            }
+
+                            Throwable finalErr = err;
+
+                            // Avoid deadlocks if a closure has completed in 
the same thread.
+                            Utils.runInThread(callback.executor(), () -> 
callback.complete(res, finalErr));
+                        })
+        );
+    }
+
+    /**
+     * Determines whether it is a heartbeat request.
+     *
+     * @param request Append entries request.
+     * @return True if that request is heartbeat or false otherwise.
+     */
+    private static boolean isHeartbeatRequest(final AppendEntriesRequest 
request) {

Review Comment:
   This is not needed. See the comment on the `isHeartbeatRequest(request)` check in `appendEntries`.



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java:
##########
@@ -178,20 +180,24 @@ public <T extends Message> CompletableFuture<Message> 
invokeWithDone(final PeerI
     public <T extends Message> CompletableFuture<Message> invokeWithDone(final 
PeerId peerId, final Message request,
         final RpcResponseClosure<T> done, final int timeoutMs,
         final Executor rpcExecutor) {
-        return invokeWithDone(peerId, request, null, done, timeoutMs, 
rpcExecutor);
+        return invokeWithDone(peerId, request, null, done, timeoutMs, 
rpcExecutor, this.rpcClient);
     }
 
     public <T extends Message> CompletableFuture<Message> invokeWithDone(final 
PeerId peerId, final Message request,
         final InvokeContext ctx,
         final RpcResponseClosure<T> done, final int timeoutMs) {
-        return invokeWithDone(peerId, request, ctx, done, timeoutMs, 
this.rpcExecutor);
+        return invokeWithDone(peerId, request, ctx, done, timeoutMs, 
this.rpcExecutor, this.rpcClient);
     }
 
-    public <T extends Message> CompletableFuture<Message> invokeWithDone(final 
PeerId peerId, final Message request,
-        final InvokeContext ctx,
-        final RpcResponseClosure<T> done, final int timeoutMs,
-        final Executor rpcExecutor) {
-        final RpcClient rc = this.rpcClient;
+    public <T extends Message> CompletableFuture<Message> invokeWithDone(
+            PeerId peerId,

Review Comment:
   Avoid reformatting the raft code.



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/core/DefaultRaftClientService.java:
##########
@@ -88,19 +93,78 @@ public Future<Message> requestVote(final PeerId peerId, 
final RequestVoteRequest
 
     @Override
     public Future<Message> appendEntries(final PeerId peerId, final 
AppendEntriesRequest request,
-        final int timeoutMs, final RpcResponseClosure<AppendEntriesResponse> 
done) {
+            final int timeoutMs, final 
RpcResponseClosure<AppendEntriesResponse> done) {
 
         // Assign an executor in round-robin fasion.
         final Executor executor = 
this.appendEntriesExecutorMap.computeIfAbsent(peerId,
-            k -> nodeOptions.getStripedExecutor().next());
+                k -> nodeOptions.getStripedExecutor().next());
 
         if (connect(peerId)) { // Replicator should be started asynchronously 
by node joined event.
+            if (isHeartbeatRequest(request)) {

Review Comment:
   This check is already performed at a higher level, in
   `org.apache.ignite.raft.jraft.core.Replicator#sendEmptyEntries(boolean, org.apache.ignite.raft.jraft.rpc.RpcResponseClosure<org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesResponse>, boolean)`.
   Just call `sendHeartbeat` directly from that method:
   ```
              if (isHeartbeat) {
                   ....
                   this.heartbeatInFly = 
this.rpcService.sendHeartbeat(this.options.getPeerId().getEndpoint(), request, 
coalesce,
                           this.options.getElectionTimeoutMs() / 2, 
heartbeatDone);
               }
   ```



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java:
##########
@@ -14,24 +14,158 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.raft.jraft;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.raft.jraft.core.Scheduler;
 import org.apache.ignite.raft.jraft.entity.NodeId;
 import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.CoalescedHeartbeatRequestBuilder;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RpcClient;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.CoalescedHeartbeatResponse;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
 
 /**
  * Raft nodes manager.
  */
-public class NodeManager {
+public class NodeManager implements Lifecycle<NodeOptions> {
+    private static final IgniteLogger LOG = 
Loggers.forClass(NodeManager.class);
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
     private final ConcurrentMap<NodeId, Node> nodeMap = new 
ConcurrentHashMap<>();
     private final ConcurrentMap<String, List<Node>> groupMap = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<PeerId, Queue<Object[]>> coalesced = new 
ConcurrentHashMap<>();
+
+    /** Node options. */
+    private NodeOptions options;
+    /** Task scheduler. */
+    private Scheduler scheduler;
+    /** Rpc client. */
+    private final RpcClient rpcClient;
+    /** Message factory. */
+    private RaftMessagesFactory messagesFactory;
+    /** Predicate to block a heartbeat messages. */
+    private BiPredicate<Message, PeerId> blockPred;
+
+    public NodeManager(ClusterService service) {
+        rpcClient = new IgniteRpcClient(service);
+    }
+
+    @Override
+    public boolean init(NodeOptions opts) {
+        options = opts;
+        scheduler = opts.getScheduler();
+        messagesFactory = opts.getRaftMessagesFactory();
+
+        scheduler.schedule(this::onSentHeartbeat , 
opts.getElectionTimeoutMs(), TimeUnit.MILLISECONDS);

Review Comment:
   This scheme is not efficient.
   Currently, each raft group generates its own timer event in `r.startHeartbeatTimer(Utils.nowMs())`.
   As a result, the number of timer events grows linearly with the number of groups, which causes excessive CPU usage.
   Instead, a single event can be triggered for all groups.



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/NetworkInvoker.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.rpc;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.error.RemotingException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Rpc invocation.
+ */
+public interface NetworkInvoker {
+    /**
+     * Asynchronous invocation with a callback.
+     *
+     * @param peerId target peer ID
+     * @param request request object
+     * @param ctx invoke context
+     * @param callback invoke callback
+     * @param timeoutMs timeout millisecond
+     *
+     * @return The future.
+     */
+    CompletableFuture<Message> invokeAsync(

Review Comment:
   I don't understand why this interface was introduced.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to