twalthr commented on code in PR #27301:
URL: https://github.com/apache/flink/pull/27301#discussion_r2581729426


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java:
##########
@@ -86,4 +90,45 @@ public class SinkTestPrograms {
                     .runSql(
                             "INSERT INTO sink_t SELECT UPPER(name), SUM(score) FROM source_t GROUP BY name")
                     .build();
+
+    public static final TableTestProgram INSERT_RETRACT_WITH_WRITABLE_METADATA =
+            TableTestProgram.of(
+                            "insert-into-upsert-with-sink-upsert-materializer-writable-metadata",
+                            "The query requires a sink upsert materializer and the sink"
+                                    + " uses writable metadata columns. The scenario showcases a"
+                                    + " bug where a wrong type was used in sinks which did not"
+                                    + " consider metadata columns. There needs to be multiple"
+                                    + " requirements for the bug to show up. 1. We need to rocksdb,"
+                                    + " so that we use a serializer when putting records into"
+                                    + " state in SinkUpsertMaterializer. 2. We need to retract to"
+                                    + " a previous value taken from the state, otherwise we forward"
+                                    + " the incoming record. 3. There need to be metadata columns.")

Review Comment:
   ```suggestion
                                       + " the incoming record. 3. There need to be persisted metadata columns.")
   ```
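The failure mode the test program's description walks through can be illustrated without any Flink classes: if the serializer's row type covers only the physical columns, the values of persisted (writable) metadata columns are truncated on the way into state, and a later retraction reads back an incomplete row. A minimal sketch under that assumption; all names here (`PersistedRowTypeSketch`, `persistedFields`, `stateRoundTrip`) are hypothetical and are not Flink APIs:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the bug's mechanics -- not actual Flink code.
public class PersistedRowTypeSketch {

    // Field order assumed for state serialization: physical columns first,
    // followed by persisted (writable) metadata columns.
    static List<String> persistedFields(List<String> physical, List<String> metadata) {
        List<String> fields = new ArrayList<>(physical);
        fields.addAll(metadata);
        return fields;
    }

    // A serializer derived from a row type with too small an arity silently
    // truncates the record when writing it into state, so the metadata value
    // is already gone when a retraction later reads the previous row back.
    static List<String> stateRoundTrip(List<String> record, int serializerArity) {
        return new ArrayList<>(record.subList(0, Math.min(serializerArity, record.size())));
    }
}
```

With a physical-only arity of 2, a record like `("BOB", "12", "meta")` comes back from the round trip as `("BOB", "12")`; with the persisted arity of 3 it survives intact, which is the effect of deriving the serializer from the persisted row type instead of the physical one.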



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java:
##########
@@ -151,7 +151,7 @@ protected Transformation<Object> createSinkTransformation(
                 tableSink.getSinkRuntimeProvider(
                         new SinkRuntimeProviderContext(
                                 isBounded, tableSinkSpec.getTargetColumns()));
-        final RowType physicalRowType = getPhysicalRowType(schema);
+        final RowType physicalRowType = getPersistedRowType(schema);

Review Comment:
   rename local variable as well



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java:
##########
@@ -391,4 +396,34 @@ private static SequencedMultiSetStateConfig createStateConfig(
                 throw new IllegalArgumentException("Unsupported strategy: " + strategy);
         }
     }
+
+    /**
+     * The method recreates the type of the incoming record from the sink's schema. It puts the
+     * physical columns first, followed by persisted metadata columns.
+     */
+    @Override
+    protected final RowType getPersistedRowType(ResolvedSchema schema) {

Review Comment:
   Is there no runtime change necessary?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java:
##########
@@ -86,4 +90,45 @@ public class SinkTestPrograms {
                     .runSql(
                             "INSERT INTO sink_t SELECT UPPER(name), SUM(score) FROM source_t GROUP BY name")
                     .build();
+
+    public static final TableTestProgram INSERT_RETRACT_WITH_WRITABLE_METADATA =
+            TableTestProgram.of(
+                            "insert-into-upsert-with-sink-upsert-materializer-writable-metadata",
+                            "The query requires a sink upsert materializer and the sink"
+                                    + " uses writable metadata columns. The scenario showcases a"
+                                    + " bug where a wrong type was used in sinks which did not"
+                                    + " consider metadata columns. There needs to be multiple"
+                                    + " requirements for the bug to show up. 1. We need to rocksdb,"

Review Comment:
   ```suggestion
                                       + " requirements for the bug to show up. 1. We need to use rocksdb,"
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java:
##########
@@ -124,7 +124,7 @@ protected Transformation<Object> translateToPlanInternal(
     }
 
     @Override
-    protected RowType getPhysicalRowType(ResolvedSchema schema) {
+    protected final RowType getPersistedRowType(ResolvedSchema schema) {

Review Comment:
   also in batch mode one should be able to write metadata columns



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
