[ https://issues.apache.org/jira/browse/FLINK-18364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
xiaojin.wy updated FLINK-18364: ------------------------------- Description: *The whole error is:* Caused by: org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table field 'rowtime' does not match with the physical type TIMESTAMP(3) of the 'rowtime' field of the TableSink consumed type. at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$5(TypeMappingUtils.java:178) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:300) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:267) at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:132) at org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:152) at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:267) at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:174) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:368) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:361) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:209) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204) at scala.Option.map(Option.scala:146) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1249) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1241) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:317) at com.ververica.flink.table.gateway.context.ExecutionContext.createPipeline(ExecutionContext.java:223) at com.ververica.flink.table.gateway.operation.SelectOperation.lambda$null$0(SelectOperation.java:225) at com.ververica.flink.table.gateway.deployment.DeploymentUtil.wrapHadoopUserNameIfNeeded(DeploymentUtil.java:48) at com.ververica.flink.table.gateway.operation.SelectOperation.lambda$executeQueryInternal$1(SelectOperation.java:220) at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoaderWithException(ExecutionContext.java:197) at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:219) ... 48 more I run the sql by sql-gateway. When I run it in a batch environment, the sql run well and can produce the result of "2015-02-15T10:00:00|4\n2015-02-15T11:00:00|1". But change to the streaming environment, the errors come. *The sql is:* select floor(rowtime to hour) as rowtime, count(*) as c from orders group by floor(rowtime to hour) *The table query is:* CREATE TABLE `orders` ( rowtime TIMESTAMP, id INT, product VARCHAR, units INT ) WITH ( 'format.field-delimiter'='|', 'connector.type'='filesystem', 'format.derive-schema'='true', 'connector.path'='/daily_regression_stream_blink_sql_1.10/test_agg/sources/orders.csv', 'format.type'='csv' ) or query is: CREATE TABLE `orders` ( rowtime TIMESTAMP, id INT, product VARCHAR, units INT ) WITH ( 'connector'='filesystem', 'csv.field-delimiter'='|', 'path'='/defender_test_data/daily_regression_stream_blink_sql_1.10/test_agg/sources/orders.csv', 'csv.null-literal'='', 'format'='csv' ) was: *The whole error is:* Caused by: org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table field 'rowtime' does not match with the physical type TIMESTAMP(3) of the 'rowtime' field of the TableSink consumed type. at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$5(TypeMappingUtils.java:178) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:300) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:267) at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:132) at org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:152) at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:267) at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:174) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:368) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:361) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:209) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204) at scala.Option.map(Option.scala:146) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1249) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1241) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:317) at com.ververica.flink.table.gateway.context.ExecutionContext.createPipeline(ExecutionContext.java:223) at com.ververica.flink.table.gateway.operation.SelectOperation.lambda$null$0(SelectOperation.java:225) at com.ververica.flink.table.gateway.deployment.DeploymentUtil.wrapHadoopUserNameIfNeeded(DeploymentUtil.java:48) at com.ververica.flink.table.gateway.operation.SelectOperation.lambda$executeQueryInternal$1(SelectOperation.java:220) at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoaderWithException(ExecutionContext.java:197) at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:219) ... 48 more I run the sql by sql-gateway. When I run it in a batch environment, the sql run well and can produce the result of "2015-02-15T10:00:00|4\n2015-02-15T11:00:00|1". But change to the streaming environment, the errors come. *The sql is:* select floor(rowtime to hour) as rowtime, count(*) as c from orders group by floor(rowtime to hour) *The table query is:* CREATE TABLE `orders` ( rowtime TIMESTAMP, id INT, product VARCHAR, units INT ) WITH ( 'format.field-delimiter'='|', 'connector.type'='filesystem', 'format.derive-schema'='true', 'connector.path'='/daily_regression_stream_blink_sql_1.10/test_agg/sources/orders.csv', 'format.type'='csv' ) or query is: > A streaming sql cause "org.apache.flink.table.api.ValidationException: Type > TIMESTAMP(6) of table field 'rowtime' does not match with the physical type > TIMESTAMP(3) of the 'rowtime' field of the TableSink consumed type" > --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-18364 > URL: https://issues.apache.org/jira/browse/FLINK-18364 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.11.0 > Environment: The input data is: > 2015-02-15 10:15:00.0|1|paint|10 > 2015-02-15 10:24:15.0|2|paper|5 > 2015-02-15 10:24:45.0|3|brush|12 > 2015-02-15 10:58:00.0|4|paint|3 > 2015-02-15 11:10:00.0|5|paint|3 > Reporter: xiaojin.wy > Priority: Major > Fix For: 1.11.0 > > > *The whole error is:* > Caused by: org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) > of table field 'rowtime' does not match with the physical type TIMESTAMP(3) > of the 'rowtime' field of the TableSink consumed type. at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$5(TypeMappingUtils.java:178) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:300) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:267) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:132) > at > org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:152) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:267) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:174) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:368) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:361) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:209) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204) > at scala.Option.map(Option.scala:146) at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.AbstractTraversable.map(Traversable.scala:104) at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1249) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1241) > at > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:317) > at > com.ververica.flink.table.gateway.context.ExecutionContext.createPipeline(ExecutionContext.java:223) > at > com.ververica.flink.table.gateway.operation.SelectOperation.lambda$null$0(SelectOperation.java:225) > at > com.ververica.flink.table.gateway.deployment.DeploymentUtil.wrapHadoopUserNameIfNeeded(DeploymentUtil.java:48) > at > com.ververica.flink.table.gateway.operation.SelectOperation.lambda$executeQueryInternal$1(SelectOperation.java:220) > at > com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoaderWithException(ExecutionContext.java:197) > at > com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:219) > ... 48 more > I run the sql by sql-gateway. > When I run it in a batch environment, the sql run well and can produce the > result of "2015-02-15T10:00:00|4\n2015-02-15T11:00:00|1". But change to the > streaming environment, the errors come. > *The sql is:* > select floor(rowtime to hour) as rowtime, count(*) as c from orders group by > floor(rowtime to hour) > *The table query is:* > CREATE TABLE `orders` ( > rowtime TIMESTAMP, > id INT, > product VARCHAR, > units INT > ) WITH ( > 'format.field-delimiter'='|', > 'connector.type'='filesystem', > 'format.derive-schema'='true', > > 'connector.path'='/daily_regression_stream_blink_sql_1.10/test_agg/sources/orders.csv', > 'format.type'='csv' > ) > or query is: > > CREATE TABLE `orders` ( > rowtime TIMESTAMP, > id INT, > product VARCHAR, > units INT > ) WITH ( > 'connector'='filesystem', > 'csv.field-delimiter'='|', > 'path'='/defender_test_data/daily_regression_stream_blink_sql_1.10/test_agg/sources/orders.csv', > 'csv.null-literal'='', > 'format'='csv' > ) > -- This message was sent by Atlassian Jira (v8.3.4#803005)