- I think you might have misunderstood the sink schema. In a table sink DDL the schema is an ordered list of fields, not a JSON (or name-to-value mapping) structure. At least that is the case in SQL.
- For example, suppose the DDL sink schema is <A String, B String, C String> and the SQL is: insert into table select A, C, B from t2
- The A, B and C selected in the SQL are only validated as being field names defined in the DDL sink. That is all the check means.
- The result is mapped to the sink by index, so here the result is A, C, B and not your A, B, C. Is that a bug? I think so, for common SQL.
- I think this may be the result of a memory optimization choice in Flink. Is this correct?

On Mon, Dec 16, 2019 at 5:59 PM, roland wang (Jira) <j...@apache.org> wrote:

> roland wang created FLINK-15283:
> -----------------------------------
>
>              Summary: Scala version of TableSinkUtils has a problem when
>                       validating sinks.
>                  Key: FLINK-15283
>                  URL: https://issues.apache.org/jira/browse/FLINK-15283
>              Project: Flink
>           Issue Type: Bug
>           Components: API / Scala
>     Affects Versions: 1.9.0
>          Environment: All environments of flink 1.9.0
>             Reporter: roland wang
>
>
> *1. Phenomenon*
>
> I created a kafka sink with a schema like:
> {code:java}
> [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
> {code}
> When I tried to insert some data into this sink, an error occurred as
> follows:
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Field types of
> query result and registered TableSink [TEST_SINK] do not match.
> Query result schema: [ORDER_NO: String, BAK_NO: String, TRANS_AMT: Double]
> TableSink schema: [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
> {code}
> Now I have to keep the order of the query schema exactly the same as the
> sink's schema, which causes a lot of trouble.
>
> *2. Cause*
>
> I checked the code and found this line:
> {code:java}
> // validate schema of source table and table sink
> val srcFieldTypes = query.getTableSchema.getFieldDataTypes
> val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes
>
> if (srcFieldTypes.length != sinkFieldTypes.length ||
>   srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) =>
>     !PlannerTypeUtils.isInteroperable(
>       fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF))
>   }) {
> ...{code}
> I think when they try to compare the sink's schema to the query's schema,
> the zip call goes wrong because they forgot to sort both of the schemas.
>
> I truly hope this bug could be fixed soon.
>
> Thanks for all your hard work.
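
To make the difference between the two readings concrete, here is a minimal sketch of the positional check next to a name-based alternative. It does not use any Flink classes: Field, SinkValidationSketch, matchesByPosition and reorderByName are hypothetical names for illustration, and plain string equality stands in for PlannerTypeUtils.isInteroperable. The name-based variant is only an assumption about what the reporter is asking for, not something Flink 1.9 does.

```scala
// Hypothetical stand-in for one schema field; not a Flink class.
final case class Field(name: String, dataType: String)

object SinkValidationSketch {

  // Positional check, in the spirit of the zip-based comparison quoted above:
  // the i-th query field must have the same type as the i-th sink field.
  // Field names play no role here. (Type equality is a simplification.)
  def matchesByPosition(query: Seq[Field], sink: Seq[Field]): Boolean =
    query.length == sink.length &&
      query.zip(sink).forall { case (q, s) => q.dataType == s.dataType }

  // Name-based alternative (an assumption, not Flink 1.9 behaviour):
  // reorder the query fields to follow the sink's field order, and fail
  // if a name is missing, duplicated, or has a different type.
  def reorderByName(query: Seq[Field], sink: Seq[Field]): Option[Seq[Field]] = {
    val byName = query.map(f => f.name -> f).toMap
    if (query.length != sink.length || byName.size != query.length) None
    else {
      val reordered =
        sink.flatMap(s => byName.get(s.name).filter(_.dataType == s.dataType))
      if (reordered.length == sink.length) Some(reordered) else None
    }
  }

  // Schemas taken from the Jira report.
  val sinkSchema: Seq[Field] = Seq(
    Field("BAK_NO", "String"), Field("TRANS_AMT", "Double"), Field("ORDER_NO", "String"))
  val querySchema: Seq[Field] = Seq(
    Field("ORDER_NO", "String"), Field("BAK_NO", "String"), Field("TRANS_AMT", "Double"))
}
```

With the schemas from the report, matchesByPosition(querySchema, sinkSchema) is false because the second position pairs a String with a Double, while reorderByName(querySchema, sinkSchema) returns the fields in the sink's order. In the A, C, B example all three fields are String, so the positional check passes and the values of C and B would land in the sink's B and C columns.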