[ 
https://issues.apache.org/jira/browse/FLINK-37300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

He Wang updated FLINK-37300:
----------------------------
    Description: 
Many database data types have no one-to-one correspondence with Flink data 
types. To ensure that the data types on the source and sink sides match 
accurately, we need to add the database's native column type information to 
the Column struct.

 

We can introduce new structs like the ones below to represent data types in data sources.

 

 
{code:java}
/** Describes the raw data type in data sources. */
public interface RawDataType extends Serializable {} {code}
 

 

 
{code:java}
/** Describes the raw data type in jdbc data sources. */
public class JdbcRawDataType implements RawDataType {

    private static final long serialVersionUID = 1L;

    private final int jdbcType;
    private final String typeName;
    private final int length;
    private final Integer scale;

    public JdbcRawDataType(int jdbcType, String typeName, int length, Integer scale) {
        this.jdbcType = jdbcType;
        this.typeName = typeName;
        this.length = length;
        this.scale = scale;
    }

    /** Returns the {@link java.sql.Types}. */
    public int getJdbcType() {
        return jdbcType;
    }

    /** Returns the type name. */
    public String getTypeName() {
        return typeName;
    }

    /** Returns the maximum length. For numeric data types, this represents the precision. */
    public int getLength() {
        return length;
    }

    /** Returns the scale. */
    public Integer getScale() {
        return scale;
    }
} {code}
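To illustrate how a source connector would capture a column's native type with the proposed class, here is a hedged sketch. The `RawTypeExample` wrapper and the example columns are illustrative only; the inner class is a condensed copy of the proposal above so the sketch compiles on its own.

```java
import java.io.Serializable;
import java.sql.Types;

public class RawTypeExample {

    /** Condensed copy of the proposed JdbcRawDataType, so this sketch is self-contained. */
    public static final class JdbcRawDataType implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int jdbcType;
        private final String typeName;
        private final int length;
        private final Integer scale; // nullable: types such as VARCHAR have no scale

        public JdbcRawDataType(int jdbcType, String typeName, int length, Integer scale) {
            this.jdbcType = jdbcType;
            this.typeName = typeName;
            this.length = length;
            this.scale = scale;
        }

        public int getJdbcType() { return jdbcType; }
        public String getTypeName() { return typeName; }
        public int getLength() { return length; }
        public Integer getScale() { return scale; }
    }

    public static void main(String[] args) {
        // A DECIMAL(10, 2) column: length carries the precision, scale is 2.
        JdbcRawDataType decimal = new JdbcRawDataType(Types.DECIMAL, "DECIMAL", 10, 2);
        // A VARCHAR(255) column: no scale, so the nullable Integer stays null.
        JdbcRawDataType varchar = new JdbcRawDataType(Types.VARCHAR, "VARCHAR", 255, null);

        System.out.println(decimal.getTypeName() + "(" + decimal.getLength() + "," + decimal.getScale() + ")");
        // prints "DECIMAL(10,2)"
        System.out.println(varchar.getTypeName() + "(" + varchar.getLength() + ")");
        // prints "VARCHAR(255)"
    }
}
```

Making scale a nullable Integer (rather than a primitive with a sentinel) keeps "no scale" distinct from "scale 0", which matters for numeric types.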
 

 

We could then add RawDataType as a new field of 
org.apache.flink.cdc.common.types.DataType and populate it when instantiating 
SchemaChangeEvent, so that the raw type is available from the schema passed 
downstream.
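For the motivating DDL-generation case, a sink could then prefer the raw type over the lossy Flink mapping. This is only a sketch under assumed shapes: `DdlSketch`, `columnDdl`, and the condensed raw-type holder are hypothetical names, not the actual Flink CDC API.

```java
public class DdlSketch {

    /** Illustrative stand-in for the proposed raw type; not the actual Flink CDC class. */
    public static final class JdbcRawDataType implements java.io.Serializable {
        private static final long serialVersionUID = 1L;
        final String typeName;
        final int length;
        final Integer scale; // nullable: absent for non-numeric types

        public JdbcRawDataType(String typeName, int length, Integer scale) {
            this.typeName = typeName;
            this.length = length;
            this.scale = scale;
        }
    }

    /**
     * Hypothetical sink-side helper: emit the source's native type when it was
     * captured, falling back to the mapped Flink type otherwise.
     */
    public static String columnDdl(String name, String mappedFlinkType, JdbcRawDataType raw) {
        if (raw == null) {
            return name + " " + mappedFlinkType; // no raw type captured: use the Flink mapping
        }
        if (raw.scale != null) {
            return name + " " + raw.typeName + "(" + raw.length + "," + raw.scale + ")";
        }
        return name + " " + raw.typeName + "(" + raw.length + ")";
    }

    public static void main(String[] args) {
        // MEDIUMINT has no exact Flink counterpart; the raw type restores it in the DDL.
        System.out.println(columnDdl("id", "INT", new JdbcRawDataType("MEDIUMINT", 8, null)));
        // prints "id MEDIUMINT(8)"
        System.out.println(columnDdl("name", "STRING", null));
        // prints "name STRING"
    }
}
```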

  was:Many data types in the database do not have a one-to-one correspondence 
with the data types in Flink. So to ensure that the data types on the source 
and sink sides match accurately, we need to add the column type information in 
the database to the Column struct.


> Add database native type information to Column struct to generate ddl 
> correctly
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-37300
>                 URL: https://issues.apache.org/jira/browse/FLINK-37300
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>            Reporter: He Wang
>            Assignee: He Wang
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
