twalthr commented on code in PR #27301:
URL: https://github.com/apache/flink/pull/27301#discussion_r2611295728


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java:
##########
@@ -65,6 +65,25 @@
         },
         minPlanVersion = FlinkVersion.v2_0,
         minStateVersion = FlinkVersion.v2_0)
+// Version 2: Fixed the data type used for creating constraint enforcer and sink upsert
+// materializer. Since this version the sink works correctly with persisted metadata columns.
+// We introduced a new version, because statements that were never rolling back to a value from
+// state could run successfully. We allow those jobs to be upgraded. Without a new version such jobs
+// would fail on restore, because the state serializer would differ
+@ExecNodeMetadata(

Review Comment:
   Batch version can stay at v1. There is no upgrade story that would interrupt 
a running batch job. I did it similarly for scan_v2, where I just updated v1 to 
the new behavior.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -574,4 +583,6 @@ private Optional<RowKind> getTargetRowKind() {
         }
         return Optional.empty();
     }
+
+    protected abstract boolean legacyPhysicalTypeEnabled();

Review Comment:
   I like the idea of a method in CommonExec. Can you update CommonExecScan 
the same way, so that it serves as a reference implementation?
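
A minimal sketch of the pattern under discussion. The class names are hypothetical stand-ins, not the actual Flink classes, and the mapping "version 1 keeps the legacy type, later versions use the fixed type" is an assumption drawn from the version comment in this PR. The batch variant always uses the new behavior because it stays at v1 without a restore path.

```java
/** Hypothetical stand-in for a common exec node base class (not Flink code). */
abstract class CommonNodeSketch {

    /** Each concrete node decides whether the legacy physical type must be kept. */
    protected abstract boolean legacyPhysicalTypeEnabled();

    /** Shared logic asks the hook instead of checking versions itself. */
    String resolvePhysicalType(String legacyType, String fixedType) {
        return legacyPhysicalTypeEnabled() ? legacyType : fixedType;
    }
}

/** Streaming variant: assumed to keep the legacy type only for version 1. */
final class StreamNodeSketch extends CommonNodeSketch {
    private final int version;

    StreamNodeSketch(int version) {
        this.version = version;
    }

    @Override
    protected boolean legacyPhysicalTypeEnabled() {
        return version == 1;
    }
}

/** Batch variant: no state to restore, so it never needs the legacy type. */
final class BatchNodeSketch extends CommonNodeSketch {
    @Override
    protected boolean legacyPhysicalTypeEnabled() {
        return false;
    }
}
```

With this shape the version check lives in one overridable method per node, so a scan node could follow the same structure.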



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java:
##########
@@ -44,6 +46,20 @@ public List<TableTestProgram> programs() {
                 TableSinkTestPrograms.SINK_WRITING_METADATA,
                 TableSinkTestPrograms.SINK_NDF_PRIMARY_KEY,
                 TableSinkTestPrograms.SINK_PARTIAL_INSERT,
-                TableSinkTestPrograms.SINK_UPSERT);
+                TableSinkTestPrograms.SINK_UPSERT,
+                TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA_FOR_LEGACY_TYPE,
+                TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA);
+    }
+
+    @Override
+    protected Stream<String> getSavepointPaths(
+            TableTestProgram program, ExecNodeMetadata metadata) {
+        // disable the writable metadata test for sink node with version 1. it fails after the

Review Comment:
   Can we generalize this into a method in RestoreTestBase that 
TableSinkRestoreTest can implement to ignore certain programs for specific 
versions?
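
A rough sketch of such a hook, under simplifying assumptions: plain strings and ints stand in for TableTestProgram and ExecNodeMetadata, and `isIgnored`, the class names, the path layout, and the program id are all made up for illustration. The real RestoreTestBase API may look different.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical, simplified stand-in for a restore test base class. */
abstract class RestoreTestBaseSketch {

    /** Hook: return true to skip a program for a given node version. Default skips nothing. */
    protected boolean isIgnored(String programId, int nodeVersion) {
        return false;
    }

    /** Builds one (made-up) savepoint path per version that is not ignored. */
    List<String> savepointPaths(String programId, List<Integer> versions) {
        return versions.stream()
                .filter(version -> !isIgnored(programId, version))
                .map(version -> programId + "/v" + version)
                .collect(Collectors.toList());
    }
}

/** Hypothetical sink test that skips one program for node version 1 only. */
final class SinkRestoreTestSketch extends RestoreTestBaseSketch {
    @Override
    protected boolean isIgnored(String programId, int nodeVersion) {
        return nodeVersion == 1
                && programId.equals("insert-retract-with-writable-metadata");
    }
}
```

The concrete test then only states which program and version combinations to skip, and the path computation stays in the base class.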


