If I use final Table inputTable = tableEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("col1", DataTypes.STRING().notNull()), DataTypes.FIELD("col2", DataTypes.STRING().notNull()) ), .. tableEnv.executeSql(// "CREATE TABLE `out` (" + "col1 STRING," + "col2 STRING" + ") WITH (...)
the job works as expected but this is wrong IMHO because DataTypes.STRING() = DataTypes.STRING().nullable() by default. If I have DataTypes.STRING().notNull() the type in the CREATE TABLE should be "STRING NOT NULL" . Am I correct? On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <pomperma...@okkam.it> wrote: > Sorry, obviously " 'format' = 'parquet'" + is without comment :D > > On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <pomperma...@okkam.it> > wrote: > >> Hi to all, >> I'm trying to test write to parquet using the following code but I have >> an error: >> >> final TableEnvironment tableEnv = >> DatalinksExecutionEnvironment.getBatchTableEnv(); >> final Table inputTable = tableEnv.fromValues(// >> DataTypes.ROW(// >> DataTypes.FIELD("col1", DataTypes.STRING()), // >> DataTypes.FIELD("col2", DataTypes.STRING())// >> ), // >> Row.of(1L, "Hello"), // >> Row.of(2L, "Hello"), // >> Row.of(3L, ""), // >> Row.of(4L, "Ciao")); >> tableEnv.createTemporaryView("ParquetDataset", inputTable); >> tableEnv.executeSql(// >> "CREATE TABLE `out` (\n" + // >> "col1 STRING,\n" + // >> "col2 STRING\n" + // >> ") WITH (\n" + // >> " 'connector' = 'filesystem',\n" + // >> // " 'format' = 'parquet',\n" + // >> " 'update-mode' = 'append',\n" + // >> " 'path' = 'file://" + TEST_FOLDER + "',\n" + // >> " 'sink.shuffle-by-partition.enable' = 'true'\n" + // >> ")"); >> >> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset"); >> >> --------------------------------- >> >> Exception in thread "main" java.lang.AssertionError: Conversion to >> relational algebra failed to preserve datatypes: >> validated type: >> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1, >> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL >> converted type: >> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1, >> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL >> rel: >> LogicalProject(col1=[$0], col2=[$1]) >> LogicalUnion(all=[true]) >> LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET >> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET >> "UTF-16LE"]) >> LogicalValues(tuples=[[{ 0 }]]) >> LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET >> "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET >> "UTF-16LE"]) >> LogicalValues(tuples=[[{ 0 }]]) >> LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET >> "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET >> "UTF-16LE"]) >> LogicalValues(tuples=[[{ 0 }]]) >> LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET >> "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET >> "UTF-16LE"]) >> LogicalValues(tuples=[[{ 0 }]]) >> >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465) >> at >> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580) >> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) >> at >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) >> at >> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) >> at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678) >> >> >> What is wrong with my code? >> > >