Here is the ticket:

https://issues.apache.org/jira/browse/FLINK-25014
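
For anyone following along, here is a rough sketch of the name-based alignment the quoted message below expected, in plain Java with no Flink dependency. The class FieldAlignmentSketch and the helpers alignByName and selectInSinkOrder are made up for illustration and are not Flink APIs; the field names are taken from the two schemas in the exception below. It only looks at names and ignores types and casts.

```java
import java.util.Arrays;
import java.util.List;

class FieldAlignmentSketch {

    // Hypothetical helper (not a Flink API): for each sink field, find the
    // position of the query field with the same name.
    static int[] alignByName(List<String> queryFields, List<String> sinkFields) {
        int[] mapping = new int[sinkFields.size()];
        for (int i = 0; i < sinkFields.size(); i++) {
            String name = sinkFields.get(i);
            int idx = queryFields.indexOf(name);
            if (idx < 0) {
                throw new IllegalArgumentException("No query column named '" + name + "'");
            }
            mapping[i] = idx;
        }
        return mapping;
    }

    // Hypothetical helper: builds the explicit column list used as the
    // workaround, i.e. a SELECT that lists columns in sink order.
    static String selectInSinkOrder(List<String> sinkFields, String table) {
        return "SELECT " + String.join(",", sinkFields) + " FROM " + table;
    }

    public static void main(String[] args) {
        // Column order as reported in "Query schema" (table DDL order).
        List<String> query = List.of(
                "facility", "manufacturer", "serial", "latitude", "longitude", "elevation");
        // Column order as reported in "Sink schema" (alphabetical POJO fields).
        List<String> sink = List.of(
                "elevation", "facility", "latitude", "longitude", "manufacturer", "serial");

        System.out.println(Arrays.toString(alignByName(query, sink)));
        System.out.println(selectInSinkOrder(sink, "masterdata"));
    }
}
```

With the two schemas from the exception, alignByName yields [5, 0, 3, 4, 1, 2], and selectInSinkOrder produces the explicit SELECT that currently works as a workaround.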

> On 22.11.2021, at 15:00, Francesco Guardiani <france...@ververica.com> wrote:
> 
> From a first look, this seems to be a bug. If not, it's certainly a feature 
> worth supporting.
> 
> Mind opening an issue with a reproducer?
> 
> 
> On Thu, Nov 18, 2021 at 1:37 PM Oliver Moser <olz...@gmail.com> wrote:
> Hi!
> 
> I'm running into a problem when converting back and forth from a streaming 
> table to a data stream. Given the following
> table DDL:
> 
>    create table masterdata
>    (
>       facility text,
>       manufacturer text,
>       serial integer,
>       latitude double precision,
>       longitude double precision,
>       elevation double precision
>    );
> 
> and a corresponding POJO
> 
>    public class MasterData {
> 
>       public Double elevation;
> 
>       public String facility;
> 
>       public Double latitude;
> 
>       public Double longitude;
> 
>       public String manufacturer;
> 
>       public Long serial;
> 
>       // getter/setter omitted
> 
>    }
> 
> I register the database using JdbcCatalog like this:
> 
>    JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
>    tableEnv.registerCatalog("cat", catalog);
>    tableEnv.useCatalog("cat");
> 
> and if I try to create a table with either "SELECT * FROM masterdata" or via
> 
>    tableEnv.from("masterdata");
> 
> it bails out with an exception similar to:
> 
>    Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for
>    registered table 'cat.postgres.Unregistered_DataStream_Sink_1' do not match.
> 
>    Cause: Incompatible types for sink column 'elevation' at position 1.
> 
>    Query schema: [facility: STRING, manufacturer: STRING, serial: INT, latitude: DOUBLE, longitude: DOUBLE, elevation: DOUBLE]
>    Sink schema:  [elevation: DOUBLE, facility: STRING, latitude: DOUBLE, longitude: DOUBLE, manufacturer: STRING, serial: BIGINT]
> 
> If I explicitly set the order of the columns in the SELECT like this:
> 
>    tableEnv.sqlQuery("SELECT elevation,facility,latitude,longitude,manufacturer,serial from masterdata");
> 
> it works. In the debugger I can see that "queryFields" and "sinkField" in the call to
> DynamicSinkUtils.validateSchemaAndApplyImplicitCast() are not aligned, i.e. the order of the fields in those two
> lists is not the same, hence the exception.
> 
> According to the relevant note in the docs [1]:
> 
>    the planner reorders fields and inserts implicit casts where possible to convert internal
>    data structures to the desired structured type
> 
> Which makes me think that as long as the names of the fields in the POJO correspond to the column names in the table,
> the planner should take care of reordering and the explicit "SELECT elevation, ..." should not be needed.
> 
> What am I missing?
> 
> Thanks!
> 
> Oliver
> 
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/#examples-for-todatastream
> 
