This is an automated email from the ASF dual-hosted git repository.

dailai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c02d4fed36 [Improve][Connector-v2] Use regex to match fieldName 
placeholders in jdbc sink (#8222)
c02d4fed36 is described below

commit c02d4fed36a6359ae7978a54a70d6c28286eead9
Author: dailai <837833...@qq.com>
AuthorDate: Wed Dec 11 09:38:18 2024 +0800

    [Improve][Connector-v2] Use regex to match fieldName placeholders in jdbc 
sink (#8222)
---
 .../executor/FieldNamedPreparedStatement.java      | 47 +++++------
 .../executor/FieldNamedPreparedStatementTest.java  | 96 ++++++++++++++++++++++
 .../connectors/seatunnel/jdbc/JdbcMysqlIT.java     |  6 +-
 .../test/resources/jdbc_mysql_source_and_sink.conf |  2 +-
 .../test/resources/jdbc_mysql_source_and_sink.sql  |  2 +-
 .../jdbc_mysql_source_and_sink_parallel.conf       |  2 +-
 .../jdbc_mysql_source_and_sink_parallel.sql        |  4 +-
 ...mysql_source_and_sink_parallel_upper_lower.conf |  2 +-
 .../transform/sql/zeta/ZetaSQLEngine.java          | 19 +++--
 .../seatunnel/transform/sql/SQLTransformTest.java  |  3 +-
 10 files changed, 142 insertions(+), 41 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
index 88e658fc38..8b7f15f364 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
 
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
@@ -47,6 +49,8 @@ import java.util.Calendar;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
@@ -669,29 +673,26 @@ public class FieldNamedPreparedStatement implements 
PreparedStatement {
                 connection.prepareStatement(parsedSQL), indexMapping);
     }
 
-    private static String parseNamedStatement(String sql, Map<String, 
List<Integer>> paramMap) {
-        StringBuilder parsedSql = new StringBuilder();
-        int fieldIndex = 1; // SQL statement parameter index starts from 1
-        int length = sql.length();
-        for (int i = 0; i < length; i++) {
-            char c = sql.charAt(i);
-            if (':' == c) {
-                int j = i + 1;
-                while (j < length && 
Character.isJavaIdentifierPart(sql.charAt(j))) {
-                    j++;
-                }
-                String parameterName = sql.substring(i + 1, j);
-                checkArgument(
-                        !parameterName.isEmpty(),
-                        "Named parameters in SQL statement must not be 
empty.");
-                paramMap.computeIfAbsent(parameterName, n -> new 
ArrayList<>()).add(fieldIndex);
-                fieldIndex++;
-                i = j - 1;
-                parsedSql.append('?');
-            } else {
-                parsedSql.append(c);
-            }
+    @VisibleForTesting
+    public static String parseNamedStatement(String sql, Map<String, 
List<Integer>> paramMap) {
+        Pattern pattern =
+                
Pattern.compile(":([\\p{L}\\p{Nl}\\p{Nd}\\p{Pc}\\$\\-\\.@%&*#~!?^+=<>|]+)");
+        Matcher matcher = pattern.matcher(sql);
+
+        StringBuffer result = new StringBuffer();
+        int fieldIndex = 1;
+
+        while (matcher.find()) {
+            String parameterName = matcher.group(1);
+            checkArgument(
+                    !parameterName.isEmpty(),
+                    "Named parameters in SQL statement must not be empty.");
+            paramMap.computeIfAbsent(parameterName, n -> new 
ArrayList<>()).add(fieldIndex++);
+            matcher.appendReplacement(result, "?");
         }
-        return parsedSql.toString();
+
+        matcher.appendTail(result);
+
+        return result.toString();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatementTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatementTest.java
new file mode 100644
index 0000000000..b393c844ee
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatementTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class FieldNamedPreparedStatementTest {
+
+    private static final String[] SPECIAL_FILEDNAMES =
+            new String[] {
+                "USER@TOKEN",
+                "字段%名称",
+                "field_name",
+                "field.name",
+                "field-name",
+                "$fieldName",
+                "field&key",
+                "field*value",
+                "field#1",
+                "field~test",
+                "field!data",
+                "field?question",
+                "field^caret",
+                "field+add",
+                "field=value",
+                "fieldmax",
+                "field|pipe"
+            };
+
+    @Test
+    public void testParseNamedStatementWithSpecialCharacters() {
+        String sql =
+                "INSERT INTO `nhp_emr_ws`.`cm_prescriptiondetails_cs` 
(`USER@TOKEN`, `字段%名称`, `field_name`, `field.name`, `field-name`, `$fieldName`, 
`field&key`, `field*value`, `field#1`, `field~test`, `field!data`, 
`field?question`, `field^caret`, `field+add`, `field=value`, `fieldmax`, 
`field|pipe`) VALUES (:USER@TOKEN, :字段%名称, :field_name, :field.name, 
:field-name, :$fieldName, :field&key, :field*value, :field#1, :field~test, 
:field!data, :field?question, :field^caret, :field+add, :f [...]
+
+        String exceptPreparedstatement =
+                "INSERT INTO `nhp_emr_ws`.`cm_prescriptiondetails_cs` 
(`USER@TOKEN`, `字段%名称`, `field_name`, `field.name`, `field-name`, `$fieldName`, 
`field&key`, `field*value`, `field#1`, `field~test`, `field!data`, 
`field?question`, `field^caret`, `field+add`, `field=value`, `fieldmax`, 
`field|pipe`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON 
DUPLICATE KEY UPDATE `USER@TOKEN`=VALUES(`USER@TOKEN`), 
`字段%名称`=VALUES(`字段%名称`), `field_name`=VALUES(`field_name`), `field.nam [...]
+
+        Map<String, List<Integer>> paramMap = new HashMap<>();
+        String actualSQL = 
FieldNamedPreparedStatement.parseNamedStatement(sql, paramMap);
+        assertEquals(exceptPreparedstatement, actualSQL);
+        for (int i = 0; i < SPECIAL_FILEDNAMES.length; i++) {
+            assertTrue(paramMap.containsKey(SPECIAL_FILEDNAMES[i]));
+            assertEquals(i + 1, paramMap.get(SPECIAL_FILEDNAMES[i]).get(0));
+        }
+    }
+
+    @Test
+    public void testParseNamedStatement() {
+        String sql = "UPDATE table SET col1 = :param1, col2 = :param1 WHERE 
col3 = :param2";
+        Map<String, List<Integer>> paramMap = new HashMap<>();
+        String expectedSQL = "UPDATE table SET col1 = ?, col2 = ? WHERE col3 = 
?";
+
+        String actualSQL = 
FieldNamedPreparedStatement.parseNamedStatement(sql, paramMap);
+
+        assertEquals(expectedSQL, actualSQL);
+        assertTrue(paramMap.containsKey("param1"));
+        assertTrue(paramMap.containsKey("param2"));
+        assertEquals(1, paramMap.get("param1").get(0).intValue());
+        assertEquals(2, paramMap.get("param1").get(1).intValue());
+        assertEquals(3, paramMap.get("param2").get(0).intValue());
+    }
+
+    @Test
+    public void testParseNamedStatementWithNoNamedParameters() {
+        String sql = "SELECT * FROM table";
+        Map<String, List<Integer>> paramMap = new HashMap<>();
+        String expectedSQL = "SELECT * FROM table";
+
+        String actualSQL = 
FieldNamedPreparedStatement.parseNamedStatement(sql, paramMap);
+
+        assertEquals(expectedSQL, actualSQL);
+        assertTrue(paramMap.isEmpty());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
index 26181669fc..feac8d11ca 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
@@ -104,7 +104,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
     private static final String CREATE_SQL =
             "CREATE TABLE IF NOT EXISTS %s\n"
                     + "(\n"
-                    + "    `c_bit_1`                bit(1)                
DEFAULT NULL,\n"
+                    + "    `c-bit_1`                bit(1)                
DEFAULT NULL,\n"
                     + "    `c_bit_8`                bit(8)                
DEFAULT NULL,\n"
                     + "    `c_bit_16`               bit(16)               
DEFAULT NULL,\n"
                     + "    `c_bit_32`               bit(32)               
DEFAULT NULL,\n"
@@ -191,7 +191,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
             String executeKey, TestContainer container, Container.ExecResult 
execResult) {
         String[] fieldNames =
                 new String[] {
-                    "c_bit_1",
+                    "c-bit_1",
                     "c_bit_8",
                     "c_bit_16",
                     "c_bit_32",
@@ -249,7 +249,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
     Pair<String[], List<SeaTunnelRow>> initTestData() {
         String[] fieldNames =
                 new String[] {
-                    "c_bit_1",
+                    "c-bit_1",
                     "c_bit_8",
                     "c_bit_16",
                     "c_bit_32",
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
index a781c8c3f2..45febb436f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
@@ -46,7 +46,7 @@ sink {
     user = "root"
     password = "Abc!@#135_seatunnel"
 
-    query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, 
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, 
c_smallint_unsigned,
+    query = """insert into sink (`c-bit_1`, c_bit_8, c_bit_16, c_bit_32, 
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, 
c_smallint_unsigned,
                                                 c_mediumint, 
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
                                                 c_decimal, c_decimal_unsigned, 
c_float, c_float_unsigned, c_double, c_double_unsigned,
                                                 c_char, c_tinytext, 
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.sql
index 84f049bec1..4b0240e3fe 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.sql
@@ -51,7 +51,7 @@ CREATE TABLE sink_table WITH (
 
 
 INSERT INTO sink_table
-  SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, 
c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
+  SELECT `c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, 
c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
          c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, 
c_bigint_unsigned,
          c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, 
c_double_unsigned,
          c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, 
c_longtext, c_date,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
index 48474c6dfa..e21c75992c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
@@ -45,7 +45,7 @@ sink {
     user = "root"
     password = "Abc!@#135_seatunnel"
     connection_check_timeout_sec = 100
-    query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, 
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, 
c_smallint_unsigned,
+    query = """insert into sink (`c-bit_1`, c_bit_8, c_bit_16, c_bit_32, 
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, 
c_smallint_unsigned,
                                                 c_mediumint, 
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
                                                 c_decimal, c_decimal_unsigned, 
c_float, c_float_unsigned, c_double, c_double_unsigned,
                                                 c_char, c_tinytext, 
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.sql
index bc032b9c22..33a273f341 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.sql
@@ -49,7 +49,7 @@ CREATE TABLE sink_table WITH (
 
 
 CREATE TABLE temp1 AS
-    SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, 
c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
+    SELECT `c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, 
c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
            c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, 
c_bigint_unsigned,
            c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, 
c_double_unsigned,
            c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, 
c_longtext, c_date,
@@ -58,4 +58,4 @@ CREATE TABLE temp1 AS
 
 
 INSERT INTO sink_table SELECT * FROM temp1;
- 
+
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
index cd486d4c4e..b6b942af18 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -46,7 +46,7 @@ sink {
     user = "root"
     password = "Abc!@#135_seatunnel"
     connection_check_timeout_sec = 100
-    query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, 
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, 
c_smallint_unsigned,
+    query = """insert into sink (`c-bit_1`, c_bit_8, c_bit_16, c_bit_32, 
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, 
c_smallint_unsigned,
                                                 c_mediumint, 
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
                                                 c_decimal, c_decimal_unsigned, 
c_float, c_float_unsigned, c_double, c_double_unsigned,
                                                 c_char, c_tinytext, 
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 2848cc9094..9318ff0b05 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -183,7 +183,7 @@ public class ZetaSQLEngine implements SQLEngine {
         for (SelectItem selectItem : selectItems) {
             if (selectItem.getExpression() instanceof AllColumns) {
                 for (int i = 0; i < inputRowType.getFieldNames().length; i++) {
-                    fieldNames[idx] = inputRowType.getFieldName(i);
+                    fieldNames[idx] = 
cleanEscape(inputRowType.getFieldName(i));
                     seaTunnelDataTypes[idx] = inputRowType.getFieldType(i);
                     if (inputColumnsMapping != null) {
                         inputColumnsMapping.set(idx, 
inputRowType.getFieldName(i));
@@ -194,16 +194,12 @@ public class ZetaSQLEngine implements SQLEngine {
                 Expression expression = selectItem.getExpression();
                 if (selectItem.getAlias() != null) {
                     String aliasName = selectItem.getAlias().getName();
-                    if (aliasName.startsWith(ESCAPE_IDENTIFIER)
-                            && aliasName.endsWith(ESCAPE_IDENTIFIER)) {
-                        aliasName = aliasName.substring(1, aliasName.length() 
- 1);
-                    }
-                    fieldNames[idx] = aliasName;
+                    fieldNames[idx] = cleanEscape(aliasName);
                 } else {
                     if (expression instanceof Column) {
-                        fieldNames[idx] = ((Column) 
expression).getColumnName();
+                        fieldNames[idx] = cleanEscape(((Column) 
expression).getColumnName());
                     } else {
-                        fieldNames[idx] = expression.toString();
+                        fieldNames[idx] = cleanEscape(expression.toString());
                     }
                 }
 
@@ -225,6 +221,13 @@ public class ZetaSQLEngine implements SQLEngine {
                 fieldNames, seaTunnelDataTypes, lateralViews, 
inputColumnsMapping);
     }
 
+    private static String cleanEscape(String columnName) {
+        if (columnName.startsWith(ESCAPE_IDENTIFIER) && 
columnName.endsWith(ESCAPE_IDENTIFIER)) {
+            columnName = columnName.substring(1, columnName.length() - 1);
+        }
+        return columnName;
+    }
+
     @Override
     public List<SeaTunnelRow> transformBySQL(SeaTunnelRow inputRow, 
SeaTunnelRowType outRowType) {
         // ------Physical Query Plan Execution------
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
index fcf14cc7b9..999b7fecd4 100644
--- 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
@@ -166,12 +166,13 @@ public class SQLTransformTest {
                 ReadonlyConfig.fromMap(
                         Collections.singletonMap(
                                 "query",
-                                "select id, trim(`apply`) as `apply` from test 
where `apply` = 'a'"));
+                                "select `id`, trim(`apply`) as `apply` from 
test where `apply` = 'a'"));
         SQLTransform sqlTransform = new SQLTransform(config, table);
         TableSchema tableSchema = sqlTransform.transformTableSchema();
         List<SeaTunnelRow> result =
                 sqlTransform.transformRow(
                         new SeaTunnelRow(new Object[] {Integer.valueOf(1), 
String.valueOf("a")}));
+        Assertions.assertEquals("id", tableSchema.getFieldNames()[0]);
         Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
         Assertions.assertEquals("a", result.get(0).getField(1));
         result =

Reply via email to