[ https://issues.apache.org/jira/browse/FLINK-14042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16960124#comment-16960124 ]
Yuval Itzchakov commented on FLINK-14042:
-----------------------------------------

From my tests, the problem exists in Blink as well.

> Different RelDataTypes generated for same TemporalTableFunction
> ---------------------------------------------------------------
>
>                 Key: FLINK-14042
>                 URL: https://issues.apache.org/jira/browse/FLINK-14042
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.8.1
>            Reporter: Yuval Itzchakov
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Given the following table called "foo":
> {code:java}
> SELECT event_time, b, c
> FROM X
> WHERE event_time >= <START_TIME> AND event_time < <END_TIME>{code}
> And the following temporal table definition defined on "foo":
> {code:java}
> SELECT event_time, b, COLLECT(c) c
> FROM foo
> GROUP BY event_time, b{code}
> I get the following exception:
> {code:java}
> Exception in thread "main" java.lang.AssertionError: Cannot add expression of different type to set:
> set type is RecordType(TIMESTAMP(3) NOT NULL event_time, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" c0, TIMESTAMP(3) NOT NULL event_time0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" MULTISET c) NOT NULL
> expression type is RecordType(TIMESTAMP(3) NOT NULL event_time, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" c0, TIMESTAMP(3) NOT NULL event_time0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" MULTISET NOT NULL c) NOT NULL
> set is rel#17:LogicalCorrelate.NONE(left=HepRelVertex#15,right=HepRelVertex#16,correlation=$cor0,joinType=inner,requiredColumns={0})
> expression is LogicalTemporalTableJoin#23
>     at org.apache.calcite.plan.RelOptUtil.verifyTypeEquivalence(RelOptUtil.java:380)
>     at org.apache.calcite.plan.hep.HepRuleCall.transformTo(HepRuleCall.java:57)
>     at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
>     at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:111)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:280)
>     at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
>     at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
>     at org.apache.flink.table.api.TableEnvironment.runHepPlannerSimultaneously(TableEnvironment.scala:344)
>     at org.apache.flink.table.api.TableEnvironment.optimizeExpandPlan(TableEnvironment.scala:270)
>     at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:809)
>     at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
>     at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
>     at org.apache.flink.table.api.Table.insertInto(table.scala:1148)
> {code}
> Digging into the table planner, it appears that when the temporal table is
> being registered, it goes through `FlinkTypeFactory.buildLogicalRowType`,
> which uses the following code:
> {code:java}
> def buildLogicalRowType(
>     fieldNames: Seq[String],
>     fieldTypes: Seq[TypeInformation[_]])
>   : RelDataType = {
>   val logicalRowTypeBuilder = builder
>   val fields = fieldNames.zip(fieldTypes)
>   fields.foreach(f => {
>     // time indicators are not nullable
>     val nullable = !FlinkTypeFactory.isTimeIndicatorType(f._2)
>     logicalRowTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2, nullable))
>   })
>   logicalRowTypeBuilder.build
> }
> {code}
> We can see here that `nullable` is derived from the `isTimeIndicatorType`
> method. On the other hand, when registering the table that uses the
> TemporalTableFunction in the query, the row type is resolved through
> `FlinkTableFunctionImpl.getRowType`, which doesn't look up the time indicator
> flags at all and sets all fields to be nullable:
> {code:java}
> override def getRowType(typeFactory: RelDataTypeFactory,
>     arguments: util.List[AnyRef]): RelDataType = {
>   val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
>   val builder = flinkTypeFactory.builder
>   fieldNames
>     .zip(fieldTypes)
>     .foreach { f =>
>       builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2, isNullable = true))
>     }
>   builder.build
> }
> {code}
> This creates a mismatch between the schema originally registered and the
> schema inferred at the point of use, which results in the above exception.
> I haven't tried this for other complex types, but it seems like this should
> happen for any advanced type which wasn't nullable to begin with.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
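
For readers without a Flink checkout, the nullability rule in the two quoted snippets can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: `Field`, `registeredRowType` and `functionRowType` are hypothetical names, not Flink or Calcite classes, and the choice of `event_time` as the time indicator column is assumed for illustration. It models only the two `nullable` rules quoted above and does not attempt to reproduce the MULTISET column difference visible in the exception.

```java
import java.util.ArrayList;
import java.util.List;

public class NullabilityMismatchSketch {

    /** Hypothetical stand-in for a row field: a name plus a nullability flag. */
    static final class Field {
        final String name;
        final boolean nullable;

        Field(String name, boolean nullable) {
            this.name = name;
            this.nullable = nullable;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Field)) {
                return false;
            }
            Field other = (Field) o;
            return name.equals(other.name) && nullable == other.nullable;
        }

        @Override
        public int hashCode() {
            return name.hashCode() * 31 + Boolean.hashCode(nullable);
        }

        @Override
        public String toString() {
            return name + (nullable ? "" : " NOT NULL");
        }
    }

    /** Models the quoted buildLogicalRowType: time indicators are not nullable. */
    static List<Field> registeredRowType(String[] names, boolean[] isTimeIndicator) {
        List<Field> fields = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            fields.add(new Field(names[i], !isTimeIndicator[i]));
        }
        return fields;
    }

    /** Models the quoted getRowType: every field is marked nullable. */
    static List<Field> functionRowType(String[] names) {
        List<Field> fields = new ArrayList<>();
        for (String name : names) {
            fields.add(new Field(name, true));
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] names = {"event_time", "b", "c"};
        // Assumption for illustration: only event_time is a time indicator.
        boolean[] timeIndicator = {true, false, false};

        List<Field> registered = registeredRowType(names, timeIndicator);
        List<Field> inferred = functionRowType(names);

        System.out.println("registered: " + registered);
        System.out.println("inferred:   " + inferred);
        // A strict type-equivalence check fails as soon as one flag differs.
        System.out.println("equal: " + registered.equals(inferred));
    }
}
```

With these inputs the two lists differ only in the nullability of `event_time`, so the final comparison reports that they are not equal. A strict equivalence check of the kind `RelOptUtil.verifyTypeEquivalence` performs would reject such a pair for the same reason.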