[ 
https://issues.apache.org/jira/browse/FLINK-35907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiangcheng updated FLINK-35907:
-------------------------------
    Description: 
I am encountering issues with the Flink CDC PostgreSQL Connector's handling of 
{{bit}} and {{bit(n)}} data types, which deviates from the expected behavior and 
leads to inconsistencies and errors during both the snapshot and incremental 
sync phases.
h1. Problems
h2. *Problem 1: Misinterpretation of {{bit}} and {{bit(1)}}*

During data retrieval, {{bit}} and {{bit(1)}} columns are interpreted as 
{{org.apache.kafka.connect.data.Schema.Type.BOOLEAN}}, mirroring the treatment 
of PostgreSQL's native {{bool}}/{{boolean}} type. This may lose information 
whenever the intention is to preserve the bit pattern rather than a simple 
true/false value.
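
For reference, the hypothetical table used throughout this report can be created with plain JDBC. Everything below (database name, credentials, table and column names) is just an example of mine, and it assumes the {{org.postgresql:postgresql}} driver is on the classpath:
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class BitTableSetup {
    public static void main(String[] args) throws Exception {
        // Connection parameters are placeholders; adjust them to your environment.
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/testdb", "postgres", "postgres");
                Statement stmt = conn.createStatement()) {
            // One column per bit variant discussed in this issue.
            stmt.execute("CREATE TABLE IF NOT EXISTS bit_demo ("
                    + "id INT PRIMARY KEY, "
                    + "b1 bit, "          // equivalent to bit(1)
                    + "b10 bit(10))");
            // B'0001111111' is the 10-bit value referenced later in this report.
            stmt.execute("INSERT INTO bit_demo VALUES (1, B'1', B'0001111111') "
                    + "ON CONFLICT (id) DO NOTHING");
        }
    }
}
{code}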
h2. *Problem 2: Error with {{bit(n)}} during Snapshot Phase*

For {{bit(n)}} where {{n > 1}}, an error occurs during the snapshot phase 
within {{PostgresScanFetchTask}}. The problem arises from reading these values 
via {{PgResult#getBoolean}}, which is inappropriate for {{bit(n)}} values that 
do not represent a standard boolean state. Notably, the error does not surface 
during the incremental phase, because {{PgResult#getBoolean}} is never invoked 
there.
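
To see how the JDBC driver itself classifies these columns, the following diagnostic (a sketch against the example {{bit_demo}} table above) prints what {{ResultSetMetaData}} and {{getObject}} report; it makes no assumption about the outcome, it only surfaces the driver's view:
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

public class BitJdbcDiagnostic {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/testdb", "postgres", "postgres");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT b1, b10 FROM bit_demo WHERE id = 1")) {
            ResultSetMetaData meta = rs.getMetaData();
            if (rs.next()) {
                for (int i = 1; i <= meta.getColumnCount(); i++) {
                    // Print the java.sql.Types code, the driver's type name, and the
                    // runtime class returned by getObject for each bit column.
                    Object value = rs.getObject(i);
                    System.out.printf("%s: sqlType=%d, typeName=%s, javaClass=%s, value=%s%n",
                            meta.getColumnName(i),
                            meta.getColumnType(i),
                            meta.getColumnTypeName(i),
                            value == null ? "null" : value.getClass().getName(),
                            value);
                }
            }
        }
    }
}
{code}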
h1. My Analysis
h2. BIT type is interpreted as BOOLEAN

Diving into the code reveals that, in both scenarios, the connector relies on 
{{PgResult#getObject}}, which internally classifies {{bit}} and {{bit(n)}} as 
{{java.sql.Types.BOOLEAN}}. This misclassification triggers the problematic use 
of {{getBoolean}} for non-boolean representations such as {{bit(n)}}.
h2. Inconsistency between Snapshot and Incremental Phases

The discrepancy between the snapshot and incremental phases is noteworthy. 
During the snapshot phase, the error manifests through the direct call to 
{{PgResult#getObject}} in {{PostgresScanFetchTask#createDataEventsForTable}}, 
which in turn delegates to {{PgResult#getBoolean}}.

Conversely, in the incremental phase, {{bit(n)}} values are coerced into 
{{org.apache.kafka.connect.data.Schema.Type.BYTES}}, losing the original 
precision {{n}}. This forces consumers to fall back on a plain byte-array 
representation, obscuring the intended bit width and potentially leading to 
incorrect interpretations (e.g., I insert the {{bit(10)}} value 
{{'0001111111'}} into the PostgreSQL server, and it arrives in the SourceRecord 
as a byte array of length 1 whose only element is 127).
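
To make the precision loss concrete, below is a minimal decoding sketch (my own illustration, not connector code). It rebuilds the bit string from the emitted byte array assuming the least-significant-byte-first layout my deserializer observes; the original {{'0001111111'}} can only be reconstructed if the consumer already knows {{n = 10}}:
{code:java}
public class BitDecodingSketch {

    /** Rebuilds a bit string of the declared width from the bytes in the change event. */
    static String toBitString(byte[] bytes, int declaredLength) {
        StringBuilder bits = new StringBuilder();
        for (byte b : bytes) {
            // Mask to avoid sign extension, then left-pad each byte to 8 binary digits.
            String byteBits = String.format("%8s", Integer.toBinaryString(b & 0xFF)).replace(' ', '0');
            bits.insert(0, byteBits); // least-significant byte first => prepend
        }
        // Left-pad with the leading zero bits that were dropped; this step is only
        // possible when the original length n is known.
        while (bits.length() < declaredLength) {
            bits.insert(0, '0');
        }
        return bits.substring(bits.length() - declaredLength);
    }

    public static void main(String[] args) {
        // 0b01111111 = 127, the single byte observed in the SourceRecord.
        System.out.println(toBitString(new byte[] {127}, 10)); // prints 0001111111 only because n=10 is given
    }
}
{code}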
h1. *My Opinion*

From my perspective, the following approaches could address the problem.

*Consistent Handling:* The connector should uniformly and accurately handle all 
{{bit}} variants, respecting their distinct PostgreSQL definitions.

*Preserve Precision:* For {{{}bit(n){}}}, ensure that the precision {{n}} is 
maintained throughout processing, allowing consumers to correctly interpret the 
intended bit sequence without assuming a default byte size.

*Schema Transparency:* Enhance metadata handling to communicate the original 
{{bit(n)}} schema accurately to downstream systems, enabling them to process 
the data with full knowledge of its intended format.
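
As one possible shape for the Schema Transparency point: if the connector emitted Debezium's {{io.debezium.data.Bits}} semantic type with its {{length}} schema parameter populated, a consumer could recover {{n}} roughly as follows. This is only a sketch of what I would like to be possible; today the parameter is not available, which is exactly the gap described above.
{code:java}
import io.debezium.data.Bits;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

public final class BitLengthAccessor {

    /**
     * Returns the declared bit width n for a bit(n) field, or -1 when the schema does
     * not carry the Debezium Bits "length" parameter (the situation this issue describes).
     */
    static int declaredBitLength(Field field) {
        Schema schema = field.schema();
        if (!Bits.LOGICAL_NAME.equals(schema.name()) || schema.parameters() == null) {
            return -1;
        }
        String length = schema.parameters().get(Bits.LENGTH_FIELD);
        return length == null ? -1 : Integer.parseInt(length);
    }
}
{code}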
h1. Conclusion

Addressing these discrepancies will not only improve the reliability of the 
Flink CDC PostgreSQL Connector but also enhance its compatibility with a 
broader range of use cases that rely on precise {{bit}} data handling. I look 
forward to a resolution that ensures consistent and accurate processing across 
both snapshot and incremental modes.

Thank you for considering this issue, and I'm available to provide further 
details or assist in any way possible.
h1. Reproducing the Error

I use the following code to read records from the PostgreSQL connector by 
implementing the {{deserialize}} method of 
{{DebeziumDeserializationSchema<Record>}}:
{code:java}
import io.debezium.data.Bits;
import io.debezium.data.Envelope;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

public class PostgreSqlRecordSourceDeserializeSchema
        implements DebeziumDeserializationSchema<Record> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<Record> out) throws Exception {
        // skipping irrelevant business logic ...
        Struct rowValue = ((Struct) sourceRecord.value()).getStruct(Envelope.FieldName.AFTER);

        for (Field field : rowValue.schema().fields()) {
            switch (field.schema().type()) {
                case BOOLEAN:
                    // bit, bit(1), and boolean columns all arrive here as BOOLEAN
                    Boolean value = rowValue.getBoolean(field.name());
                    break;
                case BYTES:
                    if (StringUtils.equals(field.schema().name(), Bits.class.getName())) {
                        // bit(n) with n > 1: rebuild the bit string byte by byte,
                        // least-significant byte first
                        String byteString = "";
                        for (byte b : rowValue.getBytes(field.name())) {
                            String tmpByteString =
                                    String.format("%8s", Integer.toBinaryString(b)).replace(' ', '0');
                            if (tmpByteString.length() == 32) {
                                // negative bytes are sign-extended to 32 bits; keep the low 8
                                tmpByteString = tmpByteString.substring(24);
                            }
                            byteString = tmpByteString + byteString;
                        }
                    } else {
                        // skipping irrelevant business logic ...
                    }
                    break;
                default:
                    // skipping irrelevant business logic ...
                    break;
            }
        }
        // skipping irrelevant business logic ...
    }
} {code}
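For completeness, the deserializer above is wired into a job roughly as follows. The connection details are placeholders, {{Record}} is my own business type, and the builder calls reflect my understanding of the legacy {{com.ververica.cdc.connectors.postgres.PostgresSource}} API rather than a verified reference:
{code:java}
import com.ververica.cdc.connectors.postgres.PostgresSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BitReproJob {
    public static void main(String[] args) throws Exception {
        DebeziumSourceFunction<Record> source =
                PostgresSource.<Record>builder()
                        .hostname("localhost")          // placeholder
                        .port(5432)
                        .database("testdb")             // placeholder
                        .schemaList("public")
                        .tableList("public.bit_demo")   // the example table from above
                        .username("postgres")
                        .password("postgres")
                        .decodingPluginName("pgoutput")
                        .deserializer(new PostgreSqlRecordSourceDeserializeSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("bit/bit(n) reproduction");
    }
}
{code}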
h1. Version

I use the following Maven dependency.
{code:java}
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>2.4-vvr-8.0-metrics-SNAPSHOT</version>
</dependency> {code}
PostgreSQL server version: I have tested on 14.0, 15.0, and 16.0; the error 
occurs regardless of the PostgreSQL version.

JDK: Java 8.
h1. Offer for Assistance

I am willing to provide additional test scenarios or results to help diagnose 
this issue further. Moreover, I am open to collaborating on reviewing potential 
fixes or providing any necessary feedback to ensure a comprehensive resolution 
to this discrepancy. 

Thank you for your attention to this issue, and I look forward to working 
together towards enhancing the reliability and accuracy of the Flink CDC 
PostgreSQL Connector.

Best regards,

AldrichZeng(曾曜)



> Inconsistent handling of bit and bit(n) types in Flink CDC PostgreSQL 
> Connector
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-35907
>                 URL: https://issues.apache.org/jira/browse/FLINK-35907
>             Project: Flink
>          Issue Type: Bug
>         Environment: I use the following maven settings.
> {code:java}
> <dependency>
>     <groupId>com.ververica</groupId>
>     <artifactId>flink-connector-postgres-cdc</artifactId>
>     <version>2.4-vvr-8.0-metrics-SNAPSHOT</version>
> </dependency> {code}
> PostgreSQL server version: I have tested on 14.0, 15.0, and 16.0; the error 
> occurs regardless of the PostgreSQL version.
> JDK: Java 8.
>            Reporter: jiangcheng
>            Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
