This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ad5278c5bb [Feature][Transform-V2] Support `AT TIME ZONE` statement 
for sql transform (#9784)
ad5278c5bb is described below

commit ad5278c5bbec7e9a0b6e94ed7b69c96c73b4ff87
Author: Jia Fan <[email protected]>
AuthorDate: Tue Sep 2 23:46:45 2025 +0800

    [Feature][Transform-V2] Support `AT TIME ZONE` statement for sql transform 
(#9784)
---
 docs/en/transform-v2/sql-functions.md              | 16 ++++-
 docs/zh/transform-v2/sql-functions.md              | 17 +++++-
 .../converter/AbstractJdbcRowConverter.java        |  5 ++
 .../dialect/inceptor/InceptorJdbcRowConverter.java |  7 +++
 .../dialect/kingbase/KingbaseJdbcRowConverter.java |  7 ++-
 .../oceanbase/OceanBaseMysqlJdbcRowConverter.java  |  6 ++
 .../dialect/psql/PostgresJdbcRowConverter.java     |  6 ++
 .../seatunnel/cdc/oracle/OracleCDCIT.java          | 28 ++++-----
 .../connector/cdc/sqlserver/SqlServerCDCIT.java    | 28 ++++-----
 .../seatunnel/jdbc/JdbcAutoGenerateSQLIT.java      | 23 +++++++-
 .../seatunnel/jdbc/JdbcPostgresIdentifierIT.java   | 28 ++++-----
 .../resources/jdbc_sink_auto_generate_sql.conf     |  1 +
 .../connectors/seatunnel/jdbc/JdbcPostgresIT.java  | 27 ++++-----
 .../apache/seatunnel/e2e/common/util/JdbcUtil.java | 48 +++++++++++++++
 .../transform/sql/zeta/ZetaSQLFunction.java        | 11 ++++
 .../seatunnel/transform/sql/zeta/ZetaSQLType.java  |  4 ++
 .../sql/zeta/functions/DateTimeFunction.java       | 26 ++++++++
 .../transform/sql/zeta/DateTimeFunctionTest.java   | 69 ++++++++++++++++++++++
 18 files changed, 282 insertions(+), 75 deletions(-)

diff --git a/docs/en/transform-v2/sql-functions.md 
b/docs/en/transform-v2/sql-functions.md
index 857932c2bd..22d0c6b4e9 100644
--- a/docs/en/transform-v2/sql-functions.md
+++ b/docs/en/transform-v2/sql-functions.md
@@ -958,7 +958,7 @@ Convert the number of seconds from the UNIX epoch 
(1970-01-01 00:00:00 UTC) to a
 
 The most important format characters are: y year, M month, d day, H hour, m 
minute, s second. For details of the format, see 
`java.time.format.DateTimeFormatter`.
 
-`timeZone` is optional, default value is system's time zone. `timezone` value 
can be a `UTC+ timezone offset`, for example, `UTC+8` represents the 
Asia/Shanghai time zone, see `java.time.ZoneId`
+`timeZone` is optional and defaults to the system's time zone. The `timeZone` 
value can be a `UTC+ timezone offset`, for example, `UTC+8` represents the 
Asia/Shanghai time zone, see 
https://en.wikipedia.org/wiki/List_of_tz_database_time_zones .
 
 
 Example:
@@ -973,6 +973,20 @@ or
 
 CALL FROM_UNIXTIME(1672502400, 'yyyy-MM-dd HH:mm:ss','UTC+6')
 
+### AT TIME ZONE
+
+```dateAndTime AT TIME ZONE 'timeZone' -> TIMESTAMP_TZ```
+
+Convert a timestamp value to a TIMESTAMP WITH TIME ZONE value in the specified 
time zone.
+
+`timeZone` can be a UTC offset, for example `+08:00` (the offset of the 
Asia/Shanghai time zone), or a time zone ID, for example `Pacific/Honolulu`, see 
https://en.wikipedia.org/wiki/List_of_tz_database_time_zones .
+
+Example:
+
+local_date_time AT TIME ZONE '+09:00'
+
+offset_date_time AT TIME ZONE 'Pacific/Honolulu'
+
 ## System Functions
 
 ### CAST
diff --git a/docs/zh/transform-v2/sql-functions.md 
b/docs/zh/transform-v2/sql-functions.md
index 188d12f665..c5fe31e5d9 100644
--- a/docs/zh/transform-v2/sql-functions.md
+++ b/docs/zh/transform-v2/sql-functions.md
@@ -954,7 +954,7 @@ YEAR(CREATED)
 
 最重要的格式字符包括:y(年)、M(月)、d(日)、H(时)、m(分)、s(秒)。有关格式的详细信息,请参阅 
`java.time.format.DateTimeFormatter`。
 
-`timeZone` 是可选的,默认值为系统的时区。`timezone` 的值可以是一个 `UTC+ 时区偏移`,例如,`UTC+8` 
表示亚洲/上海时区,请参阅 `java.time.ZoneId`。
+`timeZone` 是可选的,默认值为系统的时区。`timezone` 的值可以是一个 `UTC+ 时区偏移`,例如,`UTC+8` 
表示亚洲/上海时区,请参阅 https://en.wikipedia.org/wiki/List_of_tz_database_time_zones 。
 
 示例:
 
@@ -968,6 +968,21 @@ or
 
 CALL FROM_UNIXTIME(1672502400, 'yyyy-MM-dd HH:mm:ss','UTC+6')
 
+
+### AT TIME ZONE
+
+```dateAndTime AT TIME ZONE 'timeZone' -> TIMESTAMP_TZ```
+
+转换一个时间戳值为指定时区的带时区时间戳值。
+
+`timeZone` 的值可以是一个 `UTC+ 时区偏移`,例如,`+08:00` 表示亚洲/上海时区,请参阅 
https://en.wikipedia.org/wiki/List_of_tz_database_time_zones 。
+
+示例:
+
+local_date_time AT TIME ZONE '+09:00'
+
+offset_date_time AT TIME ZONE 'Pacific/Honolulu'
+
 ## System Functions
 
 ### CAST
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index 42bcf2d894..d5c3aa95ac 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -44,6 +44,7 @@ import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -275,6 +276,10 @@ public abstract class AbstractJdbcRowConverter implements 
JdbcRowConverter {
                 LocalDateTime localDateTime = (LocalDateTime) value;
                 statement.setTimestamp(statementIndex, 
Timestamp.valueOf(localDateTime));
                 break;
+            case TIMESTAMP_TZ:
+                OffsetDateTime offsetDateTime = (OffsetDateTime) value;
+                statement.setTimestamp(statementIndex, 
Timestamp.from(offsetDateTime.toInstant()));
+                break;
             case BYTES:
                 statement.setBytes(statementIndex, (byte[]) value);
                 break;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
index 33c689fd39..1a7b3dbd72 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
@@ -35,9 +35,11 @@ import javax.annotation.Nullable;
 
 import java.math.BigDecimal;
 import java.sql.PreparedStatement;
+import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 
 public class InceptorJdbcRowConverter extends HiveJdbcRowConverter {
 
@@ -103,6 +105,11 @@ public class InceptorJdbcRowConverter extends 
HiveJdbcRowConverter {
                         statement.setTimestamp(
                                 statementIndex, 
java.sql.Timestamp.valueOf(localDateTime));
                         break;
+                    case TIMESTAMP_TZ:
+                        OffsetDateTime offsetDateTime = (OffsetDateTime) 
row.getField(fieldIndex);
+                        statement.setTimestamp(
+                                statementIndex, 
Timestamp.from(offsetDateTime.toInstant()));
+                        break;
                     case BYTES:
                         statement.setBytes(statementIndex, (byte[]) 
row.getField(fieldIndex));
                         break;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
index f766ce3980..db5d23aae9 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
@@ -39,6 +39,7 @@ import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 import java.util.Optional;
 
 public class KingbaseJdbcRowConverter extends AbstractJdbcRowConverter {
@@ -174,8 +175,12 @@ public class KingbaseJdbcRowConverter extends 
AbstractJdbcRowConverter {
                     break;
                 case TIMESTAMP:
                     LocalDateTime localDateTime = (LocalDateTime) 
row.getField(fieldIndex);
+                    statement.setTimestamp(statementIndex, 
Timestamp.valueOf(localDateTime));
+                    break;
+                case TIMESTAMP_TZ:
+                    OffsetDateTime offsetDateTime = (OffsetDateTime) 
row.getField(fieldIndex);
                     statement.setTimestamp(
-                            statementIndex, 
java.sql.Timestamp.valueOf(localDateTime));
+                            statementIndex, 
Timestamp.from(offsetDateTime.toInstant()));
                     break;
                 case BYTES:
                     statement.setBytes(statementIndex, (byte[]) 
row.getField(fieldIndex));
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index bf49d4d0b7..0c7fba387b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -46,6 +46,7 @@ import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 import java.util.Optional;
 
 public class OceanBaseMysqlJdbcRowConverter extends AbstractJdbcRowConverter {
@@ -220,6 +221,11 @@ public class OceanBaseMysqlJdbcRowConverter extends 
AbstractJdbcRowConverter {
                         statement.setTimestamp(
                                 statementIndex, 
java.sql.Timestamp.valueOf(localDateTime));
                         break;
+                    case TIMESTAMP_TZ:
+                        OffsetDateTime offsetDateTime = (OffsetDateTime) 
row.getField(fieldIndex);
+                        statement.setTimestamp(
+                                statementIndex, 
Timestamp.from(offsetDateTime.toInstant()));
+                        break;
                     case BYTES:
                         statement.setBytes(statementIndex, (byte[]) 
row.getField(fieldIndex));
                         break;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index 6866e428e4..be7c8efd18 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -49,6 +49,7 @@ import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 import java.util.Locale;
 import java.util.Optional;
 
@@ -253,6 +254,11 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
                         statement.setTimestamp(
                                 statementIndex, 
java.sql.Timestamp.valueOf(localDateTime));
                         break;
+                    case TIMESTAMP_TZ:
+                        OffsetDateTime offsetDateTime = (OffsetDateTime) 
row.getField(fieldIndex);
+                        statement.setTimestamp(
+                                statementIndex, 
Timestamp.from(offsetDateTime.toInstant()));
+                        break;
                     case BYTES:
                         statement.setBytes(statementIndex, (byte[]) 
row.getField(fieldIndex));
                         break;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index ca0d2cddfd..b68011e689 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JdbcUtil;
 import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.junit.jupiter.api.AfterAll;
@@ -38,10 +39,8 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -565,22 +564,15 @@ public class OracleCDCIT extends AbstractOracleCDCIT 
implements TestResource {
     }
 
     private List<List<Object>> querySql(String sql) {
-        try (Connection connection = getJdbcConnection(ORACLE_CONTAINER);
-                Statement statement = connection.createStatement()) {
-            ResultSet resultSet = statement.executeQuery(sql);
-            List<List<Object>> result = new ArrayList<>();
-            int columnCount = resultSet.getMetaData().getColumnCount();
-            while (resultSet.next()) {
-                ArrayList<Object> objects = new ArrayList<>();
-                for (int i = 1; i <= columnCount; i++) {
-                    objects.add(resultSet.getObject(i));
-                }
-                result.add(objects);
-            }
-            return result;
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }
+        return JdbcUtil.querySql(
+                sql,
+                () -> {
+                    try {
+                        return getJdbcConnection(ORACLE_CONTAINER);
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
     }
 
     private void executeSql(String sql) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 259d9d4e6c..11083fcf89 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JdbcUtil;
 import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.awaitility.Awaitility;
@@ -55,10 +56,8 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -452,22 +451,15 @@ public class SqlServerCDCIT extends TestSuiteBase 
implements TestResource {
     }
 
     private List<List<Object>> querySql(String sql) {
-        try (Connection connection = getJdbcConnection();
-                Statement statement = connection.createStatement();
-                ResultSet resultSet = statement.executeQuery(sql)) {
-            List<List<Object>> result = new ArrayList<>();
-            int columnCount = resultSet.getMetaData().getColumnCount();
-            while (resultSet.next()) {
-                ArrayList<Object> objects = new ArrayList<>();
-                for (int i = 1; i <= columnCount; i++) {
-                    objects.add(resultSet.getObject(i));
-                }
-                result.add(objects);
-            }
-            return result;
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }
+        return JdbcUtil.querySql(
+                sql,
+                () -> {
+                    try {
+                        return this.getJdbcConnection();
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
     }
 
     private void executeSql(String sql) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcAutoGenerateSQLIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcAutoGenerateSQLIT.java
index 34118fab90..389c50345c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcAutoGenerateSQLIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcAutoGenerateSQLIT.java
@@ -41,9 +41,11 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+import static org.apache.seatunnel.e2e.common.util.JdbcUtil.querySql;
 import static org.awaitility.Awaitility.given;
 
 @Slf4j
@@ -90,6 +92,24 @@ public class JdbcAutoGenerateSQLIT extends TestSuiteBase 
implements TestResource
             throws IOException, InterruptedException {
         Container.ExecResult execResult = 
container.executeJob("/jdbc_sink_auto_generate_sql.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
+        List<Object> result =
+                querySql(
+                                "select * from sink limit 1",
+                                () -> {
+                                    try {
+                                        return DriverManager.getConnection(
+                                                
postgreSQLContainer.getJdbcUrl(),
+                                                
postgreSQLContainer.getUsername(),
+                                                
postgreSQLContainer.getPassword());
+                                    } catch (SQLException e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                })
+                        .get(0);
+        Assertions.assertInstanceOf(Long.class, result.get(0));
+        Assertions.assertInstanceOf(String.class, result.get(1));
+        Assertions.assertInstanceOf(Integer.class, result.get(2));
+        Assertions.assertInstanceOf(java.sql.Timestamp.class, result.get(3));
     }
 
     @TestTemplate
@@ -111,7 +131,8 @@ public class JdbcAutoGenerateSQLIT extends TestSuiteBase 
implements TestResource
                     "create table sink(\n"
                             + "user_id BIGINT NOT NULL PRIMARY KEY,\n"
                             + "name varchar(255),\n"
-                            + "age INT\n"
+                            + "age INT,\n"
+                            + "timestamp_tz TIMESTAMPTZ \n"
                             + ")";
             statement.execute(sink);
         } catch (SQLException e) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
index f59972ee26..c8aaf22c0a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JdbcUtil;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -41,10 +42,8 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -358,22 +357,15 @@ public class JdbcPostgresIdentifierIT extends 
TestSuiteBase implements TestResou
     }
 
     private List<List<Object>> querySql(String sql) {
-        try (Connection connection = getJdbcConnection();
-                Statement statement = connection.createStatement();
-                ResultSet resultSet = statement.executeQuery(sql)) {
-            List<List<Object>> result = new ArrayList<>();
-            int columnCount = resultSet.getMetaData().getColumnCount();
-            while (resultSet.next()) {
-                ArrayList<Object> objects = new ArrayList<>();
-                for (int i = 1; i <= columnCount; i++) {
-                    objects.add(resultSet.getObject(i));
-                }
-                result.add(objects);
-            }
-            return result;
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }
+        return JdbcUtil.querySql(
+                sql,
+                () -> {
+                    try {
+                        return this.getJdbcConnection();
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
     }
 
     private void executeSQL(String sql) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_auto_generate_sql.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_auto_generate_sql.conf
index f185887a69..d807040312 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_auto_generate_sql.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_auto_generate_sql.conf
@@ -28,6 +28,7 @@ source {
         user_id = bigint
         name = string
         age = int
+        timestamp_tz = timestamp_tz
       }
     }
     plugin_output = "fake"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index 916f3be8ef..123de8bf9c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JdbcUtil;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -53,10 +54,8 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -500,21 +499,15 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
     }
 
     private List<List<Object>> querySql(String sql) {
-        try (Connection connection = getJdbcConnection();
-                ResultSet resultSet = 
connection.createStatement().executeQuery(sql)) {
-            List<List<Object>> result = new ArrayList<>();
-            int columnCount = resultSet.getMetaData().getColumnCount();
-            while (resultSet.next()) {
-                ArrayList<Object> objects = new ArrayList<>();
-                for (int i = 1; i <= columnCount; i++) {
-                    objects.add(resultSet.getObject(i));
-                }
-                result.add(objects);
-            }
-            return result;
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }
+        return JdbcUtil.querySql(
+                sql,
+                () -> {
+                    try {
+                        return this.getJdbcConnection();
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
     }
 
     private void executeSQL(String sql) {
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JdbcUtil.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JdbcUtil.java
new file mode 100644
index 0000000000..a191305dab
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JdbcUtil.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.util;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+public class JdbcUtil {
+
+    public static List<List<Object>> querySql(String sql, Supplier<Connection> 
connectionSupplier) {
+        try (Connection connection = connectionSupplier.get();
+                Statement statement = connection.createStatement()) {
+            ResultSet resultSet = statement.executeQuery(sql);
+            List<List<Object>> result = new ArrayList<>();
+            int columnCount = resultSet.getMetaData().getColumnCount();
+            while (resultSet.next()) {
+                ArrayList<Object> objects = new ArrayList<>();
+                for (int i = 1; i <= columnCount; i++) {
+                    objects.add(resultSet.getObject(i));
+                }
+                result.add(objects);
+            }
+            return result;
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 86eaa9ccad..32f723f603 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -52,6 +52,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
 import net.sf.jsqlparser.expression.SignedExpression;
 import net.sf.jsqlparser.expression.StringValue;
 import net.sf.jsqlparser.expression.TimeKeyExpression;
+import net.sf.jsqlparser.expression.TimezoneExpression;
 import net.sf.jsqlparser.expression.TrimFunction;
 import net.sf.jsqlparser.expression.WhenClause;
 import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
@@ -70,6 +71,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
+import java.time.temporal.TemporalAccessor;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -393,6 +395,15 @@ public class ZetaSQLFunction {
             }
             return executeCastExpr(castExpression, leftValue);
         }
+        if (expression instanceof TimezoneExpression) {
+            TimezoneExpression timezoneExpression = (TimezoneExpression) 
expression;
+            Expression leftExpr = timezoneExpression.getLeftExpression();
+            Object leftValue = computeForValue(leftExpr, inputFields);
+            Object timeZoneId =
+                    computeForValue(
+                            
timezoneExpression.getTimezoneExpressions().get(0), inputFields);
+            return DateTimeFunction.atTimeZone((TemporalAccessor) leftValue, 
timeZoneId);
+        }
         throw new TransformException(
                 CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                 String.format("Unsupported SQL Expression: %s ", 
expression.toString()));
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 83c9550bed..5300e62e08 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -46,6 +46,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
 import net.sf.jsqlparser.expression.SignedExpression;
 import net.sf.jsqlparser.expression.StringValue;
 import net.sf.jsqlparser.expression.TimeKeyExpression;
+import net.sf.jsqlparser.expression.TimezoneExpression;
 import net.sf.jsqlparser.expression.TrimFunction;
 import net.sf.jsqlparser.expression.WhenClause;
 import net.sf.jsqlparser.expression.operators.arithmetic.Concat;
@@ -240,6 +241,9 @@ public class ZetaSQLType {
                 return BasicType.DOUBLE_TYPE;
             }
         }
+        if (expression instanceof TimezoneExpression) {
+            return LocalTimeType.OFFSET_DATE_TIME_TYPE;
+        }
         throw new TransformException(
                 CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                 String.format("Unsupported SQL Expression: %s ", 
expression.toString()));
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/DateTimeFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/DateTimeFunction.java
index 2758f232cb..e677a37009 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/DateTimeFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/DateTimeFunction.java
@@ -716,4 +716,30 @@ public class DateTimeFunction {
         LocalDateTime datetime = 
Instant.ofEpochSecond(unixTime).atZone(zoneId).toLocalDateTime();
         return df.format(datetime);
     }
+
+    /**
+     * Implements the SQL {@code AT TIME ZONE} operator.
+     *
+     * <p>A {@link LocalDateTime} is interpreted in the JVM default time zone ({@link
+     * ZoneId#systemDefault()}) and then shifted to the requested zone. An {@link OffsetDateTime}
+     * is re-expressed in the requested zone. In both cases the instant is preserved; only the
+     * offset, and therefore the local date-time fields, of the result change.
+     *
+     * @param datetime the value to convert; {@code null} yields {@code null}
+     * @param timeZone the target zone, converted with {@code toString()} and parsed by {@link
+     *     ZoneId#of(String)}, for example {@code +09:00} or {@code Asia/Tokyo}
+     * @return the same instant carrying the offset of the target zone
+     * @throws TransformException if {@code timeZone} is null or is not a valid zone id, or if
+     *     {@code datetime} is neither a {@link LocalDateTime} nor an {@link OffsetDateTime}
+     */
+    public static OffsetDateTime atTimeZone(TemporalAccessor datetime, Object timeZone) {
+        if (datetime == null) {
+            return null;
+        }
+        if (timeZone == null) {
+            throw new TransformException(
+                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                    "The timeZone argument of function: AT TIME ZONE can not be null");
+        }
+        ZoneId zoneId;
+        try {
+            zoneId = ZoneId.of(timeZone.toString());
+        } catch (java.time.DateTimeException e) {
+            // Report a malformed or unknown zone id like the other argument errors of this
+            // function, so the failure names the SQL operator and the offending value.
+            throw new TransformException(
+                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                    String.format(
+                            "Invalid timeZone argument '%s' of function: AT TIME ZONE: %s",
+                            timeZone, e.getMessage()));
+        }
+        if (datetime instanceof LocalDateTime) {
+            // A LocalDateTime has no zone of its own; it is taken to be in the JVM default zone.
+            return ((LocalDateTime) datetime)
+                    .atZone(ZoneId.systemDefault())
+                    .withZoneSameInstant(zoneId)
+                    .toOffsetDateTime();
+        } else if (datetime instanceof OffsetDateTime) {
+            Instant instant = ((OffsetDateTime) datetime).toInstant();
+            return instant.atZone(zoneId).toOffsetDateTime();
+        } else {
+            throw new TransformException(
+                    CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                    String.format(
+                            "Unsupported type %s for function: AT TIME ZONE", datetime.getClass()));
+        }
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
index c10856a47f..d30a4b0877 100644
--- 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.transform.sql.zeta;
 
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -27,6 +28,10 @@ import org.apache.seatunnel.transform.sql.SQLEngineFactory;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+
 public class DateTimeFunctionTest {
 
     @Test
@@ -63,6 +68,70 @@ public class DateTimeFunctionTest {
         Assertions.assertEquals("2023-01-01 10:00:00", field1.toString());
     }
 
+    @Test
+    public void testAtTimeZoneFunction() {
+        // Exercises AT TIME ZONE on a LocalDateTime column and an OffsetDateTime column, first
+        // with fixed-offset zone ids and then with region-based zone ids.
+        SQLEngine sqlEngine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"local_date_time", "offset_date_time"},
+                        new SeaTunnelDataType[] {
+                            LocalTimeType.LOCAL_DATE_TIME_TYPE, LocalTimeType.OFFSET_DATE_TIME_TYPE
+                        });
+
+        // Both input columns describe the same instant, so every expectation is derived from
+        // `now` interpreted in the JVM default zone.
+        ZoneId systemZone = ZoneId.systemDefault();
+        LocalDateTime now = LocalDateTime.now();
+        SeaTunnelRow inputRow =
+                new SeaTunnelRow(new Object[] {now, now.atZone(systemZone).toOffsetDateTime()});
+
+        // Scenario 1: fixed-offset zone ids.
+        sqlEngine.init(
+                "test",
+                null,
+                rowType,
+                "select local_date_time AT TIME ZONE '+09:00' as date_time_with_zone,"
+                        + "offset_date_time AT TIME ZONE '-05:00' as offset_date_time_with_zone"
+                        + " from dual");
+        SeaTunnelRowType seaTunnelRowType = sqlEngine.typeMapping(new ArrayList<>());
+        Assertions.assertEquals(
+                LocalTimeType.OFFSET_DATE_TIME_TYPE, seaTunnelRowType.getFieldType(0));
+        // The second output column must be mapped to the offset type as well.
+        Assertions.assertEquals(
+                LocalTimeType.OFFSET_DATE_TIME_TYPE, seaTunnelRowType.getFieldType(1));
+
+        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow, rowType).get(0);
+        Assertions.assertEquals(
+                now.atZone(systemZone).withZoneSameInstant(ZoneId.of("+09:00")).toOffsetDateTime(),
+                outRow.getField(0));
+        Assertions.assertEquals(
+                now.atZone(systemZone).withZoneSameInstant(ZoneId.of("-05:00")).toOffsetDateTime(),
+                outRow.getField(1));
+
+        // Scenario 2: region-based zone ids; the expected values below use the fixed offsets
+        // +09:00 and -10:00 for these two regions.
+        sqlEngine.init(
+                "test",
+                null,
+                rowType,
+                "select local_date_time AT TIME ZONE 'Asia/Tokyo' as date_time_with_zone,"
+                        + "offset_date_time AT TIME ZONE 'Pacific/Honolulu' as offset_date_time_with_zone"
+                        + " from dual");
+        seaTunnelRowType = sqlEngine.typeMapping(new ArrayList<>());
+        Assertions.assertEquals(
+                LocalTimeType.OFFSET_DATE_TIME_TYPE, seaTunnelRowType.getFieldType(0));
+        Assertions.assertEquals(
+                LocalTimeType.OFFSET_DATE_TIME_TYPE, seaTunnelRowType.getFieldType(1));
+
+        outRow = sqlEngine.transformBySQL(inputRow, rowType).get(0);
+        Assertions.assertEquals(
+                now.atZone(systemZone).withZoneSameInstant(ZoneId.of("+09:00")).toOffsetDateTime(),
+                outRow.getField(0));
+        Assertions.assertEquals(
+                now.atZone(systemZone).withZoneSameInstant(ZoneId.of("-10:00")).toOffsetDateTime(),
+                outRow.getField(1));
+    }
+
     @Test
     public void testFromUnixtimeFunctionWithIntegerInput() {
         SQLEngine sqlEngine = 
SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);


Reply via email to