This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ad5278c5bb [Feature][Transform-V2] Support `AT TIME ZONE` statement
for sql transform (#9784)
ad5278c5bb is described below
commit ad5278c5bbec7e9a0b6e94ed7b69c96c73b4ff87
Author: Jia Fan <[email protected]>
AuthorDate: Tue Sep 2 23:46:45 2025 +0800
[Feature][Transform-V2] Support `AT TIME ZONE` statement for sql transform
(#9784)
---
docs/en/transform-v2/sql-functions.md | 16 ++++-
docs/zh/transform-v2/sql-functions.md | 17 +++++-
.../converter/AbstractJdbcRowConverter.java | 5 ++
.../dialect/inceptor/InceptorJdbcRowConverter.java | 7 +++
.../dialect/kingbase/KingbaseJdbcRowConverter.java | 7 ++-
.../oceanbase/OceanBaseMysqlJdbcRowConverter.java | 6 ++
.../dialect/psql/PostgresJdbcRowConverter.java | 6 ++
.../seatunnel/cdc/oracle/OracleCDCIT.java | 28 ++++-----
.../connector/cdc/sqlserver/SqlServerCDCIT.java | 28 ++++-----
.../seatunnel/jdbc/JdbcAutoGenerateSQLIT.java | 23 +++++++-
.../seatunnel/jdbc/JdbcPostgresIdentifierIT.java | 28 ++++-----
.../resources/jdbc_sink_auto_generate_sql.conf | 1 +
.../connectors/seatunnel/jdbc/JdbcPostgresIT.java | 27 ++++-----
.../apache/seatunnel/e2e/common/util/JdbcUtil.java | 48 +++++++++++++++
.../transform/sql/zeta/ZetaSQLFunction.java | 11 ++++
.../seatunnel/transform/sql/zeta/ZetaSQLType.java | 4 ++
.../sql/zeta/functions/DateTimeFunction.java | 26 ++++++++
.../transform/sql/zeta/DateTimeFunctionTest.java | 69 ++++++++++++++++++++++
18 files changed, 282 insertions(+), 75 deletions(-)
diff --git a/docs/en/transform-v2/sql-functions.md
b/docs/en/transform-v2/sql-functions.md
index 857932c2bd..22d0c6b4e9 100644
--- a/docs/en/transform-v2/sql-functions.md
+++ b/docs/en/transform-v2/sql-functions.md
@@ -958,7 +958,7 @@ Convert the number of seconds from the UNIX epoch
(1970-01-01 00:00:00 UTC) to a
The most important format characters are: y year, M month, d day, H hour, m
minute, s second. For details of the format, see
`java.time.format.DateTimeFormatter`.
-`timeZone` is optional, default value is system's time zone. `timezone` value
can be a `UTC+ timezone offset`, for example, `UTC+8` represents the
Asia/Shanghai time zone, see `java.time.ZoneId`
+`timeZone` is optional, default value is system's time zone. `timezone` value
can be a `UTC+ timezone offset`, for example, `UTC+8` represents the
Asia/Shanghai time zone, see
https://en.wikipedia.org/wiki/List_of_tz_database_time_zones .
Example:
@@ -973,6 +973,20 @@ or
CALL FROM_UNIXTIME(1672502400, 'yyyy-MM-dd HH:mm:ss','UTC+6')
+### AT TIME ZONE
+
+```dateAndTime AT TIME ZONE 'timeZone' -> TIMESTAMP_TZ```
+
+Convert a timestamp value to a TIMESTAMP WITH TIME ZONE value in the specified
time zone.
+
+`timeZone` value can be a `UTC+ timezone offset`, for example, `+08:00`
represents the Asia/Shanghai time zone, see
https://en.wikipedia.org/wiki/List_of_tz_database_time_zones .
+
+Example:
+
+local_date_time AT TIME ZONE '+09:00'
+
+offset_date_time AT TIME ZONE 'Pacific/Honolulu'
+
## System Functions
### CAST
diff --git a/docs/zh/transform-v2/sql-functions.md
b/docs/zh/transform-v2/sql-functions.md
index 188d12f665..c5fe31e5d9 100644
--- a/docs/zh/transform-v2/sql-functions.md
+++ b/docs/zh/transform-v2/sql-functions.md
@@ -954,7 +954,7 @@ YEAR(CREATED)
最重要的格式字符包括:y(年)、M(月)、d(日)、H(时)、m(分)、s(秒)。有关格式的详细信息,请参阅
`java.time.format.DateTimeFormatter`。
-`timeZone` 是可选的,默认值为系统的时区。`timezone` 的值可以是一个 `UTC+ 时区偏移`,例如,`UTC+8`
表示亚洲/上海时区,请参阅 `java.time.ZoneId`。
+`timeZone` 是可选的,默认值为系统的时区。`timezone` 的值可以是一个 `UTC+ 时区偏移`,例如,`UTC+8`
表示亚洲/上海时区,请参阅 https://en.wikipedia.org/wiki/List_of_tz_database_time_zones 。
示例:
@@ -968,6 +968,21 @@ or
CALL FROM_UNIXTIME(1672502400, 'yyyy-MM-dd HH:mm:ss','UTC+6')
+
+### AT TIME ZONE
+
+```dateAndTime AT TIME ZONE 'timeZone' -> TIMESTAMP_TZ```
+
+转换一个时间戳值为指定时区的带时区时间戳值。
+
+`timezone` 的值可以是一个 `UTC+ 时区偏移`,例如,`+08:00` 表示亚洲/上海时区,请参阅
https://en.wikipedia.org/wiki/List_of_tz_database_time_zones 。
+
+Example:
+
+local_date_time AT TIME ZONE '+09:00'
+
+offset_date_time AT TIME ZONE 'Pacific/Honolulu'
+
## System Functions
### CAST
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index 42bcf2d894..d5c3aa95ac 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -44,6 +44,7 @@ import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -275,6 +276,10 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
LocalDateTime localDateTime = (LocalDateTime) value;
statement.setTimestamp(statementIndex,
Timestamp.valueOf(localDateTime));
break;
+ case TIMESTAMP_TZ:
+ OffsetDateTime offsetDateTime = (OffsetDateTime) value;
+ statement.setTimestamp(statementIndex,
Timestamp.from(offsetDateTime.toInstant()));
+ break;
case BYTES:
statement.setBytes(statementIndex, (byte[]) value);
break;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
index 33c689fd39..1a7b3dbd72 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
@@ -35,9 +35,11 @@ import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.sql.PreparedStatement;
+import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.OffsetDateTime;
public class InceptorJdbcRowConverter extends HiveJdbcRowConverter {
@@ -103,6 +105,11 @@ public class InceptorJdbcRowConverter extends
HiveJdbcRowConverter {
statement.setTimestamp(
statementIndex,
java.sql.Timestamp.valueOf(localDateTime));
break;
+ case TIMESTAMP_TZ:
+ OffsetDateTime offsetDateTime = (OffsetDateTime)
row.getField(fieldIndex);
+ statement.setTimestamp(
+ statementIndex,
Timestamp.from(offsetDateTime.toInstant()));
+ break;
case BYTES:
statement.setBytes(statementIndex, (byte[])
row.getField(fieldIndex));
break;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
index f766ce3980..db5d23aae9 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
@@ -39,6 +39,7 @@ import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.OffsetDateTime;
import java.util.Optional;
public class KingbaseJdbcRowConverter extends AbstractJdbcRowConverter {
@@ -174,8 +175,12 @@ public class KingbaseJdbcRowConverter extends
AbstractJdbcRowConverter {
break;
case TIMESTAMP:
LocalDateTime localDateTime = (LocalDateTime)
row.getField(fieldIndex);
+ statement.setTimestamp(statementIndex,
Timestamp.valueOf(localDateTime));
+ break;
+ case TIMESTAMP_TZ:
+ OffsetDateTime offsetDateTime = (OffsetDateTime)
row.getField(fieldIndex);
statement.setTimestamp(
- statementIndex,
java.sql.Timestamp.valueOf(localDateTime));
+ statementIndex,
Timestamp.from(offsetDateTime.toInstant()));
break;
case BYTES:
statement.setBytes(statementIndex, (byte[])
row.getField(fieldIndex));
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index bf49d4d0b7..0c7fba387b 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -46,6 +46,7 @@ import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.OffsetDateTime;
import java.util.Optional;
public class OceanBaseMysqlJdbcRowConverter extends AbstractJdbcRowConverter {
@@ -220,6 +221,11 @@ public class OceanBaseMysqlJdbcRowConverter extends
AbstractJdbcRowConverter {
statement.setTimestamp(
statementIndex,
java.sql.Timestamp.valueOf(localDateTime));
break;
+ case TIMESTAMP_TZ:
+ OffsetDateTime offsetDateTime = (OffsetDateTime)
row.getField(fieldIndex);
+ statement.setTimestamp(
+ statementIndex,
Timestamp.from(offsetDateTime.toInstant()));
+ break;
case BYTES:
statement.setBytes(statementIndex, (byte[])
row.getField(fieldIndex));
break;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index 6866e428e4..be7c8efd18 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -49,6 +49,7 @@ import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.OffsetDateTime;
import java.util.Locale;
import java.util.Optional;
@@ -253,6 +254,11 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
statement.setTimestamp(
statementIndex,
java.sql.Timestamp.valueOf(localDateTime));
break;
+ case TIMESTAMP_TZ:
+ OffsetDateTime offsetDateTime = (OffsetDateTime)
row.getField(fieldIndex);
+ statement.setTimestamp(
+ statementIndex,
Timestamp.from(offsetDateTime.toInstant()));
+ break;
case BYTES:
statement.setBytes(statementIndex, (byte[])
row.getField(fieldIndex));
break;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index ca0d2cddfd..b68011e689 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JdbcUtil;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.junit.jupiter.api.AfterAll;
@@ -38,10 +39,8 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -565,22 +564,15 @@ public class OracleCDCIT extends AbstractOracleCDCIT
implements TestResource {
}
private List<List<Object>> querySql(String sql) {
- try (Connection connection = getJdbcConnection(ORACLE_CONTAINER);
- Statement statement = connection.createStatement()) {
- ResultSet resultSet = statement.executeQuery(sql);
- List<List<Object>> result = new ArrayList<>();
- int columnCount = resultSet.getMetaData().getColumnCount();
- while (resultSet.next()) {
- ArrayList<Object> objects = new ArrayList<>();
- for (int i = 1; i <= columnCount; i++) {
- objects.add(resultSet.getObject(i));
- }
- result.add(objects);
- }
- return result;
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
+ return JdbcUtil.querySql(
+ sql,
+ () -> {
+ try {
+ return getJdbcConnection(ORACLE_CONTAINER);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
private void executeSql(String sql) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 259d9d4e6c..11083fcf89 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JdbcUtil;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.awaitility.Awaitility;
@@ -55,10 +56,8 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -452,22 +451,15 @@ public class SqlServerCDCIT extends TestSuiteBase
implements TestResource {
}
private List<List<Object>> querySql(String sql) {
- try (Connection connection = getJdbcConnection();
- Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery(sql)) {
- List<List<Object>> result = new ArrayList<>();
- int columnCount = resultSet.getMetaData().getColumnCount();
- while (resultSet.next()) {
- ArrayList<Object> objects = new ArrayList<>();
- for (int i = 1; i <= columnCount; i++) {
- objects.add(resultSet.getObject(i));
- }
- result.add(objects);
- }
- return result;
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
+ return JdbcUtil.querySql(
+ sql,
+ () -> {
+ try {
+ return this.getJdbcConnection();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
private void executeSql(String sql) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcAutoGenerateSQLIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcAutoGenerateSQLIT.java
index 34118fab90..389c50345c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcAutoGenerateSQLIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcAutoGenerateSQLIT.java
@@ -41,9 +41,11 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+import static org.apache.seatunnel.e2e.common.util.JdbcUtil.querySql;
import static org.awaitility.Awaitility.given;
@Slf4j
@@ -90,6 +92,24 @@ public class JdbcAutoGenerateSQLIT extends TestSuiteBase
implements TestResource
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/jdbc_sink_auto_generate_sql.conf");
Assertions.assertEquals(0, execResult.getExitCode());
+ List<Object> result =
+ querySql(
+ "select * from sink limit 1",
+ () -> {
+ try {
+ return DriverManager.getConnection(
+
postgreSQLContainer.getJdbcUrl(),
+
postgreSQLContainer.getUsername(),
+
postgreSQLContainer.getPassword());
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .get(0);
+ Assertions.assertInstanceOf(Long.class, result.get(0));
+ Assertions.assertInstanceOf(String.class, result.get(1));
+ Assertions.assertInstanceOf(Integer.class, result.get(2));
+ Assertions.assertInstanceOf(java.sql.Timestamp.class, result.get(3));
}
@TestTemplate
@@ -111,7 +131,8 @@ public class JdbcAutoGenerateSQLIT extends TestSuiteBase
implements TestResource
"create table sink(\n"
+ "user_id BIGINT NOT NULL PRIMARY KEY,\n"
+ "name varchar(255),\n"
- + "age INT\n"
+ + "age INT,\n"
+ + "timestamp_tz TIMESTAMPTZ \n"
+ ")";
statement.execute(sink);
} catch (SQLException e) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
index f59972ee26..c8aaf22c0a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JdbcUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -41,10 +42,8 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -358,22 +357,15 @@ public class JdbcPostgresIdentifierIT extends
TestSuiteBase implements TestResou
}
private List<List<Object>> querySql(String sql) {
- try (Connection connection = getJdbcConnection();
- Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery(sql)) {
- List<List<Object>> result = new ArrayList<>();
- int columnCount = resultSet.getMetaData().getColumnCount();
- while (resultSet.next()) {
- ArrayList<Object> objects = new ArrayList<>();
- for (int i = 1; i <= columnCount; i++) {
- objects.add(resultSet.getObject(i));
- }
- result.add(objects);
- }
- return result;
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
+ return JdbcUtil.querySql(
+ sql,
+ () -> {
+ try {
+ return this.getJdbcConnection();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
private void executeSQL(String sql) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_auto_generate_sql.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_auto_generate_sql.conf
index f185887a69..d807040312 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_auto_generate_sql.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_auto_generate_sql.conf
@@ -28,6 +28,7 @@ source {
user_id = bigint
name = string
age = int
+ timestamp_tz = timestamp_tz
}
}
plugin_output = "fake"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index 916f3be8ef..123de8bf9c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JdbcUtil;
import org.apache.commons.lang3.StringUtils;
@@ -53,10 +54,8 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -500,21 +499,15 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
}
private List<List<Object>> querySql(String sql) {
- try (Connection connection = getJdbcConnection();
- ResultSet resultSet =
connection.createStatement().executeQuery(sql)) {
- List<List<Object>> result = new ArrayList<>();
- int columnCount = resultSet.getMetaData().getColumnCount();
- while (resultSet.next()) {
- ArrayList<Object> objects = new ArrayList<>();
- for (int i = 1; i <= columnCount; i++) {
- objects.add(resultSet.getObject(i));
- }
- result.add(objects);
- }
- return result;
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
+ return JdbcUtil.querySql(
+ sql,
+ () -> {
+ try {
+ return this.getJdbcConnection();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
private void executeSQL(String sql) {
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JdbcUtil.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JdbcUtil.java
new file mode 100644
index 0000000000..a191305dab
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JdbcUtil.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common.util;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+public class JdbcUtil {
+
+ public static List<List<Object>> querySql(String sql, Supplier<Connection>
connectionSupplier) {
+ try (Connection connection = connectionSupplier.get();
+ Statement statement = connection.createStatement()) {
+ ResultSet resultSet = statement.executeQuery(sql);
+ List<List<Object>> result = new ArrayList<>();
+ int columnCount = resultSet.getMetaData().getColumnCount();
+ while (resultSet.next()) {
+ ArrayList<Object> objects = new ArrayList<>();
+ for (int i = 1; i <= columnCount; i++) {
+ objects.add(resultSet.getObject(i));
+ }
+ result.add(objects);
+ }
+ return result;
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 86eaa9ccad..32f723f603 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -52,6 +52,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
import net.sf.jsqlparser.expression.SignedExpression;
import net.sf.jsqlparser.expression.StringValue;
import net.sf.jsqlparser.expression.TimeKeyExpression;
+import net.sf.jsqlparser.expression.TimezoneExpression;
import net.sf.jsqlparser.expression.TrimFunction;
import net.sf.jsqlparser.expression.WhenClause;
import net.sf.jsqlparser.expression.operators.arithmetic.Addition;
@@ -70,6 +71,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
+import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -393,6 +395,15 @@ public class ZetaSQLFunction {
}
return executeCastExpr(castExpression, leftValue);
}
+ if (expression instanceof TimezoneExpression) {
+ TimezoneExpression timezoneExpression = (TimezoneExpression)
expression;
+ Expression leftExpr = timezoneExpression.getLeftExpression();
+ Object leftValue = computeForValue(leftExpr, inputFields);
+ Object timeZoneId =
+ computeForValue(
+
timezoneExpression.getTimezoneExpressions().get(0), inputFields);
+ return DateTimeFunction.atTimeZone((TemporalAccessor) leftValue,
timeZoneId);
+ }
throw new TransformException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
String.format("Unsupported SQL Expression: %s ",
expression.toString()));
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 83c9550bed..5300e62e08 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -46,6 +46,7 @@ import net.sf.jsqlparser.expression.Parenthesis;
import net.sf.jsqlparser.expression.SignedExpression;
import net.sf.jsqlparser.expression.StringValue;
import net.sf.jsqlparser.expression.TimeKeyExpression;
+import net.sf.jsqlparser.expression.TimezoneExpression;
import net.sf.jsqlparser.expression.TrimFunction;
import net.sf.jsqlparser.expression.WhenClause;
import net.sf.jsqlparser.expression.operators.arithmetic.Concat;
@@ -240,6 +241,9 @@ public class ZetaSQLType {
return BasicType.DOUBLE_TYPE;
}
}
+ if (expression instanceof TimezoneExpression) {
+ return LocalTimeType.OFFSET_DATE_TIME_TYPE;
+ }
throw new TransformException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
String.format("Unsupported SQL Expression: %s ",
expression.toString()));
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/DateTimeFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/DateTimeFunction.java
index 2758f232cb..e677a37009 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/DateTimeFunction.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/DateTimeFunction.java
@@ -716,4 +716,30 @@ public class DateTimeFunction {
LocalDateTime datetime =
Instant.ofEpochSecond(unixTime).atZone(zoneId).toLocalDateTime();
return df.format(datetime);
}
+
+ public static OffsetDateTime atTimeZone(TemporalAccessor datetime, Object
timeZone) {
+ if (datetime == null) {
+ return null;
+ }
+ if (timeZone == null) {
+ throw new TransformException(
+ CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ "The timeZone argument of function: AT TIME ZONE can not
be null");
+ }
+ ZoneId zoneId = ZoneId.of(timeZone.toString());
+ if (datetime instanceof LocalDateTime) {
+ return ((LocalDateTime) datetime)
+ .atZone(ZoneId.systemDefault())
+ .withZoneSameInstant(zoneId)
+ .toOffsetDateTime();
+ } else if (datetime instanceof OffsetDateTime) {
+ Instant instant = ((OffsetDateTime) datetime).toInstant();
+ return instant.atZone(zoneId).toOffsetDateTime();
+ } else {
+ throw new TransformException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+ String.format(
+ "Unsupported type %s for function: AT TIME ZONE",
datetime.getClass()));
+ }
+ }
}
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
index c10856a47f..d30a4b0877 100644
---
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/DateTimeFunctionTest.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.transform.sql.zeta;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -27,6 +28,10 @@ import org.apache.seatunnel.transform.sql.SQLEngineFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+
public class DateTimeFunctionTest {
@Test
@@ -63,6 +68,70 @@ public class DateTimeFunctionTest {
Assertions.assertEquals("2023-01-01 10:00:00", field1.toString());
}
+ @Test
+ public void testAtTimeZoneFunction() {
+ SQLEngine sqlEngine =
SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"local_date_time", "offset_date_time"},
+ new SeaTunnelDataType[] {
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
LocalTimeType.OFFSET_DATE_TIME_TYPE
+ });
+
+ LocalDateTime now = LocalDateTime.now();
+ SeaTunnelRow inputRow =
+ new SeaTunnelRow(
+ new Object[] {now,
now.atZone(ZoneId.systemDefault()).toOffsetDateTime()});
+
+ sqlEngine.init(
+ "test",
+ null,
+ rowType,
+ "select local_date_time AT TIME ZONE '+09:00' as
date_time_with_zone,"
+ + "offset_date_time AT TIME ZONE '-05:00' as
offset_date_time_with_zone"
+ + " from dual");
+ SeaTunnelRowType seaTunnelRowType = sqlEngine.typeMapping(new
ArrayList<>());
+ Assertions.assertEquals(
+ LocalTimeType.OFFSET_DATE_TIME_TYPE,
seaTunnelRowType.getFieldType(0));
+
+ SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow,
rowType).get(0);
+ Assertions.assertEquals(
+ now.atZone(ZoneId.systemDefault())
+ .withZoneSameInstant(ZoneId.of("+09:00"))
+ .toOffsetDateTime(),
+ outRow.getField(0));
+ Assertions.assertEquals(
+ now.atZone(ZoneId.systemDefault())
+ .withZoneSameInstant(ZoneId.of("-05:00"))
+ .toOffsetDateTime(),
+ outRow.getField(1));
+
+ sqlEngine.init(
+ "test",
+ null,
+ rowType,
+ "select local_date_time AT TIME ZONE 'Asia/Tokyo' as
date_time_with_zone,"
+ + "offset_date_time AT TIME ZONE 'Pacific/Honolulu' as
offset_date_time_with_zone"
+ + " from dual");
+ seaTunnelRowType = sqlEngine.typeMapping(new ArrayList<>());
+ Assertions.assertEquals(
+ LocalTimeType.OFFSET_DATE_TIME_TYPE,
seaTunnelRowType.getFieldType(0));
+ Assertions.assertEquals(
+ LocalTimeType.OFFSET_DATE_TIME_TYPE,
seaTunnelRowType.getFieldType(1));
+
+ outRow = sqlEngine.transformBySQL(inputRow, rowType).get(0);
+ Assertions.assertEquals(
+ now.atZone(ZoneId.systemDefault())
+ .withZoneSameInstant(ZoneId.of("+09:00"))
+ .toOffsetDateTime(),
+ outRow.getField(0));
+ Assertions.assertEquals(
+ now.atZone(ZoneId.systemDefault())
+ .withZoneSameInstant(ZoneId.of("-10:00"))
+ .toOffsetDateTime(),
+ outRow.getField(1));
+ }
+
@Test
public void testFromUnixtimeFunctionWithIntegerInput() {
SQLEngine sqlEngine =
SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);