[ https://issues.apache.org/jira/browse/FLINK-37300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
He Wang updated FLINK-37300: ---------------------------- Description: Many data types in the database do not have a one-to-one correspondence with the data types in Flink. So to ensure that the data types on the source and sink sides match accurately, we need to add the column type information in the database to the Column struct. We can introduce new structs like below to represent data types in data sources. {code:java} /** Describes the raw data type in data sources. */ public interface RawDataType extends Serializable {} {code} {code:java} /** Describes the raw data type in jdbc data sources. */ public class JdbcRawDataType implements RawDataType { private static final long serialVersionUID = 1L; private final int jdbcType; private final String typeName; private final int length; private final Integer scale; public JdbcRawDataType(int jdbcType, String typeName, int length, Integer scale) { this.jdbcType = jdbcType; this.typeName = typeName; this.length = length; this.scale = scale; } /** Returns the {@link java.sql.Types}. */ public int getJdbcType() { return jdbcType; } /** Returns the type name. */ public String getTypeName() { return typeName; } /** Returns the maximum length. For numeric data types, this represents the precision. */ public int getLength() { return length; } /** Returns the scale. */ public Integer getScale() { return scale; } } {code} Then I thought we could add RawDataType as a new field of org.apache.flink.cdc.common.types.DataType and set that field when instantiating SchemaChangeEvent so that we can get the type in the passed schema. was:Many data types in the database do not have a one-to-one correspondence with the data types in Flink. So to ensure that the data types on the source and sink sides match accurately, we need to add the column type information in the database to the Column struct. > Add database native type information to Column struct to generate ddl > correctly > ------------------------------------------------------------------------------- > > Key: FLINK-37300 > URL: https://issues.apache.org/jira/browse/FLINK-37300 > Project: Flink > Issue Type: Improvement > Components: Flink CDC > Reporter: He Wang > Assignee: He Wang > Priority: Major > > Many data types in the database do not have a one-to-one correspondence with > the data types in Flink. So to ensure that the data types on the source and > sink sides match accurately, we need to add the column type information in > the database to the Column struct. > > We can introduce new structs like below to represent data types in data > sources. > > > {code:java} > /** Describes the raw data type in data sources. */ > public interface RawDataType extends Serializable {} {code} > > > > {code:java} > /** Describes the raw data type in jdbc data sources. */ > public class JdbcRawDataType implements RawDataType { > private static final long serialVersionUID = 1L; > private final int jdbcType; > private final String typeName; > private final int length; > private final Integer scale; > public JdbcRawDataType(int jdbcType, String typeName, int length, Integer > scale) { > this.jdbcType = jdbcType; > this.typeName = typeName; > this.length = length; > this.scale = scale; > } > /** Returns the {@link java.sql.Types}. */ > public int getJdbcType() { > return jdbcType; > } > /** Returns the type name. */ > public String getTypeName() { > return typeName; > } > /** Returns the maximum length. For numeric data types, this represents > the precision. */ > public int getLength() { > return length; > } > /** Returns the scale. */ > public Integer getScale() { > return scale; > } > } {code} > > > Then I thought we could add RawDataType as a new field of > org.apache.flink.cdc.common.types.DataType and set that field when > instantiating SchemaChangeEvent so that we can get the type in the passed > schema. -- This message was sent by Atlassian Jira (v8.20.10#820010)