[ https://issues.apache.org/jira/browse/FLINK-16108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254574#comment-17254574 ]
wangtong commented on FLINK-16108:
----------------------------------

Hi [~jark],

[DynamicSinkUtils.validateSchemaAndApplyImplicitCast|https://github.com/apache/flink/blob/e710bec72669b5786671b25501d2350fcc997364/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/DynamicSinkUtils.java] throws the exception built by createSchemaMismatchException() when the field order of the query schema differs from that of the sink schema.

Should validateSchemaAndApplyImplicitCast be rewritten to pair each sink schema field with the query schema field of the same name, and validate the data types of that pair?

{code:java}
sinkFields.forEach(sinkField -> {
    // Look up the query column by name rather than by position.
    RowField queryField = queryFields.stream()
        .filter(q -> q.getName().equals(sinkField.getName()))
        .findFirst()
        .orElseThrow(() -> createSchemaMismatchException(
            String.format(
                "No query column found for sink column '%s'.",
                sinkField.getName()),
            sinkIdentifier,
            queryFields,
            sinkFields));

    final LogicalType queryColumnType = queryField.getType();
    final LogicalType sinkColumnType = sinkField.getType();
    if (!supportsImplicitCast(queryColumnType, sinkColumnType)) {
        throw createSchemaMismatchException(
            String.format(
                "Incompatible types for sink column '%s'.",
                sinkField.getName()),
            sinkIdentifier,
            queryFields,
            sinkFields);
    }
    // Remember whether any column needs an explicit cast.
    if (!supportsAvoidingCast(queryColumnType, sinkColumnType)) {
        requiresCasting.set(true);
    }
});
{code}

> StreamSQLExample is failed if running in blink planner
> ------------------------------------------------------
>
>                 Key: FLINK-16108
>                 URL: https://issues.apache.org/jira/browse/FLINK-16108
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>            Priority: Critical
>             Fix For: 1.10.1, 1.11.0
>
>
> {{StreamSQLExample}} in flink-example will fail if the specified planner is
> the blink planner.
> The exception is as follows:
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink do not match.
> Query schema: [user: BIGINT, product: STRING, amount: INT]
> Sink schema: [amount: INT, product: STRING, user: BIGINT]
> 	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:229)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> 	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:361)
> 	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:269)
> 	at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:260)
> 	at org.apache.flink.table.examples.java.StreamSQLExample.main(StreamSQLExample.java:90)
> Process finished with exit code 1
> {code}
> That's because the blink planner also validates the sink schema even if
> it comes from {{toAppendStream()}}. However,
> {{TableSinkUtils#inferSinkPhysicalDataType}} should derive the sink schema from
> the query schema when the requested type is a POJO [1], because the field order
> of a POJO is not deterministic.
>
> [1]: https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala#L237



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
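
For illustration only, the name-based matching discussed in this thread can be sketched without any Flink dependency. This is a minimal sketch under stated assumptions, not Flink's implementation: the class {{NameBasedSchemaMatch}}, the {{Field}} type, and the {{matchByName}} method are hypothetical names, column types are plain string labels, and simple equality stands in for Flink's implicit-cast check. It returns, for each sink column, the position of the query column with the same name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of matching sink columns to query columns by name.
class NameBasedSchemaMatch {

    /** Hypothetical stand-in for a schema column: a name plus a type label. */
    static final class Field {
        final String name;
        final String type;

        Field(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    /**
     * For each sink field, returns the index of the query field with the same name.
     * Throws IllegalArgumentException if no such query field exists or the type
     * labels differ (assumption: equality replaces an implicit-cast check).
     */
    static List<Integer> matchByName(List<Field> queryFields, List<Field> sinkFields) {
        List<Integer> projection = new ArrayList<>();
        for (Field sinkField : sinkFields) {
            int index = -1;
            for (int i = 0; i < queryFields.size(); i++) {
                if (queryFields.get(i).name.equals(sinkField.name)) {
                    index = i;
                    break;
                }
            }
            if (index < 0) {
                throw new IllegalArgumentException(
                    "No query column found for sink column '" + sinkField.name + "'.");
            }
            if (!queryFields.get(index).type.equals(sinkField.type)) {
                throw new IllegalArgumentException(
                    "Incompatible types for sink column '" + sinkField.name + "'.");
            }
            projection.add(index);
        }
        return projection;
    }
}
```

With the schemas from the exception above (query {{[user: BIGINT, product: STRING, amount: INT]}}, sink {{[amount: INT, product: STRING, user: BIGINT]}}), {{matchByName}} yields the projection {{[2, 1, 0]}}. The sketch does not handle duplicate column names or query columns that have no counterpart in the sink.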