loserwang1024 commented on code in PR #4181:
URL: https://github.com/apache/flink-cdc/pull/4181#discussion_r2638485239
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java:
##########
@@ -25,14 +25,46 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import io.debezium.time.ZonedTimestamp;
import org.apache.kafka.connect.data.Schema;
+import java.time.Instant;
+import java.util.Optional;
+
/** {@link DataType} inference for PostgresSQL debezium {@link Schema}. */
@Internal
public class PostgresSchemaDataTypeInference extends DebeziumSchemaDataTypeInference {
private static final long serialVersionUID = 1L;
+ protected DataType inferString(Object value, Schema schema) {
+ // PostgreSQL TIMESTAMPTZ is encoded as ZonedTimestamp in Debezium
+ // We need to return TIMESTAMP_TZ (ZonedTimestampType) instead of TIMESTAMP_LTZ
+ if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
+ int nano =
+ Optional.ofNullable((String) value)
+ .map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
+ .map(Instant::getNano)
+ .orElse(0);
+
+ int precision;
+ if (nano == 0) {
+ precision = 0;
+ } else if (nano % 1000 > 0) {
+ precision = 9;
+ } else if (nano % 1000_000 > 0) {
+ precision = 6;
+ } else if (nano % 1000_000_000 > 0) {
+ precision = 3;
+ } else {
+ precision = 0;
+ }
+ // Return TIMESTAMP_TZ (ZonedTimestampType) for PostgreSQL TIMESTAMPTZ
+ return DataTypes.TIMESTAMP_TZ(precision);
Review Comment:
I am afraid some sinks don't support TIMESTAMP_TZ. Would you like to check
whether the Paimon sink and the Kafka sink support it?
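For reference, the precision inference in `inferString` above can be reproduced in isolation. This is a minimal sketch, not the PR's code: it assumes the value is an ISO-8601 string with an offset and parses it with `java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME` instead of Debezium's `ZonedTimestamp.FORMATTER`, so it runs without the Debezium dependency. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for illustration; not part of Flink CDC or Debezium. */
public class ZonedTimestampPrecisionSketch {

    // Assumption: values look like "2020-07-17T18:00:22.123+00:00" (ISO-8601 with offset).
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ISO_OFFSET_DATE_TIME;

    /** Returns 0, 3, 6 or 9 depending on which fractional-second digits are non-zero. */
    public static int inferPrecision(String value) {
        if (value == null) {
            return 0; // mirrors the PR's Optional.orElse(0) for null values
        }
        int nano = FORMATTER.parse(value, Instant::from).getNano();
        if (nano == 0) {
            return 0; // whole seconds
        } else if (nano % 1_000 != 0) {
            return 9; // digits below one microsecond are present
        } else if (nano % 1_000_000 != 0) {
            return 6; // digits below one millisecond are present
        } else {
            return 3; // only millisecond digits; nano is non-zero and below 1_000_000_000
        }
    }

    public static void main(String[] args) {
        System.out.println(inferPrecision("2020-07-17T18:00:22+00:00"));           // 0
        System.out.println(inferPrecision("2020-07-17T18:00:22.123+00:00"));       // 3
        System.out.println(inferPrecision("2020-07-17T18:00:22.123456+02:00"));    // 6
        System.out.println(inferPrecision("2020-07-17T18:00:22.123456789-05:00")); // 9
    }
}
```

Since `Instant.getNano()` is always below 1,000,000,000, the `nano % 1000_000_000 > 0` check in the diff is true for every non-zero value, so its trailing `else` branch is unreachable; the sketch folds that case into a plain `else`. Writing the literals as `1_000` and `1_000_000` also makes the digit grouping easier to read.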
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java:
##########
@@ -367,6 +372,35 @@ protected Object convertToTimestamp(Object dbzObj, Schema schema) {
+ dbzObj.getClass().getName());
}
+ protected Object convertToZonedTimestamp(Object dbzObj, Schema schema) {
+ if (dbzObj instanceof String) {
+ String str = (String) dbzObj;
+ // ZonedTimestamp type is encoded in string type with timezone offset
+ // Format: ISO-8601 with timezone offset (e.g., "2020-07-17T18:00:22+00:00")
+ // According to Debezium documentation, PostgreSQL TIMESTAMPTZ is ALWAYS encoded as
+ // String
Review Comment:
Would you like to add a link to that Debezium documentation here?
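As an illustration of the parsing step the `convertToZonedTimestamp` comments describe, here is a hedged sketch using only `java.time`. It assumes the ISO-8601-with-offset format quoted in the comment and returns an `OffsetDateTime`, so the original offset is preserved; calling `toInstant()` on the result shows what would be lost by normalizing to an instant, which is roughly the TIMESTAMP_TZ vs. TIMESTAMP_LTZ distinction discussed in the first hunk. The class name and exception message are hypothetical, and the mapping to Flink CDC's internal timestamp data type is not shown.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical sketch; not Flink CDC's convertToZonedTimestamp implementation. */
public class ZonedTimestampParseSketch {

    /** Parses a Debezium-style ZonedTimestamp string while keeping its UTC offset. */
    public static OffsetDateTime parseZoned(Object dbzObj) {
        if (dbzObj instanceof String) {
            // Assumption: ISO-8601 with offset, e.g. "2020-07-17T18:00:22+00:00"
            return OffsetDateTime.parse((String) dbzObj, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        }
        throw new IllegalArgumentException(
                "Unable to convert to ZonedTimestamp from unexpected value '"
                        + dbzObj
                        + "' of type "
                        + (dbzObj == null ? "null" : dbzObj.getClass().getName()));
    }

    public static void main(String[] args) {
        OffsetDateTime zoned = parseZoned("2020-07-17T18:00:22+02:00");
        Instant instant = zoned.toInstant(); // drops the +02:00 offset
        System.out.println(zoned);   // 2020-07-17T18:00:22+02:00
        System.out.println(instant); // 2020-07-17T16:00:22Z
    }
}
```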