Pietro updated FLINK-35053:
---------------------------
    Summary: TIMESTAMP with LOCAL TIME ZONE not supported by JDBC connector for Postgres  (was: TIMESTAMP with TIME ZONE not supported by JDBC connector for Postgres)

TIMESTAMP with LOCAL TIME ZONE not supported by JDBC connector for Postgres
----------------------------------------------------------------------------

                 Key: FLINK-35053
                 URL: https://issues.apache.org/jira/browse/FLINK-35053
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.19.0, 1.18.1, jdbc-3.1.2
            Reporter: Pietro
            Priority: Major
         Attachments: Timestamp.png, TimestampData.png, createExternalConverter.png

The JDBC sink for Postgres supports neither the {{TIMESTAMP WITH TIME ZONE}} nor the {{TIMESTAMP_LTZ}} type.

Related issues: FLINK-22199, FLINK-20869

h2. Problem Explanation

A Postgres table {{target_table}} has a field {{tm_tz}} of type {{timestamptz}}.

{code:sql}
-- Postgres DDL
CREATE TABLE target_table (
    tm_tz TIMESTAMP WITH TIME ZONE
)
{code}

In Flink we have a table with a column of type {{TIMESTAMP_LTZ(6)}}, and our goal is to sink it to {{target_table}}.

{code:sql}
-- Flink DDL
CREATE TABLE sink (
    tm_tz TIMESTAMP_LTZ(6)
) WITH (
    'connector' = 'jdbc',
    'table-name' = 'target_table'
    ...
)
{code}

According to [AbstractPostgresCompatibleDialect.supportedTypes()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java#L109], {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is supported, while {{TIMESTAMP_WITH_TIME_ZONE}} is not. However, when the row converter is created via [AbstractJdbcRowConverter.createExternalConverter()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L246], an {{UnsupportedOperationException}} is thrown, since {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is *not* among the handled types, while [{{TIMESTAMP_WITH_TIME_ZONE}}|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L168] is.
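For reference, the exception can be reproduced with a minimal program along the lines of the DDL above. The sketch below is illustrative only: the class name and the connection settings are placeholders, and a plain query over the JDBC table is already enough to make the planner construct the dialect's row converter.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TimestampLtzRepro {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // JDBC table backed by the Postgres target_table from the DDL above;
        // url/username/password are placeholders.
        tEnv.executeSql(
                "CREATE TABLE sink ("
                        + "  tm_tz TIMESTAMP_LTZ(6)"
                        + ") WITH ("
                        + "  'connector' = 'jdbc',"
                        + "  'url' = 'jdbc:postgresql://localhost:5432/db',"
                        + "  'table-name' = 'target_table',"
                        + "  'username' = 'user',"
                        + "  'password' = 'secret'"
                        + ")");

        // A simple query over the JDBC table forces the planner to build the
        // dialect's row converter, which rejects TIMESTAMP_LTZ(6).
        tEnv.sqlQuery("SELECT tm_tz FROM sink");
    }
}
{code}

Planning the query fails with: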
{code:java}
Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported type:TIMESTAMP_LTZ(6)
	at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186)
	at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99)
	at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58)
	at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
	at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
	at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.<init>(PostgresRowConverter.java:47)
	at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect.getRowConverter(PostgresDialect.java:51)
	at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:184)
	at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:478)
	at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
	at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
	at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4002)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2872)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2432)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2346)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2291)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:728)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:714)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3848)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:618)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:229)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)
	at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)
	at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
	at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:73)
	at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:272)
	at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:262)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
{code}
h3. Using TIMESTAMP WITH TIME ZONE

Defining {{tm_tz}} in Flink as {{TIMESTAMP(6) WITH TIME ZONE}} instead of {{TIMESTAMP_LTZ(6)}} does not solve the issue either; it fails with the following parse error instead:

{code:java}
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "TIME" at line 1, column 66.
Was expecting:
    "LOCAL" ...

	at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:81)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:102)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
{code}

h3. Using TIMESTAMP

Defining {{tm_tz}} in Flink as {{TIMESTAMP(6)}} can lead to incorrect time zone conversions.

For instance, assume that the local time zone is GMT+2 and we have a row in Flink with {{tm_tz}} equal to {{'2024-04-01 00:00:00'}} (UTC). When the {{toTimestamp()}} method ([reference|https://github.com/apache/flink-connector-jdbc/blob/ab5d6159141bdbe8aed78e24c9500a136efbfac0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L251]) used by {{AbstractJdbcRowConverter.createExternalConverter()}} is invoked

!createExternalConverter.png|width=80%!

it attaches the local time zone offset to the value instead of "+00":

!TimestampData.png|width=80%!
!Timestamp.png|width=80%!

Postgres therefore receives {{'2024-04-01 00:00:00+02'}} (instead of +00) and converts it to {{'2024-03-31 22:00:00+00'}}. A standalone demonstration of this shift is sketched at the end of this description.

h2. Possible Solutions

# Make the JDBC connector support {{TIMESTAMP_LTZ}} by adding a proper converter to [AbstractJdbcRowConverter.createExternalConverter()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L246]; a JDBC-level sketch of the required mapping is given after this list.
# Fix the behavior of the [converter|https://github.com/apache/flink-connector-jdbc/blob/ab5d6159141bdbe8aed78e24c9500a136efbfac0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L251] for {{TIMESTAMP}} types, so that it either:
## forces the UTC time zone (e.g. writes the timestamp with a "+00" offset), or
## removes the time zone information from the timestamp passed to the external system.
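The shift described in the "Using TIMESTAMP" section can be demonstrated outside Flink, because it comes from the way {{java.sql.Timestamp}} interprets a local date-time in the JVM default time zone. The following is a minimal sketch; the default zone is forced to GMT+2 only to make the example reproducible.

{code:java}
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.TimeZone;

public class LocalZoneShiftDemo {
    public static void main(String[] args) {
        // Pretend the JVM runs in GMT+2, as in the example above
        // (the "Etc/GMT-2" id means UTC+02:00).
        TimeZone.setDefault(TimeZone.getTimeZone("Etc/GMT-2"));

        // The value held internally: 2024-04-01 00:00:00, intended as UTC.
        LocalDateTime ldt = LocalDateTime.of(2024, 4, 1, 0, 0, 0);

        // java.sql.Timestamp carries no zone of its own; the local date-time is
        // interpreted in the JVM default zone, i.e. as 2024-04-01 00:00:00+02.
        Timestamp ts = Timestamp.valueOf(ldt);

        // Prints 2024-03-31T22:00:00Z, the instant Postgres ends up storing.
        System.out.println(ts.toInstant());
    }
}
{code}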
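For possible solution 1, the essential change is to hand the driver a value that carries an explicit UTC offset instead of a zone-less {{java.sql.Timestamp}}. The sketch below shows the intended mapping in plain JDBC terms rather than through the connector's converter SPI; the connection settings are placeholders and the PostgreSQL driver is assumed to be on the classpath.

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class LtzWriteSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings.
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:postgresql://localhost:5432/db", "user", "secret");
                PreparedStatement ps =
                        conn.prepareStatement("INSERT INTO target_table (tm_tz) VALUES (?)")) {

            // A TIMESTAMP_LTZ value is conceptually an instant; here 2024-04-01 00:00:00 UTC.
            Instant instant = Instant.parse("2024-04-01T00:00:00Z");

            // Pinning the value to UTC before handing it to the driver avoids the
            // implicit local-zone shift introduced by java.sql.Timestamp, so Postgres
            // stores 2024-04-01 00:00:00+00 regardless of the JVM time zone.
            ps.setObject(1, OffsetDateTime.ofInstant(instant, ZoneOffset.UTC));
            ps.executeUpdate();
        }
    }
}
{code}

A converter added to the connector would do the equivalent, e.g. mapping the internal {{TimestampData}} to a UTC {{OffsetDateTime}} via {{TimestampData.toInstant()}}.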