[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dawid Wysakowicz updated FLINK-17313:
-------------------------------------
    Fix Version/s: 1.11.0

> Validation error when insert decimal/varchar with precision into sink using
> TypeInformation of row
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17313
>                 URL: https://issues.apache.org/jira/browse/FLINK-17313
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Terry Wang
>            Assignee: Terry Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> Test code like the following (in the Blink planner):
> {code:java}
> // tEnv is a TableEnvironment created with the Blink planner.
> tEnv.sqlUpdate("create table randomSource (" +
>     "  a varchar(10)," +
>     "  b decimal(20,2)" +
>     ") with (" +
>     "  'type' = 'random'," +
>     "  'count' = '10'" +
>     ")");
> tEnv.sqlUpdate("create table printSink (" +
>     "  a varchar(10)," +
>     "  b decimal(22,2)" +
>     ") with (" +
>     "  'type' = 'print'" +
>     ")");
> tEnv.sqlUpdate("insert into printSink select * from randomSource");
> tEnv.execute("");
> {code}
> The print TableSink implements UpsertStreamTableSink, and its getRecordType is
> as follows:
> {code:java}
> public TypeInformation<Row> getRecordType() {
>     return getTableSchema().toRowType();
> }
> {code}
> The varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
>       at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>       at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>       at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>       at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>       at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>       at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>       at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>       at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>       at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>       at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>       at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>       at scala.Option.map(Option.scala:146)
>       at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>       at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>       at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)
> The validation exceptions for other types are similar. I dug into it and think
> the cause is TypeMappingUtils#checkPhysicalLogicalTypeCompatible. The method
> does not seem to account for the fact that the physical/logical type
> validation differs between sources and sinks: in a source, the logical type
> should be able to cover the physical type, whereas in a sink it is the other
> way around and the physical type should be able to cover the logical type.
> Besides, the decimal type needs more careful handling: when the target type is
> Legacy(Decimal), it should accept a decimal type of any precision.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
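
To make the source/sink asymmetry described in the issue concrete, here is a minimal, self-contained sketch. It is not Flink code and not the actual fix: `TypeCoverageSketch`, `ColType`, `covers`, `compatibleForSource` and `compatibleForSink` are hypothetical names, and the coverage rule for decimals (enough integer and fractional digits) is an assumption made for illustration. STRING is modelled as VARCHAR(Integer.MAX_VALUE).

```java
// Illustrative sketch only; all names here are hypothetical, not Flink APIs.
class TypeCoverageSketch {

    /** Simplified stand-in for a column type (logical or physical). */
    static final class ColType {
        final String family;  // "VARCHAR" or "DECIMAL"
        final int precision;  // length for VARCHAR, precision for DECIMAL
        final int scale;      // always 0 for VARCHAR
        final boolean legacy; // models a legacy decimal without fixed precision

        private ColType(String family, int precision, int scale, boolean legacy) {
            this.family = family;
            this.precision = precision;
            this.scale = scale;
            this.legacy = legacy;
        }

        static ColType varchar(int length) {
            return new ColType("VARCHAR", length, 0, false);
        }

        static ColType decimal(int precision, int scale) {
            return new ColType("DECIMAL", precision, scale, false);
        }

        static ColType legacyDecimal() {
            return new ColType("DECIMAL", 0, 0, true);
        }
    }

    /** True if every value of {@code narrower} also fits into {@code wider}. */
    static boolean covers(ColType wider, ColType narrower) {
        if (!wider.family.equals(narrower.family)) {
            return false;
        }
        if (wider.family.equals("VARCHAR")) {
            return wider.precision >= narrower.precision;
        }
        if (wider.legacy) {
            return true; // assumed: a legacy decimal accepts any precision
        }
        if (narrower.legacy) {
            return false; // unknown precision cannot be proven to fit
        }
        // Assumed rule: enough fractional digits and enough integer digits.
        return wider.scale >= narrower.scale
                && (wider.precision - wider.scale) >= (narrower.precision - narrower.scale);
    }

    /** Source: data flows physical -> logical, so the logical type must cover. */
    static boolean compatibleForSource(ColType physical, ColType logical) {
        return covers(logical, physical);
    }

    /** Sink: data flows logical -> physical, so the physical type must cover. */
    static boolean compatibleForSink(ColType physical, ColType logical) {
        return covers(physical, logical);
    }

    public static void main(String[] args) {
        ColType string = ColType.varchar(Integer.MAX_VALUE); // STRING
        ColType varchar10 = ColType.varchar(10);
        System.out.println(compatibleForSink(string, varchar10));   // true
        System.out.println(compatibleForSource(string, varchar10)); // false
        System.out.println(
                compatibleForSink(ColType.legacyDecimal(), ColType.decimal(22, 2))); // true
    }
}
```

Under these assumptions, the sink from the report (physical STRING and a legacy decimal, logical VARCHAR(10) and DECIMAL(22,2)) passes the sink-direction check, while the same pair is rejected when checked in the source direction, which matches the behaviour the reporter describes.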