Hi jingjing,

Please leave a comment under the JIRA issue[1] to keep the discussion in one
place.

Thanks,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15283

On Mon, 16 Dec 2019 at 23:00, jingjing bai <baijingjing7...@gmail.com>
wrote:

>    - I think you might have misunderstood the sink schema. It is an
>    ordered list of fields, not a JSON (or mapping) of fields, in the
>    table sink DDL. At least that's the case in SQL.
>
>    - For example: the sink DDL schema is <A String, B String, C String>
>    and the SQL is "insert into table select A, C, B from t2" (see the
>    sketch below).
>
>    - The fields A, B, and C selected in the SQL are only validated
>    against the field names defined in the sink DDL; the check means
>    nothing more than that.
>
>    - The result is mapped to the sink by index, so here the result is
>    A, C, B, not your A, B, C.
>
> Is that a bug? I think so, for standard SQL.
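>
> A minimal sketch of that positional behavior (tEnv, the sink table snk,
> and the source table t2 are made up here for illustration):
>
>     // Scala, Flink Table API: name validation passes because A, C and B
>     // all exist in snk's schema, but the write is positional, so snk.B
>     // receives the query's C and snk.C receives the query's B.
>     tEnv.sqlUpdate("INSERT INTO snk SELECT A, C, B FROM t2")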
>
>
>    - I think this may be the result of Flink's memory optimization
>    choices. Is this correct?
>
>
> On Mon, 16 Dec 2019 at 17:59, roland wang (Jira) <j...@apache.org> wrote:
>
> > roland wang created FLINK-15283:
> > -----------------------------------
> >
> >              Summary: Scala version of TableSinkUtils has a problem when
> > validating sinks.
> >                  Key: FLINK-15283
> >                  URL: https://issues.apache.org/jira/browse/FLINK-15283
> >              Project: Flink
> >           Issue Type: Bug
> >           Components: API / Scala
> >     Affects Versions: 1.9.0
> >          Environment: All environments of flink 1.9.0
> >             Reporter: roland wang
> >
> >
> > *1. Phenomenon*
> >
> > I created a Kafka sink with a schema like:
> > {code:java}
> > [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
> > {code}
> > When I tried to insert some data into this sink, an error occurred as
> > follows:
> > {code:java}
> > Caused by: org.apache.flink.table.api.ValidationException: Field types of
> > query result and registered TableSink [TEST_SINK] do not match. Query
> > result schema: [ORDER_NO: String, BAK_NO: String, TRANS_AMT: Double]
> > TableSink schema: [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
> > {code}
> > Now I have to keep the field order of the query schema exactly the same
> > as the sink's schema, which causes a lot of trouble.
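> >
> > A minimal sketch of a statement that hits this (the source table
> > T_ORDERS and the environment tEnv are made up; TEST_SINK and the field
> > names are taken from the error above):
> > {code:java}
> > // Fails: the query emits (ORDER_NO, BAK_NO, TRANS_AMT) while the sink
> > // declares (BAK_NO, TRANS_AMT, ORDER_NO), and the type check is
> > // positional, so String ends up compared against Double.
> > tEnv.sqlUpdate(
> >   "INSERT INTO TEST_SINK SELECT ORDER_NO, BAK_NO, TRANS_AMT FROM T_ORDERS")
> >
> > // Works: select the fields in the sink's declared order.
> > tEnv.sqlUpdate(
> >   "INSERT INTO TEST_SINK SELECT BAK_NO, TRANS_AMT, ORDER_NO FROM T_ORDERS")
> > {code}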
> >
> > *2. Cause*
> >
> > I checked the code and found the following:
> > {code:java}
> > // validate schema of source table and table sink
> > val srcFieldTypes = query.getTableSchema.getFieldDataTypes
> > val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes
> >
> > if (srcFieldTypes.length != sinkFieldTypes.length ||
> >   srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) =>
> >     !PlannerTypeUtils.isInteroperable(
> >       fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF))
> >   }) {
> > ...
> > {code}
> > I think that when they compare the sink's schema to the query's schema,
> > the zip goes wrong because they forget to sort both schemas (i.e., align
> > them by field name) first.
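> >
> > A minimal sketch of the idea, assuming alignment by field name is the
> > intended behavior (this is my guess at a fix, not the project's):
> > {code:java}
> > // Look up each sink field in the query result by name, then compare
> > // types, so that a pure reordering of columns no longer fails.
> > val srcSchema = query.getTableSchema
> > val srcTypesByName =
> >   srcSchema.getFieldNames.zip(srcSchema.getFieldDataTypes).toMap
> >
> > val sinkSchema = sink.getTableSchema
> > val mismatch =
> >   srcSchema.getFieldCount != sinkSchema.getFieldCount ||
> >   sinkSchema.getFieldNames.zip(sinkSchema.getFieldDataTypes).exists {
> >     case (name, snkF) =>
> >       srcTypesByName.get(name) match {
> >         case None => true // sink field missing from the query result
> >         case Some(srcF) =>
> >           !PlannerTypeUtils.isInteroperable(
> >             fromDataTypeToLogicalType(srcF),
> >             fromDataTypeToLogicalType(snkF))
> >       }
> >   }
> > {code}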
> >
> > I truly hope this bug can be fixed soon.
> >
> > Thanks for all your hard work.
> >
> >
> >
>
