I've just opened a ticket on JIRA:
https://issues.apache.org/jira/browse/FLINK-18608
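
For anyone landing on this thread later, here is a minimal sketch in plain Java (no Flink or Calcite on the classpath) of the kind of comparison that fails in the AssertionError quoted further down: the row type requested by the schema is compared with the row type produced after conversion, and the two differ only in nullability. The class and method names are hypothetical and only model the idea; this is not the planner's actual code.

```java
import java.util.Arrays;
import java.util.List;

// Toy model only: NullabilityCheckSketch, render and sameRowType are
// hypothetical names, not Flink or Calcite APIs.
class NullabilityCheckSketch {

    /** Renders one column the way the error message does, e.g. "col1 VARCHAR(2147483647) NOT NULL". */
    static String render(String name, String sqlType, boolean nullable) {
        return name + " " + sqlType + (nullable ? "" : " NOT NULL");
    }

    /** Two row types match only if every column agrees on name, type and nullability. */
    static boolean sameRowType(List<String> validated, List<String> converted) {
        return validated.equals(converted);
    }

    public static void main(String[] args) {
        // STRING is shorthand for VARCHAR(2147483647), i.e. VARCHAR(Integer.MAX_VALUE).
        String stringType = "VARCHAR(" + Integer.MAX_VALUE + ")";

        // What the declared schema asks for: nullable columns.
        List<String> validated = Arrays.asList(
            render("col1", stringType, true),
            render("col2", stringType, true));

        // What comes out when a cast drops the requested nullability.
        List<String> converted = Arrays.asList(
            render("col1", stringType, false),
            render("col2", stringType, false));

        System.out.println("validated: " + validated);
        System.out.println("converted: " + converted);
        System.out.println("match: " + sameRowType(validated, converted));
    }
}
```

The column names and SQL types are identical on both sides; only the NOT NULL flag differs, which is why declaring the input fields as notNull() made the job pass.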

On Wed, Jul 15, 2020 at 10:10 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi,
>
> Unfortunately this is a bug.
>
> The problem is in CustomizedConvertRule#convertCast as it drops the
> requested nullability. It was fixed in master as part of FLINK-13784[1].
> Therefore the example works on master.
>
> Could you create a Jira issue for the 1.11 version? We could backport the
> corresponding part of FLINK-13784. As a workaround, you can try using the
> values without registering them in the catalog, as the registration
> triggers the type check (I know this is not perfect):
>
>     final Table inputTable = tableEnv.fromValues(//
>         DataTypes.ROW(//
>             DataTypes.FIELD("col1", DataTypes.STRING()), //
>             DataTypes.FIELD("col2", DataTypes.STRING())//
>         ), ...);
>     tableEnv.executeSql(//
>         "CREATE TABLE `out` (\n" + //
>             "col1 STRING,\n" + //
>             "col2 STRING\n" + //
>             ") WITH (\n" + //
>             " 'connector' = 'filesystem',\n" + //
>             // " 'format' = 'parquet',\n" + //
>             " 'update-mode' = 'append',\n" + //
>             " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
>             " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
>             ")");
>
>     inputTable.executeInsert("out");
>
> As for the types: SQL has neither a LONG nor a STRING type. Java's long is
> equivalent to SQL's BIGINT. STRING is only an alias for
> VARCHAR(Integer.MAX_VALUE), i.e. VARCHAR(2147483647), which was added for
> improved usability so that you do not need to type the maximum length
> manually. For the complete list of supported types see the docs[2].
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-13784
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html
>
> Best,
>
> Dawid
> On 15/07/2020 09:40, Flavio Pompermaier wrote:
>
> If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things work if I
> also change the query to "INSERT INTO `out` SELECT CAST(f0 AS STRING), f1
> FROM ParquetDataset".
> If this is still a bug, I will file a proper JIRA ticket with the exact
> description of the problem.
>
> Just to conclude this thread, there are 2 strange things I found:
>
> 1) Is LONG really not supported yet? If I declare the output table columns
> as LONG, STRING I get
>       Exception in thread "main" java.lang.UnsupportedOperationException:
> class org.apache.calcite.sql.SqlIdentifier: LONG
>       at org.apache.calcite.util.Util.needToImplement(Util.java:967)
>
> 2) The new planner translates STRING to VARCHAR(2147483647). Is it correct?
>
> Best,
> Flavio
>
>
> On Wed, Jul 15, 2020 at 5:28 AM Jark Wu <imj...@gmail.com> wrote:
>
>> I think this might be a bug in `tableEnv.fromValues`.
>>
>> Could you try to remove the DataType parameter, and let the framework
>> derive the types?
>>
>> final Table inputTable = tableEnv.fromValues(
>>         Row.of(1L, "Hello"), //
>>         Row.of(2L, "Hello"), //
>>         Row.of(3L, ""), //
>>         Row.of(4L, "Ciao"));
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 15 Jul 2020 at 11:19, Leonard Xu <xbjt...@gmail.com> wrote:
>>
>>> Hi, Flavio
>>>
>>> I reproduced your issue, and I think it is a bug. But I’m not sure
>>> whether it comes from Calcite itself or from the Calcite version shaded
>>> into the Flink Table Planner module.
>>>
>>> Maybe Danny can help explain more.
>>>
>>> CC: Danny
>>>
>>> Best
>>> Leonard Xu
>>>
>>> On 14 Jul 2020, at 23:06, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>> If I use
>>>
>>> final Table inputTable = tableEnv.fromValues(
>>>         DataTypes.ROW(
>>>             DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
>>>             DataTypes.FIELD("col2", DataTypes.STRING().notNull())
>>>         ), ..
>>>   tableEnv.executeSql(//
>>>         "CREATE TABLE `out` (" +
>>>             "col1 STRING," +
>>>             "col2 STRING" +
>>>             ") WITH (...)
>>>
>>> the job works as expected but this is wrong IMHO
>>> because DataTypes.STRING() = DataTypes.STRING().nullable() by default.
>>> If I have DataTypes.STRING().notNull() the type in the CREATE TABLE
>>> should be "STRING NOT NULL" . Am I correct?
>>>
>>> On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Sorry, obviously the " 'format' = 'parquet'" line should not be commented out :D
>>>>
>>>> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi to all,
>>>>> I'm trying to test writing to Parquet using the following code, but I
>>>>> get an error:
>>>>>
>>>>>  final TableEnvironment tableEnv =
>>>>> DatalinksExecutionEnvironment.getBatchTableEnv();
>>>>>     final Table inputTable = tableEnv.fromValues(//
>>>>>         DataTypes.ROW(//
>>>>>             DataTypes.FIELD("col1", DataTypes.STRING()), //
>>>>>             DataTypes.FIELD("col2", DataTypes.STRING())//
>>>>>         ), //
>>>>>         Row.of(1L, "Hello"), //
>>>>>         Row.of(2L, "Hello"), //
>>>>>         Row.of(3L, ""), //
>>>>>         Row.of(4L, "Ciao"));
>>>>>     tableEnv.createTemporaryView("ParquetDataset", inputTable);
>>>>>     tableEnv.executeSql(//
>>>>>         "CREATE TABLE `out` (\n" + //
>>>>>             "col1 STRING,\n" + //
>>>>>             "col2 STRING\n" + //
>>>>>             ") WITH (\n" + //
>>>>>             " 'connector' = 'filesystem',\n" + //
>>>>>             // " 'format' = 'parquet',\n" + //
>>>>>             " 'update-mode' = 'append',\n" + //
>>>>>             " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
>>>>>             " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
>>>>>             ")");
>>>>>
>>>>>     tableEnv.executeSql(
>>>>>         "INSERT INTO `out` SELECT * FROM ParquetDataset");
>>>>>
>>>>> ---------------------------------
>>>>>
>>>>> Exception in thread "main" java.lang.AssertionError: Conversion to
>>>>> relational algebra failed to preserve datatypes:
>>>>> validated type:
>>>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1,
>>>>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
>>>>> converted type:
>>>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1,
>>>>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
>>>>> rel:
>>>>> LogicalProject(col1=[$0], col2=[$1])
>>>>>   LogicalUnion(all=[true])
>>>>>     LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER
>>>>> SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>>>>> "UTF-16LE"])
>>>>>       LogicalValues(tuples=[[{ 0 }]])
>>>>>     LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER
>>>>> SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>>>>> "UTF-16LE"])
>>>>>       LogicalValues(tuples=[[{ 0 }]])
>>>>>     LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER
>>>>> SET "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET
>>>>> "UTF-16LE"])
>>>>>       LogicalValues(tuples=[[{ 0 }]])
>>>>>     LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER
>>>>> SET "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET
>>>>> "UTF-16LE"])
>>>>>       LogicalValues(tuples=[[{ 0 }]])
>>>>>
>>>>> at
>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
>>>>> at
>>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
>>>>> at
>>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>>>>> at
>>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
>>>>> at
>>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
>>>>> at
>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>>>> at
>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>
>>>>>
>>>>> What is wrong with my code?
>>>>>
>>>>
>>>>
