yuxiqian commented on code in PR #3939:
URL: https://github.com/apache/flink-cdc/pull/3939#discussion_r1983226904


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##########


Review Comment:
   It seems this fix should also close FLINK-37326. Adding a test case to 
verify this would be nice.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##########
@@ -642,4 +654,16 @@ private static boolean hasAsterisk(SqlNode sqlNode) {
             return false;
         }
     }
+
+    public static Map<String, String> generateColumnNameMap(List<String> originalColumnNames) {
+        int i = 0;
+        Map<String, String> columnNameMap = new HashMap<>();
+        for (String columnName : originalColumnNames) {
+            if (!columnNameMap.containsKey(columnName)) {
+                columnNameMap.put(columnName, MAPPED_COLUMN_NAME_PREFIX + i);
+                i++;
+            }
+        }
+        return columnNameMap;
+    }

Review Comment:
   Minor concern: although using `field\d` as the column name is simple, it 
would produce cryptic error messages if Janino throws an exception about the 
generated expressions.
   
   Some options might be:
   
   1. Leave "legal" names as is without mangling them, and only map names 
that are not valid Java identifiers.
   2. Extract the valid characters from the original column name and append 
them as a hint. For example, `invalid-name` could be mapped to 
`column1_invalidname` (recognizable, at least).
   3. Always include the column name map when exceptions are thrown.
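
   As a rough illustration of options 1 and 2 combined, here is a minimal 
sketch, not a proposed implementation. The class name `ColumnNameMangler`, the 
`column` prefix, and the helpers `isLegal` and `extractHint` are hypothetical; 
only `SourceVersion.isIdentifier` / `isKeyword` and `Character.isJavaIdentifierPart` 
are real JDK calls. It assumes that names which are valid, non-keyword Java 
identifiers can be passed through unchanged.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.lang.model.SourceVersion;

public class ColumnNameMangler {
    // Hypothetical prefix, chosen to match the `column1_invalidname` example.
    private static final String MAPPED_COLUMN_NAME_PREFIX = "column";

    /** A name is "legal" if it is a Java identifier and not a reserved keyword. */
    static boolean isLegal(String name) {
        return SourceVersion.isIdentifier(name) && !SourceVersion.isKeyword(name);
    }

    /** Keeps only the characters that may appear inside a Java identifier. */
    static String extractHint(String name) {
        StringBuilder hint = new StringBuilder();
        for (char c : name.toCharArray()) {
            if (Character.isJavaIdentifierPart(c)) {
                hint.append(c);
            }
        }
        return hint.toString();
    }

    public static Map<String, String> generateColumnNameMap(List<String> originalColumnNames) {
        // Reserve every legal name up front so a mangled name can never shadow one.
        Set<String> used = new HashSet<>();
        for (String name : originalColumnNames) {
            if (isLegal(name)) {
                used.add(name);
            }
        }
        Map<String, String> columnNameMap = new LinkedHashMap<>();
        int i = 0;
        for (String name : originalColumnNames) {
            if (columnNameMap.containsKey(name)) {
                continue; // duplicate column name, already mapped
            }
            if (isLegal(name)) {
                columnNameMap.put(name, name); // option 1: leave legal names as is
                continue;
            }
            String hint = extractHint(name);
            String mapped;
            do {
                // option 2: numbered prefix plus a recognizable hint
                mapped = MAPPED_COLUMN_NAME_PREFIX + i + (hint.isEmpty() ? "" : "_" + hint);
                i++;
            } while (!used.add(mapped));
            columnNameMap.put(name, mapped);
        }
        return columnNameMap;
    }

    public static void main(String[] args) {
        System.out.println(generateColumnNameMap(Arrays.asList("id", "invalid-name", "class")));
    }
}
```

   In this sketch `id` stays `id`, while `invalid-name` and the keyword `class` 
get a numbered, hinted replacement. Option 3 would then only need to print this 
map alongside any Janino error.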



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java:
##########


Review Comment:
   We may need to test whether the name-mangling approach works well with 
schema evolution events that alter columns.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java:
##########
@@ -41,11 +42,17 @@ public class TransformFilter implements Serializable {
     private final String expression;
     private final String scriptExpression;
     private final List<String> columnNames;
+    private final Map<String, String> columnNameMap;
 
-    public TransformFilter(String expression, String scriptExpression, List<String> columnNames) {
+    public TransformFilter(
+            String expression,
+            String scriptExpression,
+            List<String> columnNames,
+            Map<String, String> columnNameMap) {

Review Comment:
   IIUC, `TransformFilter` is static and will not be refreshed after schema 
changes, but `columnNameMap` might change when columns are altered. Will 
inserting columns in the middle affect the numbering scheme?
   
   `ProjectionColumn` looks fine (since projections should be recreated 
every time the schema changes), but this still needs to be verified by an IT 
case.
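
   To make the numbering concern concrete, here is a small standalone sketch. 
`generateColumnNameMap` is copied from the diff above; the wrapper class 
`NumberingShiftDemo` and the column names are made up for illustration. It 
only shows how the position-based names move when a column is inserted in the 
middle; whether `TransformFilter` actually ends up holding a stale map is the 
open question and is not demonstrated here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NumberingShiftDemo {
    private static final String MAPPED_COLUMN_NAME_PREFIX = "field";

    // Same logic as the method added to TransformParser in this PR.
    public static Map<String, String> generateColumnNameMap(List<String> originalColumnNames) {
        int i = 0;
        Map<String, String> columnNameMap = new HashMap<>();
        for (String columnName : originalColumnNames) {
            if (!columnNameMap.containsKey(columnName)) {
                columnNameMap.put(columnName, MAPPED_COLUMN_NAME_PREFIX + i);
                i++;
            }
        }
        return columnNameMap;
    }

    public static void main(String[] args) {
        // Hypothetical schema before and after adding `extra` in the middle.
        Map<String, String> before = generateColumnNameMap(Arrays.asList("id", "name", "age"));
        Map<String, String> after =
                generateColumnNameMap(Arrays.asList("id", "extra", "name", "age"));

        System.out.println("before: name  -> " + before.get("name")); // field1
        System.out.println("after:  name  -> " + after.get("name")); // field2
        System.out.println("after:  extra -> " + after.get("extra")); // field1
    }
}
```

   If a compiled filter kept referring to `field1` for `name` while the 
arguments were bound using the new map, it would read `extra` instead.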



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##########
@@ -95,6 +95,8 @@ public class TransformParser {
     private static final Logger LOG = 
LoggerFactory.getLogger(TransformParser.class);
     private static final String DEFAULT_SCHEMA = "default_schema";
     private static final String DEFAULT_TABLE = "TB";
+    private static final String MAPPED_COLUMN_NAME_PREFIX = "field";

Review Comment:
   Just wondering whether we could use RegExp-like names (`$1`, `$2`, ...). 
Ignore this comment if it doesn't look right.
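
   For what it's worth, names like `$1` are valid identifiers at the Java 
language level, which the quick check below confirms using the JDK's 
`SourceVersion` helper. The class `DollarNameCheck` and the method 
`isUsableJavaName` are hypothetical names for this sketch. It does not check 
how the SQL parsing side or Janino treats such names.

```java
import javax.lang.model.SourceVersion;

public class DollarNameCheck {
    /** True if the name is a Java identifier and not a reserved keyword. */
    public static boolean isUsableJavaName(String name) {
        return SourceVersion.isIdentifier(name) && !SourceVersion.isKeyword(name);
    }

    public static void main(String[] args) {
        System.out.println(isUsableJavaName("$1")); // true: '$' may start an identifier
        System.out.println(isUsableJavaName("field0")); // true
        System.out.println(isUsableJavaName("invalid-name")); // false: '-' is not allowed
    }
}
```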



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
