maedhroz commented on code in PR #4740:
URL: https://github.com/apache/cassandra/pull/4740#discussion_r3121795951
##########
src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java:
##########
@@ -209,12 +226,193 @@ public void repairPartition(DecoratedKey dk,
Map<Replica, Mutation> mutations, R
// then we take the non-transactional path and the mutations are
intercepted in ReadCoordinator.sendRepairMutation
// which will ensure the repair mutation runs in the command store
thread after preceding transactions are done
ClusterMetadata cm = ClusterMetadata.current();
- if (coordinator.isEventuallyConsistent() &&
ConsensusMigrationMutationHelper.tokenShouldBeWrittenThroughAccord(cm,
command.metadata().id, dk.getToken(),
TransactionalMode::readRepairsThroughAccord,
TransactionalMigrationFromMode::readRepairsThroughAccord))
+ if (MigrationRouter.shouldUseTrackedForWrites(cm,
command.metadata().keyspace, command.metadata().id, dk.getToken()))
+ repairViaTrackedWrite(dk, mutations, writePlan);
+ else if (coordinator.isEventuallyConsistent() &&
ConsensusMigrationMutationHelper.tokenShouldBeWrittenThroughAccord(cm,
command.metadata().id, dk.getToken(),
TransactionalMode::readRepairsThroughAccord,
TransactionalMigrationFromMode::readRepairsThroughAccord))
repairViaAccordTransaction(dk, mutations, writePlan);
else
repairViaReadCoordinator(dk, mutations, writePlan, rrSource);
}
+ /*
+ * Send each per-replica mutation as a separate tracked write via
TrackedWriteRequest.perform().
+ * We do NOT merge mutations because merging can create oversized
mutations that make partitions unreadable.
+ * Each tracked write gets a proper MutationId and is recorded in the
mutation journal.
+ *
+ * If a tracked write fails with RetryOnDifferentSystemException or
CoordinatorBehindException (migration raced),
+ * we refetch ClusterMetadata and retry through the correct path (tracked
or untracked).
+ */
+ private void repairViaTrackedWrite(DecoratedKey dk, Map<Replica, Mutation>
mutations, ForWrite writePlan)
+ {
+ ReadRepairMetrics.repairedBlockingViaTrackedWrite.mark();
+ ConsistencyLevel cl = writePlan.consistencyLevel();
+
+ List<AsyncPromise<Void>> promises = new ArrayList<>(mutations.size());
+
+ for (Map.Entry<Replica, Mutation> entry : mutations.entrySet())
+ {
+ Replica replica = entry.getKey();
+ Mutation mutation = entry.getValue();
+ AsyncPromise<Void> promise = new AsyncPromise<>();
+ promises.add(promise);
+ startTrackedWriteAttempt(dk, replica, mutation, cl, promise, true);
+ }
+
+ // Compute blockFor using the same logic as BlockingPartitionRepair:
writeQuorum adjusted
+ // for contacts that don't have mutations to send
+ int adjustedBlockFor = writePlan.writeQuorum();
+ for (Replica participant : writePlan.contacts())
+ {
+ if (!mutations.containsKey(participant))
+ adjustedBlockFor--;
+ }
+ final int blockFor = adjustedBlockFor;
+
+ repairs.add(new PendingPartitionRepair()
+ {
+ @Override
+ public boolean awaitRepairs(long remaining, TimeUnit timeUnit)
throws InterruptedException
+ {
+ long deadlineNanos = nanoTime() + timeUnit.toNanos(remaining);
+ Throwable error = null;
+ for (AsyncPromise<Void> promise : promises)
+ {
+ long remainingNanos = deadlineNanos - nanoTime();
+ if (remainingNanos <= 0)
+ return false;
+ try
+ {
+ promise.get(remainingNanos, NANOSECONDS);
+ }
+ catch (TimeoutException e)
+ {
+ return false;
+ }
+ catch (ExecutionException e)
+ {
+ error = Throwables.merge(error, e.getCause());
+ }
+ }
+ Throwables.maybeFail(error);
+ return true;
+ }
+
+ @Override
+ public ForWrite repairPlan()
+ {
+ return writePlan;
+ }
+
+ @Override
+ public int blockFor()
+ {
+ return blockFor;
+ }
+
+ @Override
+ public int waitingOn()
+ {
+ int count = 0;
+ for (AsyncPromise<Void> promise : promises)
Review Comment:
nit: Would iterating the full list of promises on every call here ever
become a problem?
##########
src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java:
##########
@@ -209,12 +226,193 @@ public void repairPartition(DecoratedKey dk,
Map<Replica, Mutation> mutations, R
// then we take the non-transactional path and the mutations are
intercepted in ReadCoordinator.sendRepairMutation
// which will ensure the repair mutation runs in the command store
thread after preceding transactions are done
ClusterMetadata cm = ClusterMetadata.current();
- if (coordinator.isEventuallyConsistent() &&
ConsensusMigrationMutationHelper.tokenShouldBeWrittenThroughAccord(cm,
command.metadata().id, dk.getToken(),
TransactionalMode::readRepairsThroughAccord,
TransactionalMigrationFromMode::readRepairsThroughAccord))
+ if (MigrationRouter.shouldUseTrackedForWrites(cm,
command.metadata().keyspace, command.metadata().id, dk.getToken()))
+ repairViaTrackedWrite(dk, mutations, writePlan);
+ else if (coordinator.isEventuallyConsistent() &&
ConsensusMigrationMutationHelper.tokenShouldBeWrittenThroughAccord(cm,
command.metadata().id, dk.getToken(),
TransactionalMode::readRepairsThroughAccord,
TransactionalMigrationFromMode::readRepairsThroughAccord))
repairViaAccordTransaction(dk, mutations, writePlan);
else
repairViaReadCoordinator(dk, mutations, writePlan, rrSource);
}
+ /*
+ * Send each per-replica mutation as a separate tracked write via
TrackedWriteRequest.perform().
+ * We do NOT merge mutations because merging can create oversized
mutations that make partitions unreadable.
+ * Each tracked write gets a proper MutationId and is recorded in the
mutation journal.
+ *
+ * If a tracked write fails with RetryOnDifferentSystemException or
CoordinatorBehindException (migration raced),
+ * we refetch ClusterMetadata and retry through the correct path (tracked
or untracked).
+ */
+ private void repairViaTrackedWrite(DecoratedKey dk, Map<Replica, Mutation>
mutations, ForWrite writePlan)
+ {
+ ReadRepairMetrics.repairedBlockingViaTrackedWrite.mark();
+ ConsistencyLevel cl = writePlan.consistencyLevel();
+
+ List<AsyncPromise<Void>> promises = new ArrayList<>(mutations.size());
+
+ for (Map.Entry<Replica, Mutation> entry : mutations.entrySet())
+ {
+ Replica replica = entry.getKey();
+ Mutation mutation = entry.getValue();
+ AsyncPromise<Void> promise = new AsyncPromise<>();
+ promises.add(promise);
+ startTrackedWriteAttempt(dk, replica, mutation, cl, promise, true);
+ }
+
+ // Compute blockFor using the same logic as BlockingPartitionRepair:
writeQuorum adjusted
+ // for contacts that don't have mutations to send
+ int adjustedBlockFor = writePlan.writeQuorum();
+ for (Replica participant : writePlan.contacts())
+ {
+ if (!mutations.containsKey(participant))
+ adjustedBlockFor--;
+ }
+ final int blockFor = adjustedBlockFor;
+
+ repairs.add(new PendingPartitionRepair()
+ {
+ @Override
+ public boolean awaitRepairs(long remaining, TimeUnit timeUnit)
throws InterruptedException
+ {
+ long deadlineNanos = nanoTime() + timeUnit.toNanos(remaining);
+ Throwable error = null;
+ for (AsyncPromise<Void> promise : promises)
+ {
+ long remainingNanos = deadlineNanos - nanoTime();
+ if (remainingNanos <= 0)
+ return false;
+ try
+ {
+ promise.get(remainingNanos, NANOSECONDS);
+ }
+ catch (TimeoutException e)
+ {
+ return false;
+ }
+ catch (ExecutionException e)
+ {
+ error = Throwables.merge(error, e.getCause());
+ }
+ }
+ Throwables.maybeFail(error);
+ return true;
+ }
+
+ @Override
+ public ForWrite repairPlan()
+ {
+ return writePlan;
+ }
+
+ @Override
+ public int blockFor()
+ {
+ return blockFor;
+ }
+
+ @Override
+ public int waitingOn()
+ {
+ int count = 0;
+ for (AsyncPromise<Void> promise : promises)
Review Comment:
nit: Would iterating the full list of promises on every call here ever
become a problem?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]