Hi jingjing, Please leave comment under the JIRA issue[1] to keep the discussion in one place.
Thanks, Jark [1]: https://issues.apache.org/jira/browse/FLINK-15283 On Mon, 16 Dec 2019 at 23:00, jingjing bai <baijingjing7...@gmail.com> wrote: > - > > I think you might have misunderstood the sink schema. It is a order > field but not json (or mapping) field in table sink ddl. At least that's > the case in Sql. > - > > for exemple. DDL sink schema as <A String ,B String ,C String > and > the sql is insert into table select A, C ,B from t2。 > - > > A ,B and C witch select in the sql will be validation is that field name > define in DDL sink . it only means this。 > - > > It will be mapping by index to sink , here , the result is A,C,B but > not your A.B,C. > > Is that a bug? I think so ,for a common sql。 > > > - > > I think this may be the result of flink's memory optimization > selection. Is this correct > > > roland wang (Jira) <j...@apache.org> 于2019年12月16日周一 下午5:59写道: > > > roland wang created FLINK-15283: > > ----------------------------------- > > > > Summary: Scala version of TableSinkUtils has a problem when > > validating sinks. > > Key: FLINK-15283 > > URL: https://issues.apache.org/jira/browse/FLINK-15283 > > Project: Flink > > Issue Type: Bug > > Components: API / Scala > > Affects Versions: 1.9.0 > > Environment: All environments of flink 1.9.0 > > Reporter: roland wang > > > > > > *1. Phenomenon* > > > > I created a kafka sink with the schema like : > > {code:java} > > [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String] > > {code} > > When I tried to insert some data into this sink, an error occurs as > > follows: > > {code:java} > > Caused by: org.apache.flink.table.api.ValidationException: Field types of > > query result and registered TableSink [TEST_SINK] do not match. Query > > result schema: [ORDER_NO: String, BAK_NO: String, TRANS_AMT: Double] > > TableSink schema: [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String] > > {code} > > ** Now I have to keep the order of the query schema absolutely as the > > sink's schema, which causes a lot of trouble. > > > > *2. Cause* > > > > I checked the code and found this line : > > {code:java} > > // validate schema of source table and table sink > > val srcFieldTypes = query.getTableSchema.getFieldDataTypes > > val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes > > > > if (srcFieldTypes.length != sinkFieldTypes.length || > > srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) => > > !PlannerTypeUtils.isInteroperable( > > fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF)) > > }) { > > ...{code} > > I sink when they try to compare the sink's schma to query's schema, the > > zip code goes wrong because they forget to sort both of the schema. > > > > I trully hope this bug could be fixed soon. > > > > Thanks for all your hard work. > > > > > > > > -- > > This message was sent by Atlassian Jira > > (v8.3.4#803005) > > >