ibessonov commented on code in PR #7500:
URL: https://github.com/apache/ignite-3/pull/7500#discussion_r2753256065
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java:
##########
@@ -360,16 +337,25 @@ private CommandResult handleUpdateAllCommand(
UUID txId = cmd.txId();
+ assert storageLeaseInfo != null;
+ assert localNodeId != null;
+
+ CompatibilityValidationResult failedCompatValidationResult = null;
+
if (shouldUpdateStorage(cmd.full(), storageLeaseInfo)) {
- storageUpdateHandler.handleUpdateAll(
- txId,
- cmd.rowsToUpdate(),
- cmd.commitPartitionId().asReplicationGroupId(),
- !cmd.full(),
- () -> storage.lastApplied(commandIndex, commandTerm),
- cmd.full() ? safeTimestamp : null,
- indexIdsAtRwTxBeginTs(catalogService, txId,
storage.tableId())
- );
+ HybridTimestamp commitTsOrNull = cmd.full() ? safeTimestamp : null;
+
+ failedCompatValidationResult =
validateSchemaCompatibilityIfNeeded(cmd.txId(), cmd.full(), commitTsOrNull);
+
+ if (failedCompatValidationResult == null) {
+ storageUpdater.updateStorage(cmd, commandIndex, commandTerm,
commitTsOrNull, safeTimestamp);
+ } else {
+ assert !failedCompatValidationResult.isSuccessful();
Review Comment:
Assuming that `isSuccessful` is expected to be tautologically `true` if an
instance itself is not `null`, why do we have such a method in the first place?
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatibilityValidationResult.java:
##########
@@ -17,13 +17,25 @@
package org.apache.ignite.internal.partition.replicator.schemacompat;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.io.Serializable;
+import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
/**
* Result of a schema compatibility validation.
*/
-public class CompatValidationResult {
- private static final CompatValidationResult SUCCESSFUL_RESULT = new
CompatValidationResult(ValidationStatus.SUCCESS, "", -1, -1, null);
+public class CompatibilityValidationResult implements Serializable {
Review Comment:
Could you please comment on why this class is Serializable now?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java:
##########
@@ -104,15 +113,23 @@ public class TablePartitionProcessor implements
RaftTableProcessor {
*/
private final ZonePartitionId realReplicationGroupId;
+ private final SchemaCompatibilityValidator schemaCompatibilityValidator;
+
private ReplicaMeta lastKnownLease;
+ private final StorageUpdater<UpdateCommand> singleUpdateStorageUpdater =
new SingleUpdateStorageUpdater();
+
+ private final StorageUpdater<UpdateAllCommand> batchUpdateStorageUpdater =
new BatchUpdateStorageUpdater();
Review Comment:
Have you considered moving the update[all] logic into its own
`AbstractCommandHandler` instead of leaving it here with an additional layer of
abstraction (`StorageUpdater`)?
From my perspective, your improvement further increases complexity by
keeping all these things internal to the "partition processor" class
(especially assuming that a collection of handlers will probably become
extensible in the future).
It doesn't have to be this way. We may discuss what can be done about it.
##########
modules/table/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java:
##########
@@ -19,6 +19,7 @@
import java.io.Serializable;
import java.util.Objects;
+import
org.apache.ignite.internal.partition.replicator.schemacompat.CompatibilityValidationResult;
Review Comment:
Let's consult TX guys about the change of a module. It was probably
intentional, I'm afraid we may break some implicit design here
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatibilityValidationResult.java:
##########
@@ -154,4 +166,32 @@ public String details() {
return details;
}
+
+ /**
+ * Throws an exception produced by the given factory if the validation
failed.
+ *
+ * @param exceptionFactory Exception factory.
+ */
+ public <X extends Exception> void
throwIfSchemaValidationOnCommitFailed(Function<String, ? extends X>
exceptionFactory) throws X {
+ CompatibilityValidationResult validationResult = this;
Review Comment:
Is this necessary? It only adds noise to the code, in my opinion. This is
probably an artifact of a refactoring, right?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java:
##########
@@ -378,12 +364,43 @@ private CommandResult handleUpdateAllCommand(
advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
}
- replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? safeTimestamp :
null, cmd.full());
+ replicaTouch(txId, cmd.txCoordinatorId(), cmd.full());
- return new CommandResult(
- new UpdateCommandResult(true,
isPrimaryInGroupTopology(storageLeaseInfo), safeTimestamp.longValue()),
- true
+ UpdateCommandResult result = new UpdateCommandResult(
+ true,
+ isPrimaryInGroupTopology(storageLeaseInfo),
+ safeTimestamp.longValue(),
+ failedCompatValidationResult
);
+
+ return new CommandResult(result, true);
+ }
+
+ private @Nullable CompatibilityValidationResult
validateSchemaCompatibilityIfNeeded(
+ UUID txId,
+ boolean full,
+ @Nullable HybridTimestamp commitTsOrNull
+ ) {
+ if (!full) {
+ return null;
+ }
+
+ HybridTimestamp commitTs = Objects.requireNonNull(commitTsOrNull);
+
+ CompatibilityValidationResult validationResult;
+ try {
+ validationResult = schemaCompatibilityValidator
+ .validateCommit(txId, Set.of(storage.tableId()), commitTs)
+ .get();
Review Comment:
is it possible to have a non-completed future here? If so, it would be good
to write a comment that explains the situation.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java:
##########
@@ -378,12 +364,43 @@ private CommandResult handleUpdateAllCommand(
advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
}
- replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? safeTimestamp :
null, cmd.full());
+ replicaTouch(txId, cmd.txCoordinatorId(), cmd.full());
- return new CommandResult(
- new UpdateCommandResult(true,
isPrimaryInGroupTopology(storageLeaseInfo), safeTimestamp.longValue()),
- true
+ UpdateCommandResult result = new UpdateCommandResult(
+ true,
+ isPrimaryInGroupTopology(storageLeaseInfo),
+ safeTimestamp.longValue(),
+ failedCompatValidationResult
);
+
+ return new CommandResult(result, true);
+ }
+
+ private @Nullable CompatibilityValidationResult
validateSchemaCompatibilityIfNeeded(
+ UUID txId,
+ boolean full,
+ @Nullable HybridTimestamp commitTsOrNull
+ ) {
+ if (!full) {
+ return null;
+ }
+
+ HybridTimestamp commitTs = Objects.requireNonNull(commitTsOrNull);
+
+ CompatibilityValidationResult validationResult;
+ try {
+ validationResult = schemaCompatibilityValidator
+ .validateCommit(txId, Set.of(storage.tableId()), commitTs)
+ .get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInternalException(INTERNAL_ERR, e);
+ } catch (ExecutionException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, e);
+ }
+
+ return validationResult.isSuccessful() ? null : validationResult;
Review Comment:
Ok, this makes one of my comments obsolete. Could you please explain this
pattern? Why do we use `null` if we already have a dedicated flag?
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java:
##########
@@ -392,18 +417,82 @@ void
updatesLastAppliedForCommandsUpdatingLastAppliedIndex(WriteCommand command)
verify(mvPartitionStorage).lastApplied(3, 2);
}
+ @ParameterizedTest
+ @MethodSource("fullUpdateCommands")
+ void
returnsFailedValidationResultIfSchemaChangeValidationOn1PcCommitFails(WriteCommand
command) {
+ when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(),
any(HybridTimestamp.class), any(HybridTimestamp.class)))
+ .thenReturn(incompatibleSchemaChange());
+
+ CommandResult commandResult = commandListener.processCommand(command,
3, 2, hybridClock.now());
+
+ assertThat(commandResult.wasApplied(), is(true));
+ UpdateCommandResult updateCommandResult = (UpdateCommandResult)
commandResult.result();
+ assertThat(updateCommandResult, is(notNullValue()));
+ assertThat(updateCommandResult,
is(instanceOf(UpdateCommandResult.class)));
+ CompatibilityValidationResult compatibilityValidationResult =
updateCommandResult.compatibilityValidationResult();
+ assertThat(compatibilityValidationResult, is(notNullValue()));
+ assertThat(compatibilityValidationResult.isSuccessful(), is(false));
+ }
+
+ @ParameterizedTest
+ @MethodSource("fullUpdateCommands")
+ void
doesNotUpdateStorageIfSchemaChangeValidationOn1PcCommitFails(WriteCommand
command) {
+ when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(),
any(HybridTimestamp.class), any(HybridTimestamp.class)))
+ .thenReturn(incompatibleSchemaChange());
+
+ commandListener.processCommand(command, 3, 2, hybridClock.now());
+
+ verify(mvPartitionStorage, never()).addWriteCommitted(any(), any(),
any());
+ }
+
+ @ParameterizedTest
+ @MethodSource("fullUpdateCommands")
+ void
updatesLastAppliedIndexIfSchemaChangeValidationOn1PcCommitFails(WriteCommand
command) {
+ when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(),
any(HybridTimestamp.class), any(HybridTimestamp.class)))
+ .thenReturn(incompatibleSchemaChange());
+
+ commandListener.processCommand(command, 3, 2, hybridClock.now());
+
+ verify(mvPartitionStorage).lastApplied(3, 2);
+ }
+
+ private static Stream<Arguments> fullUpdateCommands() {
+ return Stream.of(updateCommand(true), updateAllCommand(true))
+ .map(Arguments::of);
+ }
+
+ private static List<FullTableSchema> incompatibleSchemaChange() {
+ CatalogTableColumnDescriptor column1 = new
CatalogTableColumnDescriptor("column1", ColumnType.INT32, false, 0, 0, 0, null);
+ CatalogTableColumnDescriptor column2 = new
CatalogTableColumnDescriptor("column2", ColumnType.INT32, false, 0, 0, 0, null);
+
+ // Dropping a column is a forward-incompatible schema change.
Review Comment:
Just a question, I got a little confused here. Why are we concerned with
forward-incompatible schema changes here? They should not affect anything, or
are they?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]