-

   I think you may have misunderstood the sink schema. The fields in a table
   sink DDL form an ordered list, not a JSON object (or name-keyed mapping).
   At least that is the case in SQL.
   -

   For example, suppose the DDL sink schema is <A String, B String, C String>
   and the SQL is: insert into table select A, C, B from t2.
   -

   The names A, B and C selected in the SQL are only checked against the
   field names defined in the sink DDL. That is all they are used for.
   -

   The query result is mapped to the sink by index. Here the result is
   A, C, B, not the A, B, C you expected.
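
To make the index mapping concrete, here is a small sketch in plain Scala. It is only a simplified model and not Flink's implementation: Field, typesMatchByPosition and mapByIndex are names made up for this illustration, and types are represented as plain strings.

```scala
// Simplified model of positional sink mapping. This is NOT Flink code:
// Field, typesMatchByPosition and mapByIndex are hypothetical names.
object PositionalSinkMapping {
  final case class Field(name: String, dataType: String)

  // Compare the two schemas slot by slot, ignoring the field names.
  def typesMatchByPosition(query: Seq[Field], sink: Seq[Field]): Boolean =
    query.length == sink.length &&
      query.zip(sink).forall { case (q, s) => q.dataType == s.dataType }

  // Pair each sink field name with the query value at the same index.
  def mapByIndex(sink: Seq[Field], row: Seq[Any]): Seq[(String, Any)] =
    sink.map(_.name).zip(row)

  def main(args: Array[String]): Unit = {
    // Sink DDL: <A String, B String, C String>
    val sink = Seq(Field("A", "String"), Field("B", "String"), Field("C", "String"))
    // Query: select A, C, B from t2
    val query = Seq(Field("A", "String"), Field("C", "String"), Field("B", "String"))

    // All fields are String, so the positional type check passes.
    println(typesMatchByPosition(query, sink))
    // Sink column B receives the value of C, and sink column C the value of B.
    println(mapByIndex(sink, Seq("a-value", "c-value", "b-value")))
  }
}
```

With the schemas from the Jira report, the same positional comparison fails, because the second slot pairs a String (BAK_NO) with a Double (TRANS_AMT).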

Is that a bug? I think so, for common SQL.


   -

   I think this may be a consequence of the choices Flink makes for memory
   optimization. Is that correct?


On Mon, Dec 16, 2019 at 5:59 PM, roland wang (Jira) <j...@apache.org> wrote:

> roland wang created FLINK-15283:
> -----------------------------------
>
>              Summary: Scala version of TableSinkUtils has a problem when
> validating sinks.
>                  Key: FLINK-15283
>                  URL: https://issues.apache.org/jira/browse/FLINK-15283
>              Project: Flink
>           Issue Type: Bug
>           Components: API / Scala
>     Affects Versions: 1.9.0
>          Environment: All environments of flink 1.9.0
>             Reporter: roland wang
>
>
> *1. Phenomenon*
>
> I created a Kafka sink with a schema like:
> {code:java}
> [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
> {code}
> When I tried to insert some data into this sink, an error occurred as
> follows:
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Field types of
> query result and registered TableSink [TEST_SINK] do not match. Query
> result schema: [ORDER_NO: String, BAK_NO: String, TRANS_AMT: Double]
> TableSink schema: [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
> {code}
> Now I have to keep the order of the query schema exactly the same as the
> sink's schema, which causes a lot of trouble.
>
> *2. Cause*
>
> I checked the code and found these lines:
> {code:java}
> // validate schema of source table and table sink
> val srcFieldTypes = query.getTableSchema.getFieldDataTypes
> val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes
>
> if (srcFieldTypes.length != sinkFieldTypes.length ||
>   srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) =>
>     !PlannerTypeUtils.isInteroperable(
>       fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF))
>   }) {
> ...{code}
> I think that when the sink's schema is compared with the query's schema, the
> zip goes wrong because neither schema is sorted first.
>
> I truly hope this bug can be fixed soon.
>
> Thanks for all your hard work.
>
>
>
