Terry Wang created FLINK-17313:
----------------------------------

             Summary: Validation error when insert decimal/timestamp/varchar 
with precision into sink using TypeInformation of row
                 Key: FLINK-17313
                 URL: https://issues.apache.org/jira/browse/FLINK-17313
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Terry Wang


Test code like the following (in the blink planner):
{code:java}
// Source table: varchar with a length and decimal with a precision
tEnv.sqlUpdate("create table randomSource (" +
        "  a varchar(10)," +
        "  b decimal(20,2)" +
        ") with (" +
        "  'type' = 'random'," +
        "  'count' = '10'" +
        ")");
// Sink table: adds a timestamp(3) column
tEnv.sqlUpdate("create table printSink (" +
        "  a varchar(10)," +
        "  b decimal(22,2)," +
        "  c timestamp(3)" +
        ") with (" +
        "  'type' = 'print'" +
        ")");
// Validation fails while translating this insert
tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
tEnv.execute("");
{code}

The print TableSink implements UpsertStreamTableSink, and its getRecordType is as 
follows:


{code:java}
public TypeInformation<Row> getRecordType() {
    return getTableSchema().toRowType();
}
{code}


The exception for the varchar type is:


||Heading 1||
|org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
        at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
        at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
        at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
        at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
        at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
        at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
        at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
        at scala.Option.map(Option.scala:146)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)
|
Validation of the other types fails in a similar way. I dug into it and found 
that it is caused by TypeMappingUtils#checkPhysicalLogicalTypeCompatible. It 
seems that the method doesn't consider the different effects of source and 
sink. I will open a PR soon to solve this problem.
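
To illustrate the source/sink distinction, here is a minimal toy model. It is not Flink code and not necessarily what the PR will do: CharType, Direction, strictCheck and directionAwareCheck are all hypothetical names, and STRING is modeled as a varchar with maximum length. The idea is that for a sink the data flows from the logical type into the physical type, so it should be enough that the physical type can hold every logical value, while for a source the direction is reversed.

```java
/**
 * Toy model, NOT Flink code: shows why a logical/physical type check
 * may need to know whether it runs for a source or for a sink.
 * All names here are hypothetical and exist only for illustration.
 */
public class DirectionAwareTypeCheck {

    /** Stand-in for a character type; length == Integer.MAX_VALUE models STRING. */
    public static final class CharType {
        public final int length;

        public CharType(int length) {
            this.length = length;
        }

        @Override
        public String toString() {
            return length == Integer.MAX_VALUE ? "STRING" : "VARCHAR(" + length + ")";
        }
    }

    public enum Direction { SOURCE, SINK }

    public static final CharType STRING = new CharType(Integer.MAX_VALUE);

    /** Direction-agnostic check: both types must have exactly the same length. */
    public static boolean strictCheck(CharType logical, CharType physical) {
        return logical.length == physical.length;
    }

    /**
     * Direction-aware check.
     * SINK:   data flows logical -> physical, so the physical type must be at least as wide.
     * SOURCE: data flows physical -> logical, so the logical type must be at least as wide.
     */
    public static boolean directionAwareCheck(
            CharType logical, CharType physical, Direction direction) {
        switch (direction) {
            case SINK:
                return physical.length >= logical.length;
            case SOURCE:
                return logical.length >= physical.length;
            default:
                throw new IllegalArgumentException("Unknown direction: " + direction);
        }
    }

    public static void main(String[] args) {
        CharType varchar10 = new CharType(10);
        // Same situation as field 'a' above: logical VARCHAR(10), physical STRING.
        System.out.println(strictCheck(varchar10, STRING));                           // false
        System.out.println(directionAwareCheck(varchar10, STRING, Direction.SINK));   // true
        System.out.println(directionAwareCheck(varchar10, STRING, Direction.SOURCE)); // false
    }
}
```

In this model, writing VARCHAR(10) values into a STRING sink field is accepted, while reading a STRING source field as VARCHAR(10) is still rejected. The same reasoning would presumably apply to decimal and timestamp precision.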








--
This message was sent by Atlassian Jira
(v8.3.4#803005)
