Hi,

Unfortunately this is a bug.

The problem is in CustomizedConvertRule#convertCast, which drops the
requested nullability. It was fixed on master as part of FLINK-13784[1],
so the example works on master.

Could you create a Jira issue for version 1.11? We could backport the
corresponding part of FLINK-13784. As a workaround, you can try using the
values without registering them in the catalog, as the registration
triggers the type check (I know this is not perfect):

    final Table inputTable = tableEnv.fromValues(//
        DataTypes.ROW(//
            DataTypes.FIELD("col1", DataTypes.STRING()), //
            DataTypes.FIELD("col2", DataTypes.STRING())//
        ), ...);
    tableEnv.executeSql(//
        "CREATE TABLE `out` (\n" + //
            "col1 STRING,\n" + //
            "col2 STRING\n" + //
            ") WITH (\n" + //
            " 'connector' = 'filesystem',\n" + //
            // " 'format' = 'parquet',\n" + //
            " 'update-mode' = 'append',\n" + //
            " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
            " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
            ")");

    inputTable.executeInsert("`out`");

As for the types: SQL has neither a LONG nor a STRING type. Java's long is
equivalent to SQL's BIGINT. STRING is only an alias for
VARCHAR(2147483647), i.e. VARCHAR(Integer.MAX_VALUE), which was added for
improved usability so that you do not need to type the maximum length
manually. For the complete list of supported types see the docs[2].
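
To make the type naming concrete, here is a minimal sketch of the mapping
described above. SqlTypeNames, sqlTypeFor and columnList are hypothetical
helpers written only for this illustration; they are not part of Flink. The
sketch only shows which type names to write in a CREATE TABLE statement for
a few common Java classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not a Flink API): Java class -> Flink SQL type name. */
public class SqlTypeNames {

    private static final Map<Class<?>, String> SQL_TYPES = new LinkedHashMap<>();

    static {
        SQL_TYPES.put(Long.class, "BIGINT");   // there is no SQL type named LONG
        SQL_TYPES.put(Integer.class, "INT");
        SQL_TYPES.put(String.class, "STRING"); // alias for VARCHAR(2147483647)
    }

    /** Returns the SQL type name to use in DDL for the given Java class. */
    public static String sqlTypeFor(Class<?> javaType) {
        String sqlType = SQL_TYPES.get(javaType);
        if (sqlType == null) {
            throw new IllegalArgumentException("No mapping for " + javaType.getName());
        }
        return sqlType;
    }

    /** Builds the column list of a CREATE TABLE statement, one column per line. */
    public static String columnList(Map<String, Class<?>> columns) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Class<?>> column : columns.entrySet()) {
            if (sb.length() > 0) {
                sb.append(",\n");
            }
            sb.append(column.getKey()).append(' ').append(sqlTypeFor(column.getValue()));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, Class<?>> columns = new LinkedHashMap<>();
        columns.put("col1", Long.class);
        columns.put("col2", String.class);
        System.out.println(columnList(columns));

        // The length the planner reports for STRING is the maximum int, not the maximum long.
        System.out.println(Integer.MAX_VALUE);
    }
}
```

So for Row.of(1L, "Hello") the sink columns would be declared as BIGINT and
STRING rather than LONG and STRING.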


[1] https://issues.apache.org/jira/browse/FLINK-13784

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html

Best,

Dawid

On 15/07/2020 09:40, Flavio Pompermaier wrote:
> If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things work if
> I also change the query to "INSERT INTO `out` SELECT CAST(f0 AS
> STRING), f1 FROM ParquetDataset".
> If there is still a bug, a proper JIRA ticket should be filed with an
> exact description of the problem.
>
> Just to conclude this thread there are 2 strange things I found:
>
> 1) Is LONG really not supported yet? If I use as output table
> LONG,STRING I get
>       Exception in thread "main"
> java.lang.UnsupportedOperationException: class
> org.apache.calcite.sql.SqlIdentifier: LONG
>       at org.apache.calcite.util.Util.needToImplement(Util.java:967)
>
> 2) The new planner translates STRING to VARCHAR(2147483647). Is it
> correct?
>
> Best,
> Flavio
>
>
> On Wed, Jul 15, 2020 at 5:28 AM Jark Wu <imj...@gmail.com
> <mailto:imj...@gmail.com>> wrote:
>
>     I think this might be a bug in `tableEnv.fromValues`.
>
>     Could you try to remove the DataType parameter, and let the
>     framework derive the types?
>
>     final Table inputTable = tableEnv.fromValues(
>             Row.of(1L, "Hello"), //
>             Row.of(2L, "Hello"), //
>             Row.of(3L, ""), //
>             Row.of(4L, "Ciao"));
>
>     Best,
>     Jark
>
>
>     On Wed, 15 Jul 2020 at 11:19, Leonard Xu <xbjt...@gmail.com
>     <mailto:xbjt...@gmail.com>> wrote:
>
>         Hi, Flavio
>
>         I reproduced your issue, and I think it is a bug. But I'm not
>         sure whether it comes from Calcite itself or from the Calcite
>         version shaded into the Flink Table Planner module.
>
>         Maybe Danny can help explain more.
>
>         CC: Danny
>
>         Best
>         Leonard Xu
>
>>         On 14 Jul 2020, at 23:06, Flavio Pompermaier <pomperma...@okkam.it
>>         <mailto:pomperma...@okkam.it>> wrote:
>>
>>         If I use 
>>
>>         final Table inputTable = tableEnv.fromValues(
>>                 DataTypes.ROW(
>>                     DataTypes.FIELD("col1",
>>         DataTypes.STRING().notNull()),
>>                     DataTypes.FIELD("col2", DataTypes.STRING().notNull())
>>                 ), ..
>>           tableEnv.executeSql(//
>>                 "CREATE TABLE `out` (" +
>>                     "col1 STRING," + 
>>                     "col2 STRING" + 
>>                     ") WITH (...)
>>
>>         the job works as expected but this is wrong IMHO
>>         because DataTypes.STRING() = DataTypes.STRING().nullable() by
>>         default.
>>         If I have DataTypes.STRING().notNull() the type in the CREATE
>>         TABLE should be "STRING NOT NULL" . Am I correct?
>>
>>         On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier
>>         <pomperma...@okkam.it <mailto:pomperma...@okkam.it>> wrote:
>>
>>             Sorry, obviously the " 'format' = 'parquet'" + line is
>>             meant to be uncommented :D
>>
>>             On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier
>>             <pomperma...@okkam.it <mailto:pomperma...@okkam.it>> wrote:
>>
>>                 Hi to all,
>>                 I'm trying to test write to parquet using the
>>                 following code but I have an error:
>>
>>                  final TableEnvironment tableEnv =
>>                 DatalinksExecutionEnvironment.getBatchTableEnv();
>>                     final Table inputTable = tableEnv.fromValues(//
>>                         DataTypes.ROW(//
>>                             DataTypes.FIELD("col1",
>>                 DataTypes.STRING()), //
>>                             DataTypes.FIELD("col2", DataTypes.STRING())//
>>                         ), //
>>                         Row.of(1L, "Hello"), //
>>                         Row.of(2L, "Hello"), //
>>                         Row.of(3L, ""), //
>>                         Row.of(4L, "Ciao"));
>>                     tableEnv.createTemporaryView("ParquetDataset",
>>                 inputTable);
>>                     tableEnv.executeSql(//
>>                         "CREATE TABLE `out` (\n" + //
>>                             "col1 STRING,\n" + //
>>                             "col2 STRING\n" + //
>>                             ") WITH (\n" + //
>>                             " 'connector' = 'filesystem',\n" + //
>>                             // " 'format' = 'parquet',\n" + //
>>                             " 'update-mode' = 'append',\n" + //
>>                             " 'path' = 'file://" + TEST_FOLDER +
>>                 "',\n" + //
>>                             " 'sink.shuffle-by-partition.enable' =
>>                 'true'\n" + //
>>                             ")");
>>
>>                     tableEnv.executeSql("INSERT INTO `out` SELECT *
>>                 FROM ParquetDataset");
>>
>>                 ---------------------------------
>>
>>                 Exception in thread "main" java.lang.AssertionError:
>>                 Conversion to relational algebra failed to preserve
>>                 datatypes:
>>                 validated type:
>>                 RecordType(VARCHAR(2147483647) CHARACTER SET
>>                 "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET
>>                 "UTF-16LE" col2) NOT NULL
>>                 converted type:
>>                 RecordType(VARCHAR(2147483647) CHARACTER SET
>>                 "UTF-16LE" NOT NULL col1, VARCHAR(2147483647)
>>                 CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
>>                 rel:
>>                 LogicalProject(col1=[$0], col2=[$1])
>>                   LogicalUnion(all=[true])
>>                    
>>                 LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647)
>>                 CHARACTER SET "UTF-16LE"],
>>                 col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER
>>                 SET "UTF-16LE"])
>>                       LogicalValues(tuples=[[{ 0 }]])
>>                    
>>                 LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647)
>>                 CHARACTER SET "UTF-16LE"],
>>                 col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER
>>                 SET "UTF-16LE"])
>>                       LogicalValues(tuples=[[{ 0 }]])
>>                    
>>                 LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647)
>>                 CHARACTER SET "UTF-16LE"],
>>                 col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET
>>                 "UTF-16LE"])
>>                       LogicalValues(tuples=[[{ 0 }]])
>>                    
>>                 LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647)
>>                 CHARACTER SET "UTF-16LE"],
>>                 col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER
>>                 SET "UTF-16LE"])
>>                       LogicalValues(tuples=[[{ 0 }]])
>>
>>                 at
>>                 
>> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
>>                 at
>>                 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
>>                 at
>>                 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>>                 at
>>                 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>>                 at
>>                 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>>                 at
>>                 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>>                 at
>>                 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>>                 at
>>                 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
>>                 at
>>                 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
>>                 at
>>                 
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>                 at
>>                 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>
>>
>>                 What is wrong with my code?
>>
>>
>
