[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089382#comment-17089382 ]
Wenlong Lyu commented on FLINK-17313:
-------------------------------------

[~dwysakowicz] I think it is not necessary that the logical schema and the physical schema match exactly. Currently we allow a source column whose logical type is varchar(10) while its physical type is varchar(5); see the `CastAvoidanceChecker` used in the compatibility check. The requirement on a source is that we must be able to convert a physical record of the source into an internal record according to physicalDataType and LogicalDataType.

On sinks the requirement is reversed: we must be able to convert an internal record into a physical record for the sink, so we can allow a sink column whose logical type is varchar(5) but whose physical type is varchar(10).

On validation: we already have a validation that makes sure the schema of the source query of a sink matches the logical type, so the validation between the logical type and the physical type can be much looser, I think.
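A minimal sketch of that direction-aware rule, assuming a standalone helper written only for illustration (it is not the planner's actual `CastAvoidanceChecker` / `TypeMappingUtils` logic, and the class and method names are made up):

{code:java}
import org.apache.flink.table.types.logical.VarCharType;

// Hypothetical helper, not Flink code: illustrates the direction-aware
// compatibility idea. For a source, the declared (logical) type must be wide
// enough to hold the physical value; for a sink, the physical type must be
// wide enough to hold the declared value.
public final class DirectionAwareVarCharCheck {

    public static boolean compatible(
            VarCharType logicalType, VarCharType physicalType, boolean isSink) {
        if (isSink) {
            // sink: the internal (logical) record is written into the physical
            // field, so logical varchar(5) -> physical varchar(10) is fine
            return physicalType.getLength() >= logicalType.getLength();
        }
        // source: the physical record is converted into the logical field,
        // so physical varchar(5) -> logical varchar(10) is fine
        return logicalType.getLength() >= physicalType.getLength();
    }
}
{code}

Under such a rule the sink case reported below (logical VARCHAR(10) against a physical STRING, i.e. a varchar of maximum length) would be accepted, since the physical type can hold any value of the logical type.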
> Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17313
>                 URL: https://issues.apache.org/jira/browse/FLINK-17313
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Terry Wang
>            Priority: Major
>              Labels: pull-request-available
>
> Test code like the following (in the blink planner):
> {code:java}
> tEnv.sqlUpdate("create table randomSource (" +
>         " a varchar(10)," +
>         " b decimal(20,2)" +
>         " ) with (" +
>         " 'type' = 'random'," +
>         " 'count' = '10'" +
>         " )");
> tEnv.sqlUpdate("create table printSink (" +
>         " a varchar(10)," +
>         " b decimal(22,2)," +
>         " c timestamp(3)" +
>         " ) with (" +
>         " 'type' = 'print'" +
>         " )");
> tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
> tEnv.execute("");
> {code}
> The print TableSink implements UpsertStreamTableSink and its getRecordType is as follows:
> {code:java}
> public TypeInformation<Row> getRecordType() {
>     return getTableSchema().toRowType();
> }
> {code}
> The varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
> 	at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
> 	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
> 	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
> 	at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
> 	at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
> 	at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
> 	at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
> 	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)
> The validation exceptions for the other types are similar. I dug into this and think it is caused by TypeMappingUtils#checkPhysicalLogicalTypeCompatible. It seems the method does not consider that sources and sinks need different validation logic between the physical and the logical type: in a source the logical type should be able to cover the physical type, while in a sink, vice versa, the physical type should be able to cover the logical type. Besides, the decimal type should be handled more carefully: when the target type is Legacy(Decimal), it should accept a decimal type of any precision.
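A minimal sketch of the decimal point above, assuming a standalone helper written only for illustration (the class and method names are invented and this is not the actual TypeMappingUtils fix): a physical type derived from legacy TypeInformation carries no precision or scale, so it could be treated as compatible with a DECIMAL logical type of any precision.

{code:java}
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

// Hypothetical helper, not Flink code: sketches how a legacy DECIMAL physical
// type could accept a logical DECIMAL of any precision and scale.
public final class LegacyDecimalCheck {

    public static boolean compatible(LogicalType logicalType, LogicalType physicalType) {
        boolean physicalIsLegacyDecimal =
                physicalType instanceof LegacyTypeInformationType
                        && physicalType.getTypeRoot() == LogicalTypeRoot.DECIMAL;
        if (logicalType instanceof DecimalType && physicalIsLegacyDecimal) {
            // e.g. logical DECIMAL(22, 2) against a physical type derived from
            // BigDecimal type information: accept regardless of precision/scale
            return true;
        }
        // simplified fallback: require an exact match for everything else
        return logicalType.equals(physicalType);
    }
}
{code}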