[ 
https://issues.apache.org/jira/browse/FLINK-35715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruan Hang resolved FLINK-35715.
-------------------------------
    Resolution: Fixed

fix via(3.2-SNAPSHOT):07446d1f9de23b6d5e7b50de1b00ba901ffd31db

> Mysql Source support schema cache to deserialize record
> -------------------------------------------------------
>
>                 Key: FLINK-35715
>                 URL: https://issues.apache.org/jira/browse/FLINK-35715
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.1.1
>            Reporter: Hongshun Wang
>            Assignee: Ruan Hang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: cdc-3.2.0
>
>
>  
> Current, DebeziumEventDeserializationSchema will deserialize each record with 
> schema inferred by this record.
>  
> {code:java}
> private RecordData extractDataRecord(Struct value, Schema valueSchema) throws 
> Exception {
>     DataType dataType = schemaDataTypeInference.infer(value, valueSchema);
>     return (RecordData) getOrCreateConverter(dataType).convert(value, 
> valueSchema);
> }
>  {code}
> There are some issues:
>  # Inferring and creating a converter as soon as a record arrives will incur 
> additional costs.
>  # Inferring from a record might not reflect the real table schema 
> accurately. For instance, a timestamp type with precision 6 in MySQL might 
> have a value with 0 nanoseconds of the millisecond. When inferred, it will 
> appear to have a precision of 0.
> {code:java}
> protected DataType inferString(Object value, Schema schema) {
>     if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
>         int nano =
>                 Optional.ofNullable((String) value)
>                         .map(s -> ZonedTimestamp.FORMATTER.parse(s, 
> Instant::from))
>                         .map(Instant::getNano)
>                         .orElse(0);
>         int precision;
>         if (nano == 0) {
>             precision = 0;
>         } else if (nano % 1000 > 0) {
>             precision = 9;
>         } else if (nano % 1000_000 > 0) {
>             precision = 6;
>         } else if (nano % 1000_000_000 > 0) {
>             precision = 3;
>         } else {
>             precision = 0;
>         }
>         return DataTypes.TIMESTAMP_LTZ(precision);
>     }
>     return DataTypes.STRING();
> } {code}
> However, timestamps with different precisions will have different data 
> formats in BinaryRecordData. Placing data with a timestamp of 0 precision and 
> then parsing it with a precision of 6 will result in an exception being 
> thrown.
>  
> {code:java}
> //org.apache.flink.cdc.common.data.binary.BinaryRecordData#getTimestamp
> @Override
> public TimestampData getTimestamp(int pos, int precision) {
>     assertIndexIsValid(pos);
>     if (TimestampData.isCompact(precision)) {
>         return 
> TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
>     }
>     int fieldOffset = getFieldOffset(pos);
>     final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
>     return BinarySegmentUtils.readTimestampData(segments, offset, 
> offsetAndNanoOfMilli);
> } {code}
> Thus, I think we should cache the table schema in Source, and only update it 
> with SchemaChangeRecord. Thus, the schema of source 
> SourceRecordEventDeserializer is always same with database.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to