I've just opened a ticket on JIRA: https://issues.apache.org/jira/browse/FLINK-18608
On Wed, Jul 15, 2020 at 10:10 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi,
>
> Unfortunately this is a bug.
>
> The problem is in CustomizedConvertRule#convertCast, as it drops the
> requested nullability. It was fixed in master as part of FLINK-13784 [1].
> Therefore the example works on master.
>
> Could you create a JIRA issue for the 1.11 version? We could backport the
> corresponding part of FLINK-13784. As a workaround you can try using the
> values without registering them in the catalog, as the registration
> triggers the type check. (I know this is not perfect):
>
>     final Table inputTable = tableEnv.fromValues(
>         DataTypes.ROW(
>             DataTypes.FIELD("col1", DataTypes.STRING()),
>             DataTypes.FIELD("col2", DataTypes.STRING())
>         ), ...);
>     tableEnv.executeSql(
>         "CREATE TABLE `out` (\n" +
>         "col1 STRING,\n" +
>         "col2 STRING\n" +
>         ") WITH (\n" +
>         " 'connector' = 'filesystem',\n" +
>         // " 'format' = 'parquet',\n" +
>         " 'update-mode' = 'append',\n" +
>         " 'path' = 'file://" + TEST_FOLDER + "',\n" +
>         " 'sink.shuffle-by-partition.enable' = 'true'\n" +
>         ")");
>
>     inputTable.executeInsert("out");
>
> As for the types: SQL has neither a LONG nor a STRING type. Java's long
> is equivalent to SQL's BIGINT. STRING is only an alias for
> VARCHAR(Integer.MAX_VALUE), which was added for improved usability so
> that you do not need to type the max int manually. For a complete list of
> the supported types see the docs [2].
>
> [1] https://issues.apache.org/jira/browse/FLINK-13784
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html
>
> Best,
>
> Dawid
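To make the type-name mapping concrete, here is a minimal, self-contained
sketch of Dawid's point (the class name and the batch-mode setup are
illustrative additions, not code from the thread): Java's long is declared
as BIGINT on the SQL side, and STRING is just shorthand for the
VARCHAR(2147483647) that appears in the error messages below.

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.types.Row;

    public class TypeAliasSketch {
        public static void main(String[] args) {
            // Blink planner in batch mode (the default planner as of 1.11).
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inBatchMode().build());

            // Java long maps to SQL BIGINT; there is no LONG type in SQL.
            Table input = tableEnv.fromValues(
                    DataTypes.ROW(
                            DataTypes.FIELD("col1", DataTypes.BIGINT()),
                            DataTypes.FIELD("col2", DataTypes.STRING())),
                    Row.of(1L, "Hello"),
                    Row.of(2L, "Ciao"));

            // Should show col1 as BIGINT and col2 as STRING, i.e. the
            // nullable VARCHAR(2147483647) alias.
            input.printSchema();
        }
    }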
> On 15/07/2020 09:40, Flavio Pompermaier wrote:
>
> If I use tableEnv.fromValues(Row.of(1L, "Hello"), ...) things work if I
> also change the query to "INSERT INTO `out` SELECT CAST(f0 AS STRING), f1
> FROM ParquetDataset". If there is still a bug I'll file a proper JIRA
> ticket with an exact description of the problem.
>
> Just to conclude this thread, there are 2 strange things I found:
>
> 1) Is LONG really not supported yet? If I use LONG, STRING as the column
> types of the output table I get:
>
>     Exception in thread "main" java.lang.UnsupportedOperationException:
>     class org.apache.calcite.sql.SqlIdentifier: LONG
>         at org.apache.calcite.util.Util.needToImplement(Util.java:967)
>
> 2) The new planner translates STRING to VARCHAR(2147483647). Is that
> correct?
>
> Best,
> Flavio
>
> On Wed, Jul 15, 2020 at 5:28 AM Jark Wu <imj...@gmail.com> wrote:
>
>> I think this might be a bug in `tableEnv.fromValues`.
>>
>> Could you try to remove the DataType parameter, and let the framework
>> derive the types?
>>
>>     final Table inputTable = tableEnv.fromValues(
>>         Row.of(1L, "Hello"),
>>         Row.of(2L, "Hello"),
>>         Row.of(3L, ""),
>>         Row.of(4L, "Ciao"));
>>
>> Best,
>> Jark
>>
>> On Wed, 15 Jul 2020 at 11:19, Leonard Xu <xbjt...@gmail.com> wrote:
>>
>>> Hi, Flavio
>>>
>>> I reproduced your issue, and I think it should be a bug. But I'm not
>>> sure whether it comes from Calcite itself or from the Calcite shaded
>>> by the Flink Table Planner module.
>>>
>>> Maybe Danny can help explain more.
>>>
>>> CC: Danny
>>>
>>> Best,
>>> Leonard Xu
>>>
>>> On Jul 14, 2020, at 23:06, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>> If I use
>>>
>>>     final Table inputTable = tableEnv.fromValues(
>>>         DataTypes.ROW(
>>>             DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
>>>             DataTypes.FIELD("col2", DataTypes.STRING().notNull())
>>>         ), ...);
>>>     tableEnv.executeSql(
>>>         "CREATE TABLE `out` (" +
>>>         "col1 STRING," +
>>>         "col2 STRING" +
>>>         ") WITH (...)
>>>
>>> the job works as expected, but this is wrong IMHO because
>>> DataTypes.STRING() = DataTypes.STRING().nullable() by default. If I
>>> have DataTypes.STRING().notNull(), the type in the CREATE TABLE should
>>> be "STRING NOT NULL". Am I correct?
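If Flavio's reading is right, the consistent version of that snippet would
pair notNull() on the Table API side with NOT NULL in the DDL. A
hypothetical sketch of that pairing (class name, connector options, and
path are placeholders; on 1.11 the nullability bug discussed above may
still interfere):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.types.Row;

    public class NotNullDdlSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inBatchMode().build());

            // NOT NULL declared on the Table API side...
            Table inputTable = tableEnv.fromValues(
                    DataTypes.ROW(
                            DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
                            DataTypes.FIELD("col2", DataTypes.STRING().notNull())),
                    Row.of("1", "Hello"),
                    Row.of("2", "Ciao"));

            // ...mirrored by NOT NULL columns in the DDL; a bare STRING
            // column is nullable by default.
            tableEnv.executeSql(
                    "CREATE TABLE `out` (\n" +
                    "  col1 STRING NOT NULL,\n" +
                    "  col2 STRING NOT NULL\n" +
                    ") WITH (\n" +
                    "  'connector' = 'filesystem',\n" +
                    "  'format' = 'parquet',\n" +
                    "  'path' = 'file:///tmp/parquet-out'\n" +
                    ")");

            inputTable.executeInsert("out");
        }
    }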
>>>
>>> On Tue, Jul 14, 2020 at 4:50 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> Sorry, obviously the " 'format' = 'parquet',\n" + line is without the
>>>> comment :D
>>>>
>>>> On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi to all,
>>>>> I'm trying to test writing to Parquet using the following code, but I
>>>>> get an error:
>>>>>
>>>>>     final TableEnvironment tableEnv = DatalinksExecutionEnvironment.getBatchTableEnv();
>>>>>     final Table inputTable = tableEnv.fromValues(
>>>>>         DataTypes.ROW(
>>>>>             DataTypes.FIELD("col1", DataTypes.STRING()),
>>>>>             DataTypes.FIELD("col2", DataTypes.STRING())
>>>>>         ),
>>>>>         Row.of(1L, "Hello"),
>>>>>         Row.of(2L, "Hello"),
>>>>>         Row.of(3L, ""),
>>>>>         Row.of(4L, "Ciao"));
>>>>>     tableEnv.createTemporaryView("ParquetDataset", inputTable);
>>>>>     tableEnv.executeSql(
>>>>>         "CREATE TABLE `out` (\n" +
>>>>>         "col1 STRING,\n" +
>>>>>         "col2 STRING\n" +
>>>>>         ") WITH (\n" +
>>>>>         " 'connector' = 'filesystem',\n" +
>>>>>         // " 'format' = 'parquet',\n" +
>>>>>         " 'update-mode' = 'append',\n" +
>>>>>         " 'path' = 'file://" + TEST_FOLDER + "',\n" +
>>>>>         " 'sink.shuffle-by-partition.enable' = 'true'\n" +
>>>>>         ")");
>>>>>
>>>>>     tableEnv.executeSql("INSERT INTO `out` SELECT * FROM ParquetDataset");
>>>>>
>>>>> ---------------------------------
>>>>>
>>>>> Exception in thread "main" java.lang.AssertionError: Conversion to
>>>>> relational algebra failed to preserve datatypes:
>>>>> validated type:
>>>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" col2) NOT NULL
>>>>> converted type:
>>>>> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col1, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL col2) NOT NULL
>>>>> rel:
>>>>> LogicalProject(col1=[$0], col2=[$1])
>>>>>   LogicalUnion(all=[true])
>>>>>     LogicalProject(col1=[_UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
>>>>>       LogicalValues(tuples=[[{ 0 }]])
>>>>>     LogicalProject(col1=[_UTF-16LE'2':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Hello':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
>>>>>       LogicalValues(tuples=[[{ 0 }]])
>>>>>     LogicalProject(col1=[_UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
>>>>>       LogicalValues(tuples=[[{ 0 }]])
>>>>>     LogicalProject(col1=[_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], col2=[_UTF-16LE'Ciao':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
>>>>>       LogicalValues(tuples=[[{ 0 }]])
>>>>>
>>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:465)
>>>>>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:580)
>>>>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>>>>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
>>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
>>>>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>
>>>>> What is wrong with my code?
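For completeness, here is a sketch of the variant that, per Flavio's 09:40
message above, does run: drop the explicit DataType so that fromValues
derives the types (the long literals become a BIGINT column named f0), and
cast f0 explicitly in the INSERT. The class name and path are placeholders,
and the connector options are pared down to a minimal set (an assumption,
not the thread's exact DDL):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.types.Row;

    public class CastVariantSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inBatchMode().build());

            // No explicit DataType (Jark's suggestion): the framework
            // derives the row type, with fields named f0 and f1.
            Table inputTable = tableEnv.fromValues(
                    Row.of(1L, "Hello"),
                    Row.of(2L, "Hello"),
                    Row.of(3L, ""),
                    Row.of(4L, "Ciao"));
            tableEnv.createTemporaryView("ParquetDataset", inputTable);

            tableEnv.executeSql(
                    "CREATE TABLE `out` (\n" +
                    "  col1 STRING,\n" +
                    "  col2 STRING\n" +
                    ") WITH (\n" +
                    "  'connector' = 'filesystem',\n" +
                    "  'format' = 'parquet',\n" +
                    "  'path' = 'file:///tmp/parquet-out'\n" +
                    ")");

            // The explicit CAST of the BIGINT column f0 to STRING is what
            // made the job run for Flavio.
            tableEnv.executeSql(
                    "INSERT INTO `out` SELECT CAST(f0 AS STRING), f1 FROM ParquetDataset");
        }
    }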