From a first look, this seems to be a bug. If not, it's certainly a feature
worth supporting.

Mind opening an issue with a reproducer?
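
For the issue, it may help to show the mismatch in isolation. The following is only a Flink-free sketch of positional versus by-name matching, built from the two schemas in the exception quoted below. The class and method names are hypothetical and are not Flink APIs; it does not reproduce what DynamicSinkUtils does internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names come from Flink.
public class SchemaAlignmentSketch {

    // Column name -> type, in the order shown as "Query schema" in the exception.
    static Map<String, String> querySchema() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("facility", "STRING");
        m.put("manufacturer", "STRING");
        m.put("serial", "INT");
        m.put("latitude", "DOUBLE");
        m.put("longitude", "DOUBLE");
        m.put("elevation", "DOUBLE");
        return m;
    }

    // Column name -> type, in the order shown as "Sink schema" in the exception.
    static Map<String, String> sinkSchema() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("elevation", "DOUBLE");
        m.put("facility", "STRING");
        m.put("latitude", "DOUBLE");
        m.put("longitude", "DOUBLE");
        m.put("manufacturer", "STRING");
        m.put("serial", "BIGINT");
        return m;
    }

    // Zero-based index of the first position whose types differ, or -1 if none do.
    static int firstPositionalMismatch(Map<String, String> query, Map<String, String> sink) {
        List<String> q = new ArrayList<>(query.values());
        List<String> s = new ArrayList<>(sink.values());
        int n = Math.min(q.size(), s.size());
        for (int i = 0; i < n; i++) {
            if (!q.get(i).equals(s.get(i))) {
                return i;
            }
        }
        return q.size() == s.size() ? -1 : n;
    }

    // Rebuilds the query columns in sink order, matching columns by name.
    static Map<String, String> reorderByName(Map<String, String> query, Map<String, String> sink) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String name : sink.keySet()) {
            String type = query.get(name);
            if (type == null) {
                throw new IllegalArgumentException("missing column: " + name);
            }
            out.put(name, type);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> query = querySchema();
        Map<String, String> sink = sinkSchema();
        // Positional comparison fails immediately: STRING vs DOUBLE at index 0.
        System.out.println(firstPositionalMismatch(query, sink));
        // After reordering by name, only serial (INT vs BIGINT) still differs, at index 5.
        System.out.println(firstPositionalMismatch(reorderByName(query, sink), sink));
    }
}
```

Once the columns are matched by name, the only remaining difference is serial (INT in the query, BIGINT in the sink), which is the same difference the explicit SELECT workaround already gets past.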


On Thu, Nov 18, 2021 at 1:37 PM Oliver Moser <olz...@gmail.com> wrote:

> Hi!
>
> I'm running into a problem when converting back and forth from a streaming
> table to a data stream. Given the following table DDL:
>
>    create table masterdata
>    (
>       facility text,
>       manufacturer text,
>       serial integer,
>       latitude double precision,
>       longitude double precision,
>       elevation double precision
>    );
>
> and a corresponding POJO
>
>    public class MasterData {
>
>       public Double elevation;
>
>       public String facility;
>
>       public Double latitude;
>
>       public Double longitude;
>
>       public String manufacturer;
>
>       public Long serial;
>
>       // getter/setter omitted
>
>    }
>
> I register the database using JdbcCatalog like this:
>
>    JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
>    tableEnv.registerCatalog("cat", catalog);
>    tableEnv.useCatalog("cat");
>
> and if I try to create a table with either "SELECT * FROM masterdata" or via
>
>    tableEnv.from("masterdata");
>
> it bails out with an exception similar to
>
>    Exception in thread "main" org.apache.flink.table.api.ValidationException:
>    Column types of query result and sink for registered table
>    'cat.postgres.Unregistered_DataStream_Sink_1' do not match.
>
>    Cause: Incompatible types for sink column 'elevation' at position 1.
>
>    Query schema: [facility: STRING, manufacturer: STRING, serial: INT, latitude: DOUBLE, longitude: DOUBLE, elevation: DOUBLE]
>    Sink schema:  [elevation: DOUBLE, facility: STRING, latitude: DOUBLE, longitude: DOUBLE, manufacturer: STRING, serial: BIGINT]
>
> If I explicitly set the order of the columns in the SELECT like this:
>
>    tableEnv.sqlQuery("SELECT elevation,facility,latitude,longitude,manufacturer,serial from masterdata");
>
> it works. In the debugger I can see that "queryFields" and "sinkField" in the
> call to DynamicSinkUtils.validateSchemaAndApplyImplicitCast() are not aligned,
> i.e. the order of the fields in those two lists is not the same, hence the
> exception.
>
> According to the relevant note in the docs [1]:
>
>    the planner reorders fields and inserts implicit casts where possible to
>    convert internal data structures to the desired structured type
>
> Which makes me think that as long as the names of the fields in the POJO
> correspond to the column names in the table, the planner should take care of
> reordering and the explicit "SELECT elevation, ..." should not be needed.
>
> What am I missing?
>
> Thanks!
>
> Oliver
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/#examples-for-todatastream
>
>
>
