Hi, Unfortunately this is a bug.
The problem is in CustomizedConvertRule#convertCast as it drops the requested nullability. It was fixed in master as part of FLINK-13784[1]. Therefore the example works on master. Could you create a jira issue for 1.11 version? We could backport the corresponding part of FLINK-13784. As a workaround you can try using the values without registering it in the catalog, as the registration triggers the type check. (I know this is not perfect): final Table inputTable = tableEnv.fromValues(// DataTypes.ROW(// DataTypes.FIELD("col1", DataTypes.STRING()), // DataTypes.FIELD("col2", DataTypes.STRING())// ), ...); tableEnv.executeSql(// "CREATE TABLE `out` (\n" + // "col1 STRING,\n" + // "col2 STRING\n" + // ") WITH (\n" + // " 'connector' = 'filesystem',\n" + // // " 'format' = 'parquet',\n" + // " 'update-mode' = 'append',\n" + // " 'path' = 'file://" + TEST_FOLDER + "',\n" + // " 'sink.shuffle-by-partition.enable' = 'true'\n" + // ")"); inputTable.executeInsert(`out`); As for the types SQL does not have LONG nor STRING types. Java's long is equivalent to SQL's BIGINT. STRING is only an alias for VARCHAR(Long.MAX_VALUE), which was added for improved usability so that you do not need to type the max long manually. For complete list of supported types see the docs[2] [1] https://issues.apache.org/jira/browse/FLINK-13784 [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html Best, Dawid On 15/07/2020 09:40, Flavio Pompermaier wrote: > If I use tableEnv.fromValues(Row.of(1L, "Hello"),...) things works if > I change also the query to "INSERT INTO `out` SELECT CAST(f0 AS > STRING) ,f1 FROM ParquetDataset". > If there is still a bug fill a proper JIRA ticket with the exact > description of the problem.. > > Just to conclude this thread there are 2 strange things I found: > > 1) Is LONG really not supported yet? If I use as output table > LONG,STRING I get > Exception in thread "main" > java.lang.UnsupportedOperationException: class > org.apache.calcite.sql.SqlIdentifier: LONG > at org.apache.calcite.util.Util.needToImplement(Util.java:967) > > 2) The new planner translates STRING to VARCHAR(2147483647). Is it > correct? > > Best, > Flavio > > > On Wed, Jul 15, 2020 at 5:28 AM Jark Wu <imj...@gmail.com > <mailto:imj...@gmail.com>> wrote: > > I think this might be a bug in `tableEnv.fromValues`. > > Could you try to remove the DataType parameter, and let the > framework derive the types? > > final Table inputTable = tableEnv.fromValues( > Row.of(1L, "Hello"), // > Row.of(2L, "Hello"), // > Row.of(3L, ""), // > Row.of(4L, "Ciao")); > > Best, > Jark > > > On Wed, 15 Jul 2020 at 11:19, Leonard Xu <xbjt...@gmail.com > <mailto:xbjt...@gmail.com>> wrote: > > Hi, Flavio > > I reproduced your issue, and I think it should be a bug. But > I’m not sure it comes from Calcite or Flink shaded Calcite, > Flink Table Planner module shaded calcite. > > Maybe Danny can help explain more. > > CC: Danny > > Best > Leonard Xu > >> 在 2020年7月14日,23:06,Flavio Pompermaier <pomperma...@okkam.it >> <mailto:pomperma...@okkam.it>> 写道: >> >> If I use >> >> final Table inputTable = tableEnv.fromValues( >> DataTypes.ROW( >> DataTypes.FIELD("col1", >> DataTypes.STRING().notNull()), >> DataTypes.FIELD("col2", DataTypes.STRING().notNull()) >> ), .. >> tableEnv.executeSql(// >> "CREATE TABLE `out` (" + >> "col1 STRING," + >> "col2 STRING" + >> ") WITH (...) >> >> the job works as expected but this is wrong IMHO >> because DataTypes.STRING() = DataTypes.STRING().nullable() by >> default. >> If I have DataTypes.STRING().notNull() the type in the CREATE >> TABLE should be "STRING NOT NULL" . Am I correct? >> >> On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier >> <pomperma...@okkam.it <mailto:pomperma...@okkam.it>> wrote: >> >> Sorry, obviously " 'format' = 'parquet'" + is >> without comment :D >> >> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier >> <pomperma...@okkam.it <mailto:pomperma...@okkam.it>> wrote: >> >> Hi to all, >> I'm trying to test write to parquet using the >> following code but I have an error: >> >> final TableEnvironment tableEnv = >> DatalinksExecutionEnvironment.getBatchTableEnv(); >> final Table inputTable = tableEnv.fromValues(// >> DataTypes.ROW(// >> DataTypes.FIELD("col1", >> DataTypes.STRING()), // >> DataTypes.FIELD("col2", DataTypes.STRING())// >> ), // >> Row.of(1L, "Hello"), // >> Row.of(2L, "Hello"), // >> Row.of(3L, ""), // >> Row.of(4L, "Ciao")); >> tableEnv.createTemporaryView("ParquetDataset", >> inputTable); >> tableEnv.executeSql(// >> "CREATE TABLE `out` (\n" + // >> "col1 STRING,\n" + // >> "col2 STRING\n" + // >> ") WITH (\n" + // >> " 'connector' = 'filesystem',\n" + // >> // " 'format' = 'parquet',\n" + // >> " 'update-mode' = 'append',\n" + // >> " 'path' = 'file://" + TEST_FOLDER + >> "',\n" + // >> " 'sink.shuffle-by-partition.enable' = >> 'true'\n" + // >> ")"); >> >> tableEnv.executeSql("INSERT INTO `out` SELECT * >> FROM ParquetDataset"); >> >> --------------------------------- >> >> Exception in thread "main" java.lang.AssertionError: >> Conversion to relational algebra failed to preserve >> datatypes: >> validated type: >> RecordType(VARCHAR(2147483647) CHARACTER SET >> "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET >> "UTF-16LE" col2) NOT NULL >> converted type: >> RecordType(VARCHAR(2147483647) CHARACTER SET >> "UTF-16LE" NOT NULL col1, VARCHAR(2147483647) >> CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL >> rel: >> LogicalProject(col1=[$0], col2=[$1]) >> LogicalUnion(all=[true]) >> >> LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) >> CHARACTER SET "UTF-16LE"], >> col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER >> SET "UTF-16LE"]) >> LogicalValues(tuples=[[{ 0 }]]) >> >> LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) >> CHARACTER SET "UTF-16LE"], >> col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER >> SET "UTF-16LE"]) >> LogicalValues(tuples=[[{ 0 }]]) >> >> LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) >> CHARACTER SET "UTF-16LE"], >> col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET >> "UTF-16LE"]) >> LogicalValues(tuples=[[{ 0 }]]) >> >> LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) >> CHARACTER SET "UTF-16LE"], >> col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER >> SET "UTF-16LE"]) >> LogicalValues(tuples=[[{ 0 }]]) >> >> at >> >> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465) >> at >> >> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580) >> at >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >> >> <http://org.apache.flink.table.planner.calcite.flinkplannerimpl.org/>$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) >> at >> >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) >> at >> >> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) >> at >> >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) >> at >> >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) >> at >> >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527) >> at >> >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) >> at >> >> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) >> at >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678) >> >> >> What is wrong with my code? >> >> >
signature.asc
Description: OpenPGP digital signature