If I use

final Table inputTable = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
            DataTypes.FIELD("col2", DataTypes.STRING().notNull())
        ), ..
  tableEnv.executeSql(//
        "CREATE TABLE `out` (" +
            "col1 STRING," +
            "col2 STRING" +
            ") WITH (...)

the job works as expected but this is wrong IMHO because DataTypes.STRING()
= DataTypes.STRING().nullable() by default.
If I have DataTypes.STRING().notNull() the type in the CREATE TABLE should
be "STRING NOT NULL" . Am I correct?

On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Sorry, obviously  " 'format' = 'parquet'" + is without comment :D
>
> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Hi to all,
>> I'm trying to test write to parquet using the following code but I have
>> an error:
>>
>>  final TableEnvironment tableEnv =
>> DatalinksExecutionEnvironment.getBatchTableEnv();
>>     final Table inputTable = tableEnv.fromValues(//
>>         DataTypes.ROW(//
>>             DataTypes.FIELD("col1", DataTypes.STRING()), //
>>             DataTypes.FIELD("col2", DataTypes.STRING())//
>>         ), //
>>         Row.of(1L, "Hello"), //
>>         Row.of(2L, "Hello"), //
>>         Row.of(3L, ""), //
>>         Row.of(4L, "Ciao"));
>>     tableEnv.createTemporaryView("ParquetDataset", inputTable);
>>     tableEnv.executeSql(//
>>         "CREATE TABLE `out` (\n" + //
>>             "col1 STRING,\n" + //
>>             "col2 STRING\n" + //
>>             ") WITH (\n" + //
>>             " 'connector' = 'filesystem',\n" + //
>>             // " 'format' = 'parquet',\n" + //
>>             " 'update-mode' = 'append',\n" + //
>>             " 'path' = 'file://" + TEST_FOLDER + "',\n" + //
>>             " 'sink.shuffle-by-partition.enable' = 'true'\n" + //
>>             ")");
>>
>>     tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset");
>>
>> ---------------------------------
>>
>> Exception in thread "main" java.lang.AssertionError: Conversion to
>> relational algebra failed to preserve datatypes:
>> validated type:
>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1,
>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
>> converted type:
>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1,
>> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
>> rel:
>> LogicalProject(col1=[$0], col2=[$1])
>>   LogicalUnion(all=[true])
>>     LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE"])
>>       LogicalValues(tuples=[[{ 0 }]])
>>     LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE"])
>>       LogicalValues(tuples=[[{ 0 }]])
>>     LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE"])
>>       LogicalValues(tuples=[[{ 0 }]])
>>     LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET
>> "UTF-16LE"])
>>       LogicalValues(tuples=[[{ 0 }]])
>>
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>
>>
>> What is wrong with my code?
>>
>
>

Reply via email to