vldpyatkov commented on code in PR #5221: URL: https://github.com/apache/ignite-3/pull/5221#discussion_r1987302395
########## modules/raft/src/main/java/org/apache/ignite/raft/jraft/NodeManager.java: ########## @@ -14,24 +14,158 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.ignite.raft.jraft; +import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiPredicate; import java.util.stream.Collectors; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.raft.jraft.core.Scheduler; import org.apache.ignite.raft.jraft.entity.NodeId; import org.apache.ignite.raft.jraft.entity.PeerId; +import org.apache.ignite.raft.jraft.option.NodeOptions; +import org.apache.ignite.raft.jraft.rpc.CoalescedHeartbeatRequestBuilder; +import org.apache.ignite.raft.jraft.rpc.Message; +import org.apache.ignite.raft.jraft.rpc.RpcClient; +import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest; +import org.apache.ignite.raft.jraft.rpc.RpcRequests.CoalescedHeartbeatResponse; +import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient; import org.apache.ignite.raft.jraft.util.OnlyForTest; /** * Raft nodes manager. 
*/ -public class NodeManager { +public class NodeManager implements Lifecycle<NodeOptions> { + private static final IgniteLogger LOG = Loggers.forClass(NodeManager.class); + + private final AtomicBoolean stopGuard = new AtomicBoolean(); + private final ConcurrentMap<NodeId, Node> nodeMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, List<Node>> groupMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<PeerId, Queue<Object[]>> coalesced = new ConcurrentHashMap<>(); + + /** Node options. */ + private NodeOptions options; + /** Task scheduler. */ + private Scheduler scheduler; + /** Rpc client. */ + private final RpcClient rpcClient; + /** Message factory. */ + private RaftMessagesFactory messagesFactory; + /** Predicate to block a heartbeat messages. */ + private BiPredicate<Message, PeerId> blockPred; + + public NodeManager(ClusterService service) { + rpcClient = new IgniteRpcClient(service); + } + + @Override + public boolean init(NodeOptions opts) { + options = opts; + scheduler = opts.getScheduler(); + messagesFactory = opts.getRaftMessagesFactory(); + + scheduler.schedule(this::onSentHeartbeat , opts.getElectionTimeoutMs(), TimeUnit.MILLISECONDS); Review Comment: Perhaps we could create a separate ticket for this? Or do you insist on making this improvement in this one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org