jiangcheng created FLINK-35906:
----------------------------------

             Summary: Inconsistent timestamp precision handling in Flink CDC 
PostgreSQL Connector
                 Key: FLINK-35906
                 URL: https://issues.apache.org/jira/browse/FLINK-35906
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
         Environment: I use the following maven settings.
{code:xml}
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>2.4-vvr-8.0-metrics-SNAPSHOT</version>
</dependency> {code}


PostgreSQL Server Version: I have tested on 14.0, 15.0, and 16.0. The error 
occurs regardless of the PostgreSQL version.

JDK: Java 8.
            Reporter: jiangcheng


I have encountered an inconsistency in how the Flink CDC PostgreSQL Connector 
handles the time-related types time, timetz, timestamp, and timestamptz. The 
precision of these values differs between the snapshot and incremental phases, 
and for some types it is lower than what PostgreSQL stores.
h1. Issue Details

*time type*

During the snapshot phase, the precision is reduced to milliseconds (ms), 
whereas in the incremental phase, the correct microsecond (micros) precision is 
maintained.
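
To make the loss concrete, here is a minimal sketch, assuming the field carries 
Debezium's MicroTime semantics (an INT64 holding microseconds past midnight). 
The class and method names are hypothetical, and truncateToMillis only imitates 
the millisecond precision observed in the snapshot phase; it is not the 
connector's code.

```java
import java.time.LocalTime;

final class MicroTimeDecoding {

    /** Decodes microseconds past midnight (MicroTime semantics) into a LocalTime. */
    static LocalTime fromMicrosOfDay(long microsOfDay) {
        return LocalTime.ofNanoOfDay(microsOfDay * 1_000L);
    }

    /** Drops the sub-millisecond digits, imitating the loss seen during the snapshot. */
    static long truncateToMillis(long microsOfDay) {
        return (microsOfDay / 1_000L) * 1_000L;
    }

    public static void main(String[] args) {
        // 10:13:02.264525 expressed as microseconds past midnight.
        long micros = (10 * 3600L + 13 * 60L + 2L) * 1_000_000L + 264_525L;
        System.out.println(fromMicrosOfDay(micros));                   // 10:13:02.264525
        System.out.println(fromMicrosOfDay(truncateToMillis(micros))); // 10:13:02.264
    }
}
```

Once the value has been truncated, the last three digits cannot be recovered 
downstream, so the two phases can emit different values for the same row.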

*timetz type*

This is where the largest discrepancy arises. In the snapshot phase, the 
precision drops to seconds (s), and the time is interpreted according to the 
system's timezone, so the stored value can be misread. In the incremental 
phase, the precision increases to milliseconds (ms), but the timezone is fixed 
at 0 (UTC). For example, after inserting 10:13:02.264525+08 into PostgreSQL, 
the snapshot returns 10:13:02Z, while the incremental phase returns 
02:13:02.264525Z. The code I use to read the data is given in the reproduction 
section.
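
The mismatch in the example can be checked with plain java.time. This is only 
a sketch: the class and method names are hypothetical, and the inserted value 
is written as +08:00 because OffsetTime.parse on Java 8 expects the offset with 
minutes.

```java
import java.time.OffsetTime;
import java.time.ZoneOffset;

final class TimetzComparison {

    /** Parses an ISO-8601 offset time and re-expresses it at UTC. */
    static OffsetTime normalizeToUtc(String isoOffsetTime) {
        return OffsetTime.parse(isoOffsetTime).withOffsetSameInstant(ZoneOffset.UTC);
    }

    /** True when both strings denote the same time of day once normalized to UTC. */
    static boolean sameValue(String a, String b) {
        return normalizeToUtc(a).equals(normalizeToUtc(b));
    }

    public static void main(String[] args) {
        String inserted = "10:13:02.264525+08:00"; // value written to PostgreSQL
        String snapshot = "10:13:02Z";             // value reported for the snapshot phase
        String incremental = "02:13:02.264525Z";   // value reported for the incremental phase

        System.out.println(normalizeToUtc(inserted));         // 02:13:02.264525Z
        System.out.println(sameValue(inserted, snapshot));    // false
        System.out.println(sameValue(inserted, incremental)); // true
    }
}
```

In this example the incremental string matches the inserted value after 
normalization, while the snapshot string has lost both the fractional seconds 
and the +08 offset.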

*timestamp type*

Both in snapshot and incremental modes, the precision is consistently at the 
microsecond level, which aligns with expectations.
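
For reference, this is roughly how I interpret the INT64 value for this type, 
assuming Debezium's MicroTimestamp semantics (microseconds since the epoch, 
without timezone). The class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

final class MicroTimestampDecoding {

    /** Decodes microseconds since 1970-01-01T00:00:00 into a LocalDateTime. */
    static LocalDateTime fromEpochMicros(long epochMicros) {
        // floorDiv/floorMod keep the fraction non-negative for pre-epoch values.
        long seconds = Math.floorDiv(epochMicros, 1_000_000L);
        int nanos = (int) Math.floorMod(epochMicros, 1_000_000L) * 1_000;
        return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        System.out.println(fromEpochMicros(1L));  // 1970-01-01T00:00:00.000001
        System.out.println(fromEpochMicros(-1L)); // 1969-12-31T23:59:59.999999
    }
}
```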

*timestamptz type*

Contrary to expectations, both phases reduce the precision to milliseconds 
(ms), rather than preserving the native microsecond precision that PostgreSQL 
supports for this type.
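
A quick way to spot the reduced precision in the emitted strings is sketched 
below, assuming the value arrives as an ISO-8601 string (Debezium's 
ZonedTimestamp semantics). The class name, method name, and sample timestamps 
are hypothetical and only illustrate the check.

```java
import java.time.OffsetDateTime;

final class TimestamptzPrecision {

    /** True when the timestamp carries non-zero digits below the millisecond. */
    static boolean hasSubMillisecondDigits(String isoTimestamp) {
        return OffsetDateTime.parse(isoTimestamp).getNano() % 1_000_000 != 0;
    }

    public static void main(String[] args) {
        // Illustrative values only; they are not taken from a real capture.
        System.out.println(hasSubMillisecondDigits("2024-07-26T02:13:02.264525Z")); // true
        System.out.println(hasSubMillisecondDigits("2024-07-26T02:13:02.264Z"));    // false
    }
}
```

The check cannot distinguish a truncated value from one whose microsecond 
digits are genuinely zero, so it is only meaningful on test rows that were 
inserted with non-zero microseconds.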

 
h1. Expected Behavior

The Flink CDC PostgreSQL Connector should maintain the native precision 
provided by PostgreSQL for all time-related data types across both snapshot and 
incremental phases, ensuring that time and timestamptz types are accurately 
represented down to microseconds, and timetz correctly handles timezone 
information alongside its precision.



 
h1. Reproducing the Error

I use the following code to read records from the PostgreSQL connector by 
implementing the deserialize method of 
{{DebeziumDeserializationSchema<Record>}}:

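{code:java}
// Imports assume the unshaded flink-connector-postgres-cdc 2.4 artifact and
// commons-lang3. Record is an application-specific type that is not shown here.
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.ZonedTime;
import io.debezium.time.ZonedTimestamp;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class PostgreSqlRecordSourceDeserializeSchema
        implements DebeziumDeserializationSchema<Record> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<Record> out)
            throws Exception {
        // skipping irrelevant business logic ...
        Struct rowValue =
                ((Struct) sourceRecord.value()).getStruct(Envelope.FieldName.AFTER);

        for (Field field : rowValue.schema().fields()) {
            // Debezium marks temporal fields with a logical schema name.
            String logicalName = field.schema().name();
            switch (field.schema().type()) {
                case INT64:
                    if (StringUtils.equals(logicalName, MicroTime.class.getName())) {
                        // handling time type
                        Long value = rowValue.getInt64(field.name());
                    } else if (StringUtils.equals(logicalName, MicroTimestamp.class.getName())) {
                        // handling timestamp type
                        Long value = rowValue.getInt64(field.name());
                    } else {
                        // skipping irrelevant business logic ...
                    }
                    break;
                case STRING:
                    if (StringUtils.equals(logicalName, ZonedTimestamp.class.getName())) {
                        // handling timestamptz type
                        String value = rowValue.getString(field.name());
                    } else if (StringUtils.equals(logicalName, ZonedTime.class.getName())) {
                        // handling timetz type
                        String value = rowValue.getString(field.name());
                    } else {
                        // skipping irrelevant business logic ...
                    }
                    break;
                default:
                    // skipping irrelevant business logic ...
                    break;
            }
        }
        // skipping irrelevant business logic ...
    }

    // getProducedType(), which the interface also requires, is omitted here.
}
{code}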
h1. Offer for Assistance

I am willing to provide additional test scenarios or results to help diagnose 
this issue further. Moreover, I am open to collaborating on reviewing potential 
fixes or providing any necessary feedback to ensure a comprehensive resolution 
to this discrepancy.

Could you please look into this matter, and if needed, guide me on how I can 
contribute more effectively towards resolving it?

Thank you for your attention to this issue, and I look forward to working 
together towards enhancing the reliability and accuracy of the Flink CDC 
PostgreSQL Connector.

Best regards,

AldrichZeng(曾曜)



 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
