ibessonov commented on code in PR #7500:
URL: https://github.com/apache/ignite-3/pull/7500#discussion_r2753256065


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java:
##########
@@ -360,16 +337,25 @@ private CommandResult handleUpdateAllCommand(
 
         UUID txId = cmd.txId();
 
+        assert storageLeaseInfo != null;
+        assert localNodeId != null;
+
+        CompatibilityValidationResult failedCompatValidationResult = null;
+
         if (shouldUpdateStorage(cmd.full(), storageLeaseInfo)) {
-            storageUpdateHandler.handleUpdateAll(
-                    txId,
-                    cmd.rowsToUpdate(),
-                    cmd.commitPartitionId().asReplicationGroupId(),
-                    !cmd.full(),
-                    () -> storage.lastApplied(commandIndex, commandTerm),
-                    cmd.full() ? safeTimestamp : null,
-                    indexIdsAtRwTxBeginTs(catalogService, txId, storage.tableId())
-            );
+            HybridTimestamp commitTsOrNull = cmd.full() ? safeTimestamp : null;
+
+            failedCompatValidationResult = validateSchemaCompatibilityIfNeeded(cmd.txId(), cmd.full(), commitTsOrNull);
+
+            if (failedCompatValidationResult == null) {
+                storageUpdater.updateStorage(cmd, commandIndex, commandTerm, commitTsOrNull, safeTimestamp);
+            } else {
+                assert !failedCompatValidationResult.isSuccessful();

Review Comment:
   Assuming that `isSuccessful()` is expected to be tautologically `false` whenever the instance itself is not `null` (as this assertion implies), why do we have such a method in the first place?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatibilityValidationResult.java:
##########
@@ -17,13 +17,25 @@
 
 package org.apache.ignite.internal.partition.replicator.schemacompat;
 
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.io.Serializable;
+import java.util.function.Function;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Result of a schema compatibility validation.
  */
-public class CompatValidationResult {
-    private static final CompatValidationResult SUCCESSFUL_RESULT = new CompatValidationResult(ValidationStatus.SUCCESS, "", -1, -1, null);
+public class CompatibilityValidationResult implements Serializable {

Review Comment:
   Could you please comment on why this class is Serializable now?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java:
##########
@@ -104,15 +113,23 @@ public class TablePartitionProcessor implements RaftTableProcessor {
      */
     private final ZonePartitionId realReplicationGroupId;
 
+    private final SchemaCompatibilityValidator schemaCompatibilityValidator;
+
     private ReplicaMeta lastKnownLease;
 
+    private final StorageUpdater<UpdateCommand> singleUpdateStorageUpdater = new SingleUpdateStorageUpdater();
+
+    private final StorageUpdater<UpdateAllCommand> batchUpdateStorageUpdater = new BatchUpdateStorageUpdater();

Review Comment:
   Have you considered moving the update and update-all logic into its own `AbstractCommandHandler` instead of keeping it here behind an additional layer of abstraction (`StorageUpdater`)?
   
   From my perspective, this change further increases complexity by keeping all of these things internal to the "partition processor" class (especially given that the collection of handlers will probably become extensible in the future).
   It doesn't have to be this way. We can discuss what could be done about it.



##########
modules/table/src/main/java/org/apache/ignite/internal/tx/UpdateCommandResult.java:
##########
@@ -19,6 +19,7 @@
 
 import java.io.Serializable;
 import java.util.Objects;
+import org.apache.ignite.internal.partition.replicator.schemacompat.CompatibilityValidationResult;

Review Comment:
   Let's consult the TX team about this module change. The existing layout was probably intentional, and I'm afraid we may break some implicit design here.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatibilityValidationResult.java:
##########
@@ -154,4 +166,32 @@ public String details() {
 
         return details;
     }
+
+    /**
+     * Throws an exception produced by the given factory if the validation failed.
+     *
+     * @param exceptionFactory Exception factory.
+     */
+    public <X extends Exception> void throwIfSchemaValidationOnCommitFailed(Function<String, ? extends X> exceptionFactory) throws X {
+        CompatibilityValidationResult validationResult = this;

Review Comment:
   Is this necessary? It only adds noise to the code, in my opinion. This is 
probably an artifact of a refactoring, right?
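
For illustration, a minimal sketch of the same kind of method reading the instance's own state directly, with no local alias for `this`. The class `ThrowIfFailedSketch`, its fields, and the message text are hypothetical simplifications, not the PR's code:

```java
import java.util.function.Function;

/** Hypothetical, simplified stand-in for a validation result; not the PR's class. */
final class ThrowIfFailedSketch {
    private final boolean successful;
    private final String details;

    ThrowIfFailedSketch(boolean successful, String details) {
        this.successful = successful;
        this.details = details;
    }

    /**
     * Throws an exception built by the given factory if the validation failed.
     * The method uses its own fields directly instead of copying `this` into a local variable.
     */
    <X extends Exception> void throwIfFailed(Function<String, ? extends X> exceptionFactory) throws X {
        if (!successful) {
            throw exceptionFactory.apply("Schema validation failed: " + details);
        }
    }
}
```

A caller would pass a constructor reference such as `IllegalStateException::new` as the factory.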



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java:
##########
@@ -378,12 +364,43 @@ private CommandResult handleUpdateAllCommand(
             advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
         }
 
-        replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? safeTimestamp : null, cmd.full());
+        replicaTouch(txId, cmd.txCoordinatorId(), cmd.full());
 
-        return new CommandResult(
-                new UpdateCommandResult(true, isPrimaryInGroupTopology(storageLeaseInfo), safeTimestamp.longValue()),
-                true
+        UpdateCommandResult result = new UpdateCommandResult(
+                true,
+                isPrimaryInGroupTopology(storageLeaseInfo),
+                safeTimestamp.longValue(),
+                failedCompatValidationResult
         );
+
+        return new CommandResult(result, true);
+    }
+
+    private @Nullable CompatibilityValidationResult validateSchemaCompatibilityIfNeeded(
+            UUID txId,
+            boolean full,
+            @Nullable HybridTimestamp commitTsOrNull
+    ) {
+        if (!full) {
+            return null;
+        }
+
+        HybridTimestamp commitTs = Objects.requireNonNull(commitTsOrNull);
+
+        CompatibilityValidationResult validationResult;
+        try {
+            validationResult = schemaCompatibilityValidator
+                    .validateCommit(txId, Set.of(storage.tableId()), commitTs)
+                    .get();

Review Comment:
   Is it possible to have a non-completed future here? If so, it would be good to add a comment that explains the situation.
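
If the future is in fact expected to be complete at this point (that is an assumption; it is exactly what the question above asks), one way to make the expectation explicit is to fail fast rather than block. A minimal sketch using only `java.util.concurrent.CompletableFuture`; the class and method names are hypothetical and not part of Ignite:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper; not part of Ignite. */
final class CompletedFutureSketch {
    /**
     * Returns the value of a future that the caller expects to be already complete.
     * Fails fast instead of blocking the calling thread if that expectation does not hold.
     */
    static <T> T resultOfCompleted(CompletableFuture<T> future) {
        if (!future.isDone()) {
            throw new IllegalStateException("Future is expected to be completed at this point");
        }

        // join() does not block here because the future is already done; it rethrows a failure
        // as an unchecked CompletionException (or CancellationException if it was cancelled).
        return future.join();
    }
}
```

If the future can legitimately be incomplete, a blocking `get()` stays necessary, and a comment explaining why blocking is acceptable there would serve the same purpose.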



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/TablePartitionProcessor.java:
##########
@@ -378,12 +364,43 @@ private CommandResult handleUpdateAllCommand(
             advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
         }
 
-        replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? safeTimestamp : null, cmd.full());
+        replicaTouch(txId, cmd.txCoordinatorId(), cmd.full());
 
-        return new CommandResult(
-                new UpdateCommandResult(true, isPrimaryInGroupTopology(storageLeaseInfo), safeTimestamp.longValue()),
-                true
+        UpdateCommandResult result = new UpdateCommandResult(
+                true,
+                isPrimaryInGroupTopology(storageLeaseInfo),
+                safeTimestamp.longValue(),
+                failedCompatValidationResult
         );
+
+        return new CommandResult(result, true);
+    }
+
+    private @Nullable CompatibilityValidationResult validateSchemaCompatibilityIfNeeded(
+            UUID txId,
+            boolean full,
+            @Nullable HybridTimestamp commitTsOrNull
+    ) {
+        if (!full) {
+            return null;
+        }
+
+        HybridTimestamp commitTs = Objects.requireNonNull(commitTsOrNull);
+
+        CompatibilityValidationResult validationResult;
+        try {
+            validationResult = schemaCompatibilityValidator
+                    .validateCommit(txId, Set.of(storage.tableId()), commitTs)
+                    .get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException(INTERNAL_ERR, e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException(INTERNAL_ERR, e);
+        }
+
+        return validationResult.isSuccessful() ? null : validationResult;

Review Comment:
   Ok, this makes one of my comments obsolete. Could you please explain this 
pattern? Why do we use `null` if we already have a dedicated flag?
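
For illustration only, a sketch of the two shapes being compared: encoding success as `null` versus always returning the result and letting the caller consult its flag. `NullVersusFlagSketch`, `Result`, `failedOrNull`, and `shouldUpdateStorage` are hypothetical stand-ins, not the PR's classes:

```java
/** Hypothetical stand-ins for illustration; not the PR's classes. */
final class NullVersusFlagSketch {
    static final class Result {
        private final boolean successful;

        Result(boolean successful) {
            this.successful = successful;
        }

        boolean isSuccessful() {
            return successful;
        }
    }

    /** Shape used in the diff: success is encoded as null, so a non-null value always means failure. */
    static Result failedOrNull(Result result) {
        return result.isSuccessful() ? null : result;
    }

    /** Alternative shape: the result is never null and callers rely on the flag alone. */
    static boolean shouldUpdateStorage(Result result) {
        return result.isSuccessful();
    }
}
```

With the first shape the flag carries no extra information for the caller; with the second, no `null` check is needed.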



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java:
##########
@@ -392,18 +417,82 @@ void updatesLastAppliedForCommandsUpdatingLastAppliedIndex(WriteCommand command)
         verify(mvPartitionStorage).lastApplied(3, 2);
     }
 
+    @ParameterizedTest
+    @MethodSource("fullUpdateCommands")
+    void returnsFailedValidationResultIfSchemaChangeValidationOn1PcCommitFails(WriteCommand command) {
+        when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), any(HybridTimestamp.class), any(HybridTimestamp.class)))
+                .thenReturn(incompatibleSchemaChange());
+
+        CommandResult commandResult = commandListener.processCommand(command, 3, 2, hybridClock.now());
+
+        assertThat(commandResult.wasApplied(), is(true));
+        UpdateCommandResult updateCommandResult = (UpdateCommandResult) commandResult.result();
+        assertThat(updateCommandResult, is(notNullValue()));
+        assertThat(updateCommandResult, is(instanceOf(UpdateCommandResult.class)));
+        CompatibilityValidationResult compatibilityValidationResult = updateCommandResult.compatibilityValidationResult();
+        assertThat(compatibilityValidationResult, is(notNullValue()));
+        assertThat(compatibilityValidationResult.isSuccessful(), is(false));
+    }
+
+    @ParameterizedTest
+    @MethodSource("fullUpdateCommands")
+    void doesNotUpdateStorageIfSchemaChangeValidationOn1PcCommitFails(WriteCommand command) {
+        when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), any(HybridTimestamp.class), any(HybridTimestamp.class)))
+                .thenReturn(incompatibleSchemaChange());
+
+        commandListener.processCommand(command, 3, 2, hybridClock.now());
+
+        verify(mvPartitionStorage, never()).addWriteCommitted(any(), any(), any());
+    }
+
+    @ParameterizedTest
+    @MethodSource("fullUpdateCommands")
+    void updatesLastAppliedIndexIfSchemaChangeValidationOn1PcCommitFails(WriteCommand command) {
+        when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), any(HybridTimestamp.class), any(HybridTimestamp.class)))
+                .thenReturn(incompatibleSchemaChange());
+
+        commandListener.processCommand(command, 3, 2, hybridClock.now());
+
+        verify(mvPartitionStorage).lastApplied(3, 2);
+    }
+
+    private static Stream<Arguments> fullUpdateCommands() {
+        return Stream.of(updateCommand(true), updateAllCommand(true))
+                .map(Arguments::of);
+    }
+
+    private static List<FullTableSchema> incompatibleSchemaChange() {
+        CatalogTableColumnDescriptor column1 = new CatalogTableColumnDescriptor("column1", ColumnType.INT32, false, 0, 0, 0, null);
+        CatalogTableColumnDescriptor column2 = new CatalogTableColumnDescriptor("column2", ColumnType.INT32, false, 0, 0, 0, null);
+
+        // Dropping a column is a forward-incompatible schema change.

Review Comment:
   Just a question, I got a little confused here. Why are we concerned with forward-incompatible schema changes here? They should not affect anything, or do they?


