YuvalItzchakov edited a comment on issue #9780: [FLINK-14042] [flink-table-planner] Fix TemporalTable row schema always inferred as nullable
URL: https://github.com/apache/flink/pull/9780#issuecomment-544151621

@twalthr OK, once I wrote the test I figured out my fix here won't solve the problem. After doing some more debugging, I've come to the following findings:

Given the following test:

```scala
val util = scalaStreamTestUtil()
val sourceTable = util.addTableSource[(Timestamp, Long, String)]("MyTable", 'f1, 'f2, 'f3)
val temporal = util.tableEnv.sqlQuery("SELECT f1, f2, COLLECT(DISTINCT f3) f3s FROM MyTable GROUP BY f1, f2")
val temporalFunc = temporal.createTemporalTableFunction('f1, 'f2)
util.tableEnv.registerFunction("f", temporalFunc)
val queryTable = util.tableEnv.sqlQuery("select f1, f2, b from MyTable, LATERAL TABLE(f(f1)) AS T(a, b, cs)")
util.verifyPlan(queryTable)
```

If we first look at the table schema that the SQL query generates for the table underlying the temporal table, we see:

```
root
 |-- f1: TIMESTAMP(3)
 |-- f2: BIGINT
 |-- f3s: MULTISET<STRING> NOT NULL
```

When `FlinkPlanner` validates the SQL query, it reaches the part where it needs to validate the `TemporalTableFunction` I've defined, called `f`. It calls [FunctionCatalogOperatorTable.lookupOperatorOverloads](https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java#L57) to search for the method, and once it finds it, it needs to convert it to a standard `SqlFunction`. In order to do that, it needs to convert the `TypeInformation[T]` of the `TemporalTableFunction`'s result type to the new `DataType` API.
The problem is that `TypeInformation[T]` doesn't carry any information about the nullability of the field. So when the conversion to `Multiset[T]` happens, [it ends up calling the default constructor, which sets `nullable = true`](https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/MultisetType.java#L60), and that blows up at runtime because the `TableSchema` expected a NOT NULL field.

I'm not entirely sure how we can get around this issue.

**EDIT**: OK, it seems like the `TableFunctionDefinition` for the temporal table already carries the `DataType` information, which is visible via `((TemporalTableFunctionImpl) functionDefinition.tableFunction).getUnderlyingHistoryTable().getTableSchema().getFieldDataTypes()`. We could use that to avoid the lossy conversion from `TypeInformation[T]`. WDYT?

**EDIT 2:** Turns out this is not enough, since `DeferredTypeFlinkTableFunction.getExternalResultType()` will attempt to call `tableFunction.getResult`, which has the `TypeInformation[T]` and thus will result in a nullable type as well.

**EDIT 3:** `SqlTypeFactoryImpl.createMultisetType` defaults to creating a non-null multiset:

```java
public RelDataType createMultisetType(
    RelDataType type,
    long maxCardinality) {
  assert maxCardinality == -1;
  RelDataType newType = new MultisetSqlType(type, false);
  return canonize(newType);
}
```

This is where the two type derivations meet and disagree, and it seems like this will happen for any complex data type?
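To make the failure mode easier to reason about, here is a toy model of the round trip in plain Java. None of these classes are Flink or Calcite classes: `SketchLogicalType`, `SketchTypeInfo`, `toLegacy`, `fromLegacy` and `fromLegacyWithSchema` are hypothetical stand-ins. The only assumptions are the ones described above: the legacy representation has no nullability field, and the default constructor on the new side picks `nullable = true`.

```java
import java.util.Objects;

/** Toy model only: these are NOT Flink classes, just stand-ins for the two type systems. */
final class NullabilityLossSketch {

    /** Stand-in for a type in the new type system: it records nullability. */
    static final class SketchLogicalType {
        final String name;
        final boolean nullable;

        SketchLogicalType(String name, boolean nullable) {
            this.name = Objects.requireNonNull(name);
            this.nullable = nullable;
        }

        /** Default constructor: nullable unless told otherwise. */
        SketchLogicalType(String name) {
            this(name, true);
        }

        String asSummaryString() {
            return nullable ? name : name + " NOT NULL";
        }
    }

    /** Stand-in for the legacy type information: no nullability field at all. */
    static final class SketchTypeInfo {
        final String name;

        SketchTypeInfo(String name) {
            this.name = Objects.requireNonNull(name);
        }
    }

    /** Converting to the legacy form silently drops the nullability flag. */
    static SketchTypeInfo toLegacy(SketchLogicalType type) {
        return new SketchTypeInfo(type.name);
    }

    /** Converting back can only fall back to the default, i.e. nullable. */
    static SketchLogicalType fromLegacy(SketchTypeInfo info) {
        return new SketchLogicalType(info.name);
    }

    /** Variant that reads nullability from the original schema instead of guessing. */
    static SketchLogicalType fromLegacyWithSchema(SketchTypeInfo info, SketchLogicalType schemaType) {
        return new SketchLogicalType(info.name, schemaType.nullable);
    }

    public static void main(String[] args) {
        SketchLogicalType declared = new SketchLogicalType("MULTISET<STRING>", false);
        SketchLogicalType roundTripped = fromLegacy(toLegacy(declared));
        SketchLogicalType preserved = fromLegacyWithSchema(toLegacy(declared), declared);

        System.out.println("declared:     " + declared.asSummaryString());
        System.out.println("round trip:   " + roundTripped.asSummaryString());
        System.out.println("with schema:  " + preserved.asSummaryString());
    }
}
```

The `fromLegacyWithSchema` variant corresponds to the idea in the first EDIT: take nullability from the underlying table's schema rather than from the legacy type. As EDIT 2 notes, every code path that still goes through the legacy type would need the same treatment, otherwise the mismatch reappears.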
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services