denis-chudov commented on code in PR #2488:
URL: https://github.com/apache/ignite-3/pull/2488#discussion_r1318227297
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2328,19 +2345,25 @@ private CompletableFuture<Boolean>
ensureReplicaIsPrimary(ReplicaRequest request
}
if (expectedTerm != null) {
- return raftClient.refreshAndGetLeaderWithTerm()
- .thenCompose(replicaAndTerm -> {
- long currentTerm = replicaAndTerm.term();
-
- if (expectedTerm == currentTerm) {
- return completedFuture(null);
+ return placementDriver.getPrimaryReplica(replicationGroupId,
hybridClock.now().addPhysicalTime(HybridTimestamp.CLOCK_SKEW))
+ .thenCompose(primaryReplica -> {
+ long currentEnlistmentConsistencyToken =
primaryReplica.getStartTime().longValue();
+
+ if
(expectedTerm.equals(currentEnlistmentConsistencyToken)) {
+ if
(primaryReplica.getExpirationTime().before(hybridClock.now())) {
+ return failedFuture(
+ new
PrimaryReplicaMissException(expectedTerm, currentEnlistmentConsistencyToken));
+ } else {
+ return completedFuture(null);
+ }
} else {
- return failedFuture(new
PrimaryReplicaMissException(expectedTerm, currentTerm));
+ return failedFuture(new
PrimaryReplicaMissException(expectedTerm, currentEnlistmentConsistencyToken));
}
}
);
} else if (request instanceof ReadOnlyReplicaRequest || request
instanceof ReplicaSafeTimeSyncRequest) {
- return
raftClient.refreshAndGetLeaderWithTerm().thenApply(replicaAndTerm ->
isLocalPeer(replicaAndTerm.leader()));
+ return placementDriver.getPrimaryReplica(replicationGroupId,
hybridClock.now().addPhysicalTime(HybridTimestamp.CLOCK_SKEW))
Review Comment:
Seems that there are no usages of `getPrimaryReplica` without adding
`CLOCK_SKEW` to the argument, and this clock skew be always necessary.
Shouldn't we add this clock skew to time comparison inside of this method?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1553,19 +1559,20 @@ public void close() {
* @return Cluster node to evalute read-only request.
*/
protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int
partId) {
- RaftGroupService svc = raftGroupServiceByPartitionId.get(partId);
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
- return svc.refreshAndGetLeaderWithTerm().handle((res, e) -> {
- if (e != null) {
- throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, e);
- } else {
- if (res == null || res.leader() == null) {
- throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, e);
- } else {
- return
clusterNodeResolver.apply(res.leader().consistentId());
- }
- }
- });
+ return placementDriver.awaitPrimaryReplica(tablePartitionId,
clock.now())
+ .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT,
TimeUnit.SECONDS).handle((res, e) -> {
+ if (e != null) {
+ throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, e);
+ } else {
+ if (res == null || res.getLeaseholder() == null) {
Review Comment:
Do we need the check whether leaseholder is null for the primary replica
lease? 'null' leaseholder is possible only for empty lease. And in the same
time you removed `null` check from `enlist` method.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2328,19 +2345,25 @@ private CompletableFuture<Boolean>
ensureReplicaIsPrimary(ReplicaRequest request
}
if (expectedTerm != null) {
- return raftClient.refreshAndGetLeaderWithTerm()
- .thenCompose(replicaAndTerm -> {
- long currentTerm = replicaAndTerm.term();
-
- if (expectedTerm == currentTerm) {
- return completedFuture(null);
+ return placementDriver.getPrimaryReplica(replicationGroupId,
hybridClock.now().addPhysicalTime(HybridTimestamp.CLOCK_SKEW))
+ .thenCompose(primaryReplica -> {
+ long currentEnlistmentConsistencyToken =
primaryReplica.getStartTime().longValue();
+
+ if
(expectedTerm.equals(currentEnlistmentConsistencyToken)) {
+ if
(primaryReplica.getExpirationTime().before(hybridClock.now())) {
+ return failedFuture(
+ new
PrimaryReplicaMissException(expectedTerm, currentEnlistmentConsistencyToken));
Review Comment:
are you going to add TODOs here to rename `term` variables?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]