[ https://issues.apache.org/jira/browse/FLINK-35907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jiangcheng updated FLINK-35907:
-------------------------------
    Description: 
I am encountering issues with the Flink CDC PostgreSQL Connector's handling of the {{bit}} and {{bit(n)}} data types. The behavior deviates from what I would expect and leads to inconsistencies and errors during both the snapshot and the incremental sync phases.

h1. Problems

h2. *Problem 1: Misinterpretation of {{bit}} and {{bit(1)}}*

During data retrieval, {{bit}} and {{bit(1)}} columns are interpreted as {{org.apache.kafka.connect.data.Schema.BOOLEAN}}, mirroring the treatment of PostgreSQL's native {{bool}}/{{boolean}} type. This loses information when the intention is to preserve the bit pattern rather than a simple true/false value.

h2. *Problem 2: Error with {{bit(n)}} during the snapshot phase*

For {{bit(n)}} where {{n > 1}}, an error occurs during the snapshot phase inside {{PostgresScanFetchTask}}. The failure comes from reading these values via {{PgResult#getBoolean}}, which is inappropriate for {{bit(n)}} values that do not represent a standard boolean. Notably, the error does not surface during the incremental phase, because {{PgResult#getBoolean}} is never invoked there.

h1. My Analysis

h2. BIT is interpreted as BOOLEAN

Diving into the code shows that in both scenarios the connector relies on {{PgResult#getObject}}, which internally classifies {{bit}} and {{bit(n)}} as {{java.sql.Types.BOOLEAN}}. This misclassification triggers the problematic use of {{getBoolean}} for non-boolean representations such as {{bit(n)}}.

h2. Inconsistency between the snapshot and incremental phases

The discrepancy between the two phases is noteworthy. During the snapshot phase, the error arises from the direct call to {{PgResult#getObject}} in {{PostgresScanFetchTask#createDataEventsForTable}}, which is then forwarded to {{PgResult#getBoolean}}. In the incremental phase, by contrast, {{bit(n)}} values are coerced into {{org.apache.kafka.connect.data.Schema.BYTES}}, and the original precision {{n}} is lost. Consumers are left to assume a plain byte-array representation, which obscures the intended bit width and can lead to incorrect interpretations. For example, when I insert the {{bit(10)}} value {{'0001111111'}} into PostgreSQL, it arrives in the SourceRecord as a byte array of length 1 whose single element is 127, so the fact that the column is 10 bits wide is no longer visible (see the decoding sketch after the Conclusion below).

h1. My Opinion

From my perspective, the following approaches may solve this problem.

*Consistent handling:* The connector should handle all {{bit}} variants uniformly and accurately, respecting their distinct PostgreSQL definitions.

*Preserve precision:* For {{bit(n)}}, the precision {{n}} should be maintained throughout processing, so that consumers can interpret the intended bit sequence without assuming a default byte size.

*Schema transparency:* Metadata handling should communicate the original {{bit(n)}} schema accurately to downstream systems, enabling them to process the data with full knowledge of its intended format.

h1. Conclusion

Addressing these discrepancies will not only improve the reliability of the Flink CDC PostgreSQL Connector but also broaden its compatibility with use cases that rely on precise {{bit}} handling. I look forward to a resolution that ensures consistent and accurate processing across both the snapshot and incremental modes.
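To make the "preserve precision" point concrete, here is a minimal, hedged sketch (my own illustration, not connector code; the class and method names are hypothetical) of how a consumer could reconstruct the textual bit string of a {{bit(n)}} column from the little-endian byte array observed in the SourceRecord, *provided* the bit length {{n}} is known to the consumer (e.g. from the table DDL). The point is only that nothing beyond {{n}} is needed to recover the original value.
{code:java}
/**
 * Minimal sketch (hypothetical helper, not part of the connector):
 * rebuilds the bit string of a PostgreSQL bit(n) column from the
 * little-endian byte array carried in the SourceRecord, given that
 * the declared length n is known to the consumer.
 */
public final class BitStringDecoder {

    public static String decode(byte[] littleEndianBytes, int n) {
        StringBuilder sb = new StringBuilder(n);
        // Bit i of the value is bit (i % 8) of byte (i / 8); missing
        // trailing bytes are treated as zero.
        for (int i = n - 1; i >= 0; i--) {
            int b = (i / 8) < littleEndianBytes.length ? littleEndianBytes[i / 8] & 0xFF : 0;
            sb.append(((b >> (i % 8)) & 1) == 1 ? '1' : '0');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The example from the analysis above: the bit(10) value '0001111111'
        // arrives as a single byte 127; with n = 10 known, the original
        // string can still be reconstructed.
        System.out.println(decode(new byte[] {127}, 10)); // prints 0001111111
    }
}
{code}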
Thank you for considering this issue; I'm available to provide further details or assist in any way possible.

h1. Reproduce the Error

I read records from the PostgreSQL connector by implementing the {{deserialize}} method of {{DebeziumDeserializationSchema<Record>}} ({{Record}} is our own type; irrelevant business logic and {{getProducedType()}} are omitted for brevity):
{code:java}
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

import io.debezium.data.Bits;
import io.debezium.data.Envelope;

public class PostgreSqlRecordSourceDeserializeSchema
        implements DebeziumDeserializationSchema<Record> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<Record> out) throws Exception {
        // skipping irrelevant business logic ...
        Struct rowValue = ((Struct) sourceRecord.value()).getStruct(Envelope.FieldName.AFTER);

        for (Field field : rowValue.schema().fields()) {
            switch (field.schema().type()) {
                case BOOLEAN:
                    // bit / bit(1) / boolean columns arrive as BOOLEAN
                    Boolean value = rowValue.getBoolean(field.name());
                    break;
                case BYTES:
                    if (StringUtils.equals(field.schema().name(), Bits.class.getName())) {
                        // bit(n) columns with n > 1 arrive as BYTES; rebuild the
                        // bit string byte by byte (least-significant byte first)
                        String byteString = "";
                        for (byte b : rowValue.getBytes(field.name())) {
                            String tmpByteString =
                                    String.format("%8s", Integer.toBinaryString(b)).replace(' ', '0');
                            if (tmpByteString.length() == 32) {
                                // negative bytes are sign-extended to 32 bits; keep the low 8
                                tmpByteString = tmpByteString.substring(24);
                            }
                            byteString = tmpByteString + byteString;
                        }
                    } else {
                        // skipping irrelevant business logic ...
                    }
                    break;
                default:
                    // skipping irrelevant business logic ...
                    break;
            }
            // skipping irrelevant business logic ...
        }
    }

    // getProducedType() and other members omitted for brevity
}
{code}

h1. Version

I use the following Maven settings.
{code:java}
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>2.4-vvr-8.0-metrics-SNAPSHOT</version>
</dependency>
{code}
PostgreSQL server version: tested on 14.0, 15.0 and 16.0; the error occurs regardless of the server version.
JDK: Java 8.

h1. Offer for Assistance

I am willing to provide additional test scenarios or results to help diagnose this issue further, and I am open to collaborating on reviewing potential fixes or providing any feedback needed for a comprehensive resolution.

Thank you for your attention to this issue, and I look forward to working together towards improving the reliability and accuracy of the Flink CDC PostgreSQL Connector.

Best regards,
AldrichZeng(曾曜)
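P.S. As a supplement to the {{deserialize}} snippet in the *Reproduce the Error* section: if the connector emitted Debezium's {{io.debezium.data.Bits}} logical type together with a {{length}} schema parameter (to my understanding that is how Debezium itself records the declared width; whether the Flink CDC connector actually populates it in both the snapshot and incremental phases is part of what this ticket asks to clarify), a consumer could recover {{n}} roughly as follows. This is a hedged sketch with a hypothetical class name; it reuses the {{BitStringDecoder}} helper sketched earlier.
{code:java}
import java.util.Map;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

import io.debezium.data.Bits;

/**
 * Hedged sketch, not connector code: recovers the declared bit length of a
 * bit(n) field from the Kafka Connect schema parameters, falling back to
 * 8 * payload length when the parameter is absent (the lossy case described
 * in this ticket).
 */
public final class BitLengthAwareReader {

    public static String readBitString(Struct row, Field field) {
        if (!Bits.LOGICAL_NAME.equals(field.schema().name())) {
            throw new IllegalArgumentException("Not a Debezium Bits field: " + field.name());
        }
        byte[] payload = row.getBytes(field.name());
        // Assumption: the declared width is stored under the "length" schema
        // parameter; this is what the ticket asks the connector to guarantee.
        Map<String, String> params = field.schema().parameters();
        String declared = params == null ? null : params.get("length");
        int n = declared != null ? Integer.parseInt(declared) : payload.length * 8;
        return BitStringDecoder.decode(payload, n); // helper from the sketch above
    }
}
{code}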
> Inconsistent handling of bit and bit(n) types in Flink CDC PostgreSQL Connector
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-35907
>                 URL: https://issues.apache.org/jira/browse/FLINK-35907
>             Project: Flink
>          Issue Type: Bug
>         Environment: I use the following Maven settings.
> {code:java}
> <dependency>
>     <groupId>com.ververica</groupId>
>     <artifactId>flink-connector-postgres-cdc</artifactId>
>     <version>2.4-vvr-8.0-metrics-SNAPSHOT</version>
> </dependency>
> {code}
> PostgreSQL server version: tested on 14.0, 15.0 and 16.0; the error occurs regardless of the server version.
> JDK: Java 8.
>            Reporter: jiangcheng
>            Priority: Major
>   Original Estimate: 168h
>   Remaining Estimate: 168h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)