[ https://issues.apache.org/jira/browse/FLINK-16108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254713#comment-17254713 ]
Jark Wu commented on FLINK-16108:
---------------------------------

Hi [~wtog], {{INSERT INTO}} maps the fields of the query to the fields of the sink table by field order, not by field name. This is standard SQL behavior, and all databases follow this rule.

> StreamSQLExample is failed if running in blink planner
> ------------------------------------------------------
>
>                 Key: FLINK-16108
>                 URL: https://issues.apache.org/jira/browse/FLINK-16108
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>            Priority: Critical
>             Fix For: 1.10.1, 1.11.0
>
>
> {{StreamSQLExample}} in flink-example will fail if the specified planner is the blink planner. The exception is as follows:
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink do not match.
> Query schema: [user: BIGINT, product: STRING, amount: INT]
> Sink schema: [amount: INT, product: STRING, user: BIGINT]
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> 	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:361)
> 	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:269)
> 	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:260)
> 	at org.apache.flink.table.examples.java.StreamSQLExample.main(StreamSQLExample.java:90)
> Process finished with exit code 1
> {code}
> That's because the blink planner also validates the sink schema even if it comes from {{toAppendStream()}}. However, {{TableSinkUtils#inferSinkPhysicalDataType}} should derive the sink schema from the query schema when the requested type is a POJO [1], because the field order of a POJO is not deterministic.
> [1]: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala#L237

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
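To make the positional-mapping rule from the comment concrete: with the schemas from the exception above, by-position validation fails even though both sides contain the same columns. This is a minimal, self-contained Java sketch of such a check; the class and method names are hypothetical and this is not Flink's actual validation code:

```java
import java.util.Arrays;
import java.util.List;

public class PositionalMapping {

    /**
     * Match query columns to sink columns by position, the way standard
     * SQL INSERT INTO does; field names are never consulted.
     */
    static boolean schemasMatch(List<String> querySchema, List<String> sinkSchema) {
        if (querySchema.size() != sinkSchema.size()) {
            return false;
        }
        for (int i = 0; i < querySchema.size(); i++) {
            // Compare column i of the query against column i of the sink.
            if (!querySchema.get(i).equals(sinkSchema.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> query = Arrays.asList("user: BIGINT", "product: STRING", "amount: INT");
        List<String> sink  = Arrays.asList("amount: INT", "product: STRING", "user: BIGINT");
        // Same columns, different order: positional validation rejects this,
        // which is exactly the ValidationException in the report above.
        System.out.println(schemasMatch(query, sink)); // prints "false"
    }
}
```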
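The remark about non-deterministic POJO field order can be demonstrated with plain reflection: the Javadoc of {{Class#getDeclaredFields}} states the returned array is "not sorted and are not in any particular order", so a schema derived from a POJO by reflection may come out as [amount, product, user] rather than the declaration order. A minimal sketch (the {{Order}} class here is a stand-in mirroring the field names in the exception, not the actual class from {{StreamSQLExample}}):

```java
import java.lang.reflect.Field;
import java.util.Arrays;

public class PojoFieldOrder {

    // Stand-in POJO; field names mirror the schemas in the exception above.
    public static class Order {
        public long user;
        public String product;
        public int amount;
    }

    public static void main(String[] args) {
        // The JVM gives no ordering guarantee for getDeclaredFields(), so a
        // sink schema derived from this POJO by reflection cannot safely be
        // validated positionally against the query schema
        // [user, product, amount].
        Field[] fields = Order.class.getDeclaredFields();
        String[] names = Arrays.stream(fields)
                .map(Field::getName)
                .toArray(String[]::new);
        System.out.println(Arrays.toString(names)); // order is JVM-dependent
    }
}
```

This is why the description argues the sink schema should be derived from the query schema when the requested type is a POJO: only the set of fields is reliable, not their order.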