Here is the ticket: https://issues.apache.org/jira/browse/FLINK-25014
> On 22.11.2021, at 15:00, Francesco Guardiani <france...@ververica.com> wrote:
>
> From a first look, this seems to be a bug. If not, it's certainly a feature
> worth supporting.
>
> Mind opening an issue with a reproducer?
>
>
> On Thu, Nov 18, 2021 at 1:37 PM Oliver Moser <olz...@gmail.com> wrote:
> Hi!
>
> I'm running into a problem when converting back and forth between a streaming
> table and a data stream. Given the following table DDL:
>
> create table masterdata
> (
>     facility text,
>     manufacturer text,
>     serial integer,
>     latitude double precision,
>     longitude double precision,
>     elevation double precision
> );
>
> and a corresponding POJO
>
> public class MasterData {
>
>     public Double elevation;
>
>     public String facility;
>
>     public Double latitude;
>
>     public Double longitude;
>
>     public String manufacturer;
>
>     public Long serial;
>
>     // getter/setter omitted
> }
>
> I register the database using JdbcCatalog like this:
>
> JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
> tableEnv.registerCatalog("cat", catalog);
> tableEnv.useCatalog("cat");
>
> If I then try to create a table with either "SELECT * FROM masterdata" or via
>
> tableEnv.from("masterdata");
>
> it bails out with an exception similar to
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Column types of query result and sink for registered table
> 'cat.postgres.Unregistered_DataStream_Sink_1' do not match.
>
> Cause: Incompatible types for sink column 'elevation' at position 1.
>
> Query schema: [facility: STRING, manufacturer: STRING, serial: INT, latitude: DOUBLE, longitude: DOUBLE, elevation: DOUBLE]
> Sink schema:  [elevation: DOUBLE, facility: STRING, latitude: DOUBLE, longitude: DOUBLE, manufacturer: STRING, serial: BIGINT]
>
> If I explicitly set the order of the columns in the SELECT like this:
>
> tableEnv.sqlQuery("SELECT elevation,facility,latitude,longitude,manufacturer,serial from masterdata");
>
> it works. In the debugger I can see that "queryFields" and "sinkField" in the
> call to DynamicSinkUtils.validateSchemaAndApplyImplicitCast() are not aligned,
> i.e. the order of the fields in those two lists is not the same, hence the exception.
>
> According to the relevant note in the docs [1]:
>
> the planner reorders fields and inserts implicit casts where possible to
> convert internal data structures to the desired structured type
>
> which makes me think that as long as the names of the fields in the POJO
> correspond to the column names in the table, the planner should take care of
> the reordering, and the explicit "SELECT elevation, ..." should not be needed.
>
> What am I missing?
>
> Thanks!
>
> Oliver
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/#examples-for-todatastream
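
For completeness, a minimal self-contained sketch of the reproducer as described in the quoted mail. The connection parameters are placeholders, and the enclosing class name plus the toDataStream(table, MasterData.class) conversion are assumptions on my side, since the mail only shows the catalog registration and the queries:

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MasterDataReproducer {

    // POJO from the mail: public fields, implicit no-arg constructor.
    public static class MasterData {
        public Double elevation;
        public String facility;
        public Double latitude;
        public Double longitude;
        public String manufacturer;
        public Long serial;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Placeholder connection settings -- adjust to the actual Postgres instance.
        JdbcCatalog catalog = new JdbcCatalog(
                "cat", "postgres", "user", "secret", "jdbc:postgresql://localhost:5432");
        tableEnv.registerCatalog("cat", catalog);
        tableEnv.useCatalog("cat");

        // Fails with "Column types of query result and sink ... do not match":
        Table unordered = tableEnv.from("masterdata");
        // tableEnv.toDataStream(unordered, MasterData.class).print();

        // Workaround: list the columns explicitly in the POJO's (alphabetical) field order.
        Table ordered = tableEnv.sqlQuery(
                "SELECT elevation, facility, latitude, longitude, manufacturer, serial FROM masterdata");
        tableEnv.toDataStream(ordered, MasterData.class).print();

        env.execute();
    }
}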