[ https://issues.apache.org/jira/browse/FLINK-35715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ruan Hang resolved FLINK-35715.
-------------------------------
    Resolution: Fixed

Fixed via (3.2-SNAPSHOT): 07446d1f9de23b6d5e7b50de1b00ba901ffd31db

MySQL source should support a schema cache to deserialize records
------------------------------------------------------------------

                 Key: FLINK-35715
                 URL: https://issues.apache.org/jira/browse/FLINK-35715
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.1.1
            Reporter: Hongshun Wang
            Assignee: Ruan Hang
            Priority: Blocker
              Labels: pull-request-available
             Fix For: cdc-3.2.0

Currently, DebeziumEventDeserializationSchema deserializes each record with a schema inferred from that same record:

{code:java}
private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception {
    DataType dataType = schemaDataTypeInference.infer(value, valueSchema);
    return (RecordData) getOrCreateConverter(dataType).convert(value, valueSchema);
}
{code}

This causes two issues:
# Inferring a schema and creating a converter for every incoming record adds per-record cost.
# A schema inferred from a single record may not match the actual table schema. For instance, a MySQL timestamp column with precision 6 can produce a value whose fractional-second part is zero; inferred from that value alone, the type appears to have precision 0:

{code:java}
protected DataType inferString(Object value, Schema schema) {
    if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
        int nano =
                Optional.ofNullable((String) value)
                        .map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
                        .map(Instant::getNano)
                        .orElse(0);

        int precision;
        if (nano == 0) {
            precision = 0;
        } else if (nano % 1000 > 0) {
            precision = 9;
        } else if (nano % 1000_000 > 0) {
            precision = 6;
        } else if (nano % 1000_000_000 > 0) {
            precision = 3;
        } else {
            precision = 0;
        }
        return DataTypes.TIMESTAMP_LTZ(precision);
    }
    return DataTypes.STRING();
}
{code}

However, timestamps with different precisions use different binary layouts in BinaryRecordData. Writing a value with precision 0 and then reading it with precision 6 throws an exception:

{code:java}
// org.apache.flink.cdc.common.data.binary.BinaryRecordData#getTimestamp
@Override
public TimestampData getTimestamp(int pos, int precision) {
    assertIndexIsValid(pos);

    if (TimestampData.isCompact(precision)) {
        return TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
    }

    int fieldOffset = getFieldOffset(pos);
    final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
    return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
}
{code}

Thus, I think we should cache the table schema in the source and update it only when a SchemaChangeRecord arrives, so that the schema used by SourceRecordEventDeserializer always matches the database schema. Illustrative sketches follow below.
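To make issue 2 concrete, here is a tiny standalone restatement of the precision rule quoted from inferString above. It uses plain java.time instead of Debezium's ZonedTimestamp string parsing, and PrecisionInferenceDemo/inferPrecision are illustrative names, not CDC code:

{code:java}
import java.time.Instant;

// Standalone restatement of the precision rule from inferString above.
// Plain java.time is used here; the real code parses Debezium's
// ZonedTimestamp strings. All names are illustrative placeholders.
public class PrecisionInferenceDemo {

    static int inferPrecision(int nano) {
        if (nano == 0) {
            return 0;
        } else if (nano % 1000 > 0) {
            return 9;
        } else if (nano % 1000_000 > 0) {
            return 6;
        } else if (nano % 1000_000_000 > 0) {
            return 3;
        } else {
            return 0;
        }
    }

    public static void main(String[] args) {
        // Two values that could both come from the same MySQL TIMESTAMP(6) column:
        int wholeSecond = Instant.parse("2024-07-01T12:00:00Z").getNano();
        int microSecond = Instant.parse("2024-07-01T12:00:00.123456Z").getNano();

        System.out.println(inferPrecision(wholeSecond)); // 0 -> inferred as TIMESTAMP_LTZ(0)
        System.out.println(inferPrecision(microSecond)); // 6 -> inferred as TIMESTAMP_LTZ(6)
    }
}
{code}

Two records from the same column can thus yield two different inferred types, and therefore two different BinaryRecordData layouts.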
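To see why that mismatch blows up, here is a simplified analogue (not the real BinaryRecordData code) of the two layouts implied by getTimestamp above: a compact timestamp stores epoch millis directly in the 8-byte fixed slot, while a non-compact reader treats the same slot as an offset-and-nanos word:

{code:java}
import java.nio.ByteBuffer;

// Simplified analogue of the compact/non-compact timestamp layouts; this
// is illustrative, not the actual BinaryRecordData memory code.
public class LayoutMismatchDemo {
    public static void main(String[] args) {
        long epochMillis = 1_719_800_000_000L; // written as a compact TIMESTAMP(0) value

        ByteBuffer fixedSlot = ByteBuffer.allocate(8);
        fixedSlot.putLong(0, epochMillis);

        // A reader expecting precision 6 interprets the same 8 bytes as
        // "subOffset << 32 | nanoOfMilli" and follows the bogus offset.
        long offsetAndNano = fixedSlot.getLong(0);
        int bogusOffset = (int) (offsetAndNano >> 32);
        int bogusNano = (int) offsetAndNano;
        System.out.println("subOffset=" + bogusOffset + ", nanoOfMilli=" + bogusNano);
        // The bogus offset points outside the record's segments, which is
        // where the exception from BinaryRecordData#getTimestamp comes from.
    }
}
{code}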
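Finally, a minimal sketch of the proposed cache itself. Everything here (CachingDeserializer, TableId, TableSchema, ChangeRecord, buildConverter) is a hypothetical placeholder, not the Flink CDC API used by the actual fix commit:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Minimal, self-contained sketch of the cached-schema idea. All types
// below are illustrative stand-ins for the real CDC classes.
public class CachingDeserializer {

    record TableId(String database, String table) {}
    record TableSchema(Map<String, String> columnTypes) {}
    record ChangeRecord(TableId tableId, boolean isSchemaChange, TableSchema newSchema, Object payload) {}

    // One cached converter per table, built once from the table schema
    // instead of being re-inferred from every record's values.
    private final Map<TableId, Function<Object, Object>> converters = new HashMap<>();
    private final Map<TableId, TableSchema> schemas = new HashMap<>();

    public Object deserialize(ChangeRecord event) {
        TableId id = event.tableId();
        if (event.isSchemaChange()) {
            // Only a schema-change record may update the cache, so the
            // cached schema always matches the database schema.
            schemas.put(id, event.newSchema());
            converters.remove(id);
            return null;
        }
        // Assumes the initial table schema was registered at startup
        // (e.g. from the snapshot phase) before data records arrive.
        return converters
                .computeIfAbsent(id, tid -> buildConverter(schemas.get(tid)))
                .apply(event.payload());
    }

    // A real implementation would build a typed RecordData converter from
    // the column types; this one just echoes the payload for illustration.
    private Function<Object, Object> buildConverter(TableSchema schema) {
        return payload -> payload;
    }
}
{code}

Invalidating the converter only on schema-change records keeps it stable across data records, which also removes the per-record inference cost from issue 1, and it guarantees the deserializer's view of the schema changes exactly when the database's does.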