yuxiqian commented on code in PR #3939:
URL: https://github.com/apache/flink-cdc/pull/3939#discussion_r1983226904
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##########

Review Comment:
It seems this fix should close FLINK-37326, too. Adding a test case to verify that would be nice.

##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##########
@@ -642,4 +654,16 @@ private static boolean hasAsterisk(SqlNode sqlNode) {
             return false;
         }
     }
+
+    public static Map<String, String> generateColumnNameMap(List<String> originalColumnNames) {
+        int i = 0;
+        Map<String, String> columnNameMap = new HashMap<>();
+        for (String columnName : originalColumnNames) {
+            if (!columnNameMap.containsKey(columnName)) {
+                columnNameMap.put(columnName, MAPPED_COLUMN_NAME_PREFIX + i);
+                i++;
+            }
+        }
+        return columnNameMap;
+    }

Review Comment:
Minor concern: though it's easy and simple to use `field\d` as the column name, it would produce cryptic error messages if Janino throws an exception about generated expressions. Some options might be:

1. We can leave "legal" names as they are without mangling them, and only map names that are not valid Java identifiers.
2. Try to extract valid characters from the original column name and append them as a hint. For example, `invalid-name` could be mapped to `column1_invalidname` (recognizable, at least).
3. Always provide the column name map information when exceptions are thrown.

##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java:
##########

Review Comment:
Maybe we need to test whether the name-mangling approach works well with schema evolution that alters columns.
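One property such a schema-evolution test could pin down is whether the positional numbering stays stable when columns are altered. The sketch below copies the numbering loop from the `generateColumnNameMap` hunk above into a standalone class and adds a hypothetical helper, `renumberedColumns`, that lists the surviving columns whose mangled name changes. It assumes the map is built from the full, ordered column list of the table; the class name, helper, and column names are illustrative only and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Standalone copy of the numbering loop from the PR's generateColumnNameMap, plus a
 * hypothetical helper reporting which columns get a different mangled name after a
 * schema change. Assumes the input is the table's full, ordered column list.
 */
final class ColumnRenumberingCheck {

    // Same value as MAPPED_COLUMN_NAME_PREFIX in the PR.
    private static final String MAPPED_COLUMN_NAME_PREFIX = "field";

    static Map<String, String> generateColumnNameMap(List<String> originalColumnNames) {
        int i = 0;
        Map<String, String> columnNameMap = new HashMap<>();
        for (String columnName : originalColumnNames) {
            if (!columnNameMap.containsKey(columnName)) {
                columnNameMap.put(columnName, MAPPED_COLUMN_NAME_PREFIX + i);
                i++;
            }
        }
        return columnNameMap;
    }

    /** Columns present before and after the change whose mangled name differs afterwards. */
    static List<String> renumberedColumns(List<String> beforeColumns, List<String> afterColumns) {
        Map<String, String> before = generateColumnNameMap(beforeColumns);
        Map<String, String> after = generateColumnNameMap(afterColumns);
        List<String> changed = new ArrayList<>();
        // Iterate the list (not the HashMap) so the result order is deterministic.
        for (String column : beforeColumns) {
            String newName = after.get(column);
            if (newName != null && !newName.equals(before.get(column)) && !changed.contains(column)) {
                changed.add(column);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // Hypothetical schema change: add `email` right after `id`.
        List<String> before = List.of("id", "name", "age");
        List<String> after = List.of("id", "email", "name", "age");
        System.out.println(renumberedColumns(before, after)); // [name, age]
    }
}
```

Under that assumption, appending a column at the end leaves every existing mapping untouched, while inserting a column in the middle (or dropping an earlier one) shifts the names of everything after it, which is exactly the case where a cached `TransformFilter` expression could end up pointing at the wrong column.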
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java:
##########
@@ -41,11 +42,17 @@ public class TransformFilter implements Serializable {
     private final String expression;
     private final String scriptExpression;
     private final List<String> columnNames;
+    private final Map<String, String> columnNameMap;
-    public TransformFilter(String expression, String scriptExpression, List<String> columnNames) {
+    public TransformFilter(
+            String expression,
+            String scriptExpression,
+            List<String> columnNames,
+            Map<String, String> columnNameMap) {

Review Comment:
IIUC, `TransformFilter` is static and will not be refreshed after schema changes, but `columnNameMap` might change when columns are altered. Will inserting columns in the middle affect the numbering system? `ProjectionColumn` looks fine (since projection stuff should be recreated every time the schema changes), but this still needs to be verified by an IT case.

##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##########
@@ -95,6 +95,8 @@ public class TransformParser {
     private static final Logger LOG = LoggerFactory.getLogger(TransformParser.class);
     private static final String DEFAULT_SCHEMA = "default_schema";
     private static final String DEFAULT_TABLE = "TB";
+    private static final String MAPPED_COLUMN_NAME_PREFIX = "field";

Review Comment:
Just wondering whether we could use RegExp-like names (`$1`, `$2`, ...)? Ignore this comment if it doesn't look right.
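Whatever prefix is chosen, options 1 and 2 from the `generateColumnNameMap` comment could be combined roughly as below: names that are already legal, non-keyword Java identifiers are kept as they are, and only the remaining ones get a numbered name carrying the identifier-safe characters of the original as a hint. This is a hypothetical sketch, not the PR's code; the class name and the `column` prefix (borrowed from the `column1_invalidname` example in the review) are assumptions, and legality is checked with the JDK's `javax.lang.model.SourceVersion` rather than anything Janino-specific.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.lang.model.SourceVersion;

/**
 * Hypothetical sketch of review options 1 and 2: keep legal Java identifiers as is,
 * and map the rest to a numbered name that keeps a readable hint of the original.
 */
final class ReadableColumnNameMapper {

    // Hypothetical prefix taken from the review's `column1_invalidname` example.
    private static final String PREFIX = "column";

    static Map<String, String> generateColumnNameMap(List<String> originalColumnNames) {
        Map<String, String> columnNameMap = new LinkedHashMap<>();
        int i = 0;
        for (String columnName : originalColumnNames) {
            if (columnNameMap.containsKey(columnName)) {
                continue; // duplicates share a single mapping, as in the PR
            }
            if (SourceVersion.isIdentifier(columnName) && !SourceVersion.isKeyword(columnName)) {
                // Option 1: already a usable Java identifier, so no mangling.
                columnNameMap.put(columnName, columnName);
            } else {
                // Option 2: numbered name plus the identifier-safe characters as a hint.
                columnNameMap.put(columnName, PREFIX + i + "_" + keepIdentifierChars(columnName));
                i++;
            }
        }
        return columnNameMap;
    }

    /** Drops every code point that may not appear inside a Java identifier. */
    private static String keepIdentifierChars(String name) {
        StringBuilder sb = new StringBuilder();
        for (int k = 0; k < name.length(); ) {
            int cp = name.codePointAt(k);
            if (Character.isJavaIdentifierPart(cp)) {
                sb.appendCodePoint(cp);
            }
            k += Character.charCount(cp);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(generateColumnNameMap(List.of("id", "invalid-name", "class", "id")));
        // {id=id, invalid-name=column0_invalidname, class=column1_class}
    }
}
```

`SourceVersion.isIdentifier` also accepts keywords, hence the extra `isKeyword` check (which also rejects `true`, `false`, and `null`). One caveat of keeping legal names untouched is that a real column could already be called something like `column0_invalidname`, so a collision check would still be needed; option 3 (attaching the map to exception messages) would complement either naming scheme. On the `$1`, `$2` idea, `$` is a legal Java identifier start character, so such names are at least valid Java.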