[ https://issues.apache.org/jira/browse/FLINK-15379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17005998#comment-17005998 ]
Zhenghua Gao commented on FLINK-15379: -------------------------------------- Since FLINK-15168 modified logic of computing physical indices, the query in description would failed in validation phase: Exception in thread "main" org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table field 'timestamp6' does not match with the physical type TIMESTAMP(3) of the 'timestamp9' field of the TableSource return type. The root cause is the JDBC table source should implement *getProducedDataType* and return proper types with precision. > JDBC connector return wrong value if defined dataType contains precision > ------------------------------------------------------------------------ > > Key: FLINK-15379 > URL: https://issues.apache.org/jira/browse/FLINK-15379 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC, Table SQL / Planner > Affects Versions: 1.10.0 > Reporter: Leonard Xu > Priority: Major > Fix For: 1.10.0 > > > A mysql table like: > > {code:java} > // CREATE TABLE `currency` ( > `currency_id` bigint(20) NOT NULL, > `currency_name` varchar(200) DEFAULT NULL, > `rate` double DEFAULT NULL, > `currency_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, > `country` varchar(100) DEFAULT NULL, > `timestamp6` timestamp(6) NULL DEFAULT NULL, > `time6` time(6) DEFAULT NULL, > `gdp` decimal(10,4) DEFAULT NULL, > PRIMARY KEY (`currency_id`) > ) ENGINE=InnoDB DEFAULT CHARSET=utf8 > +-------------+---------------+------+---------------------+---------+----------------------------+-----------------+----------+ > | currency_id | currency_name | rate | currency_time | country | > timestamp6 | time6 | gdp | > +-------------+---------------+------+---------------------+---------+----------------------------+-----------------+----------+ > | 1 | US Dollar | 1020 | 2019-12-20 17:23:00 | America | > 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 | > | 2 | Euro | 114 | 2019-12-20 12:22:00 | Germany | > 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 | > | 3 | RMB | 16 | 2019-12-20 12:22:00 | China | > 2019-12-20 12:22:00.023456 | 12:22:00.023456 | 100.0112 | > | 4 | Yen | 1 | 2019-12-20 12:22:00 | Japan | > 2019-12-20 12:22:00.123456 | 12:22:00.123456 | 100.4112 | > +-------------+---------------+------+---------------------+---------+----------------------------+-----------------+----------+{code} > > If user defined a jdbc table as dimension table like: > > {code:java} > // > public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + > " currency_id BIGINT,\n" + > " currency_name STRING,\n" + > " rate DOUBLE,\n" + > " currency_time TIMESTAMP(3),\n" + > " country STRING,\n" + > " timestamp6 TIMESTAMP(6),\n" + > " time6 TIME(6),\n" + > " gdp DECIMAL(10, 4)\n" + > ") WITH (\n" + > " 'connector.type' = 'jdbc',\n" + > " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + > " 'connector.username' = 'root'," + > " 'connector.table' = 'currency',\n" + > " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" + > " 'connector.lookup.cache.max-rows' = '500', \n" + > " 'connector.lookup.cache.ttl' = '10s',\n" + > " 'connector.lookup.max-retries' = '3'" + > ")"; > {code} > > User will get wrong value in column `timestamp6`,`time6`,`gdp`: > {code:java} > // c.currency_id, c.currency_name, c.rate, c.currency_time, c.country, > c.timestamp6, c.time6, c.gdp > 1,US > Dollar,1020.0,2019-12-20T17:23,America,2019-12-20T12:22:00.023456,12:22,-0.0001 > 2,Euro,114.0,2019-12-20T12:22,Germany,2019-12-20T12:22:00.023456,12:22,-0.0001 > 4,Yen,1.0,2019-12-20T12:22,Japan,2019-12-20T12:22:00.123456,12:22,-0.0001{code} > > > {code:java} > public class JDBCSourceExample { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment tableEnvironment = > StreamTableEnvironment.create(env, envSettings); > String mysqlCurrencyDDL = "CREATE TABLE currency (\n" + > " currency_id BIGINT,\n" + > " currency_name STRING,\n" + > " rate DOUBLE,\n" + > " currency_time TIMESTAMP(3),\n" + > " country STRING,\n" + > " timestamp6 TIMESTAMP(6),\n" + > " time6 TIME(6),\n" + > " gdp DECIMAL(10, 4)\n" + > ") WITH (\n" + > " 'connector.type' = 'jdbc',\n" + > " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" + > " 'connector.username' = 'root'," + > " 'connector.table' = 'currency',\n" + > " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" + > " 'connector.lookup.cache.max-rows' = '500', \n" + > " 'connector.lookup.cache.ttl' = '10s',\n" + > " 'connector.lookup.max-retries' = '3'" + > ")"; > tableEnvironment.sqlUpdate(mysqlCurrencyDDL); > String querySQL = "select * from currency"; > tableEnvironment.toAppendStream(tableEnvironment.sqlQuery(querySQL), > Row.class).print(); > tableEnvironment.execute("JdbcExample"); > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)