vldpyatkov commented on code in PR #5221:
URL: https://github.com/apache/ignite-3/pull/5221#discussion_r1987302395


##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java:
##########
@@ -14,24 +14,158 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.raft.jraft;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.raft.jraft.core.Scheduler;
 import org.apache.ignite.raft.jraft.entity.NodeId;
 import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.rpc.CoalescedHeartbeatRequestBuilder;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RpcClient;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.CoalescedHeartbeatResponse;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
 
 /**
  * Raft nodes manager.
  */
-public class NodeManager {
+public class NodeManager implements Lifecycle<NodeOptions> {
+    private static final IgniteLogger LOG = 
Loggers.forClass(NodeManager.class);
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
     private final ConcurrentMap<NodeId, Node> nodeMap = new 
ConcurrentHashMap<>();
     private final ConcurrentMap<String, List<Node>> groupMap = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<PeerId, Queue<Object[]>> coalesced = new 
ConcurrentHashMap<>();
+
+    /** Node options. */
+    private NodeOptions options;
+    /** Task scheduler. */
+    private Scheduler scheduler;
+    /** Rpc client. */
+    private final RpcClient rpcClient;
+    /** Message factory. */
+    private RaftMessagesFactory messagesFactory;
+    /** Predicate to block a heartbeat messages. */
+    private BiPredicate<Message, PeerId> blockPred;
+
+    public NodeManager(ClusterService service) {
+        rpcClient = new IgniteRpcClient(service);
+    }
+
+    @Override
+    public boolean init(NodeOptions opts) {
+        options = opts;
+        scheduler = opts.getScheduler();
+        messagesFactory = opts.getRaftMessagesFactory();
+
+        scheduler.schedule(this::onSentHeartbeat , 
opts.getElectionTimeoutMs(), TimeUnit.MILLISECONDS);

Review Comment:
   Could we create a separate ticket for this? Or do you insist on making this 
improvement in this one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to