[ https://issues.apache.org/jira/browse/FLINK-12848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873903#comment-16873903 ]
aloyszhang commented on FLINK-12848: ------------------------------------ Hi [~jark] I made a mistake for flink-1.9 test because I just pay attention to the DEBUG information of FlinkTypeFactory#buildLogicalRowType. Actually, there is no problem with flink-1.9. Let's just see test flink-1.7 under. Code need in my test display as follow: SimpleProcessionTimeSource.java {code:java} public class SimpleProcessionTimeSource implements StreamTableSource<Row>, DefinedProctimeAttribute { public static final String PROCTIME_NAME = "timestamp"; private String[] fieldNames; private TypeInformation<?>[] typeInformations; private RowTypeInfo typeInfo; private TableSchema tableSchema; public SimpleProcessionTimeSource(String[] fieldNames, TypeInformation<?>[] typeInformations) { this.fieldNames = fieldNames; this.typeInformations = typeInformations; this.typeInfo = new RowTypeInfo(typeInformations, fieldNames); String [] schemaFiled = new String [fieldNames.length + 1]; TypeInformation<?> [] schemaTypes = new TypeInformation[typeInformations.length + 1]; System.arraycopy(fieldNames,0, schemaFiled, 0 ,fieldNames.length); System.arraycopy(typeInformations,0, schemaTypes, 0 ,typeInformations.length); schemaFiled[fieldNames.length] = PROCTIME_NAME; schemaTypes[typeInformations.length] = Types.SQL_TIMESTAMP; this.tableSchema = new TableSchema(schemaFiled, schemaTypes); } @Override public DataStream<Row> getDataStream( StreamExecutionEnvironment execEnv) { DataStreamSource<Row> ds = execEnv.addSource(new SimpleSourceFunction(), "pbSource", typeInfo); return ds; } @Override public TypeInformation getReturnType() { return typeInfo; } @Override public TableSchema getTableSchema() { return tableSchema; } @Override public String explainSource() { return ""; } @Nullable @Override public String getProctimeAttribute() { return PROCTIME_NAME; } class SimpleSourceFunction implements SourceFunction<Row> {{ } @Override public void run(SourceContext<Row> sourceContext) throws Exception { } @Override public void cancel() { } } }{code} Test code: {code:java} @Test public void test001(){ String [] fields = new String []{"first", "second"}; TypeInformation<?>[] types = new TypeInformation[]{ Types.ROW_NAMED(new String[]{"first001"}, Types.INT), Types.ROW_NAMED(new String[]{"second002"}, Types.INT) }; //build flink program StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment env = StreamTableEnvironment.getTableEnvironment(execEnv); SimpleProcessionTimeSource streamTableSource = new SimpleProcessionTimeSource(fields, types); env.registerTableSource("testSource", streamTableSource); Table sourceTable = env.scan("testSource"); System.out.println("Source table schema : "); sourceTable.printSchema(); Table table = sourceTable.select("first.get('first001'),second.get('second002')"); table.printSchema(); try { execEnv.execute(); } catch (Exception e) { e.printStackTrace(); } } {code} Test result : {code:java} Source table schema : root |-- first: Row(first001: Integer) |-- second: Row(first001: Integer) |-- timestamp: TimeIndicatorTypeInfo(proctime) org.apache.flink.table.api.ValidationException: Expression 'second.get(second002) failed on input check: Field name 'second002' could not be found. at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156) at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:97) at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:84) at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72) at org.apache.flink.table.plan.TreeNode$$anonfun$1.apply(TreeNode.scala:46) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at scala.collection.AbstractIterator.to(Iterator.scala:1334) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at scala.collection.AbstractIterator.toArray(Iterator.scala:1334) at org.apache.flink.table.plan.TreeNode.childrenTransform$1(TreeNode.scala:66) at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:70) at org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:132) at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:145) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:144) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at scala.collection.AbstractIterator.to(Iterator.scala:1334) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at scala.collection.AbstractIterator.toArray(Iterator.scala:1334) at org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:150) at org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:84) at org.apache.flink.table.plan.logical.Project.validate(operators.scala:73) at org.apache.flink.table.api.Table.select(table.scala:138) at org.apache.flink.table.api.Table.select(table.scala:156) at org.apache.flink.streaming.test.ScanTypeTest.test001(ScanTypeTest.java:48) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) {code} This problem is caused by FlinkTypeFactory#buildLogicalRowType which calls FlinkTypeFactory#createTypeFromTypeInfo. In progress of FlinkTypeFactory#createTypeFromTypeInfo, it updates `seenTypes` which type is `mutable.HashMap[(TypeInformation[_], Boolean), RelDataType]` with a Tuple2(TypeInfomation[_], boolean) as key. If the row has nested type of RowTypeInfo as in test code . After process the first type `Types.ROW_NAMED(new String[]\{"first001"}, Types.INT)`, `seenTypes` has an entry with key Tuple2(Type.Int, true). Then process the second type `Types.ROW_NAMED(new String[]\{"second002"}, Types.INT)`, it will find key Tuple2(Type.Int, true) already in `seenTypes` and return RelDataType of the first type. This will generate RelDataType with wrong fieldname for the second type. > Method equals() in RowTypeInfo should consider fieldsNames > ---------------------------------------------------------- > > Key: FLINK-12848 > URL: https://issues.apache.org/jira/browse/FLINK-12848 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner > Affects Versions: 1.7.2 > Reporter: aloyszhang > Assignee: aloyszhang > Priority: Major > Labels: pull-request-available > Time Spent: 0.5h > Remaining Estimate: 0h > > Since the `RowTypeInfo#equals()` does not consider the fieldNames , when > process data with RowTypeInfo type there may comes an error of the field > name. > {code:java} > String [] fields = new String []{"first", "second"}; > TypeInformation<?>[] types = new TypeInformation[]{ > Types.ROW_NAMED(new String[]{"first001"}, Types.INT), > Types.ROW_NAMED(new String[]{"second002"}, Types.INT) }; > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment env = > StreamTableEnvironment.getTableEnvironment(execEnv); > SimpleProcessionTimeSource streamTableSource = new > SimpleProcessionTimeSource(fields, types); > env.registerTableSource("testSource", streamTableSource); > Table sourceTable = env.scan("testSource"); > System.out.println("Source table schema : "); > sourceTable.printSchema(); > {code} > The table shcema will be > {code:java} > Source table schema : > root > |-- first: Row(first001: Integer) > |-- second: Row(first001: Integer) > |-- timestamp: TimeIndicatorTypeInfo(proctime) > {code} > the second field has the same name with the first field. > So, we should consider the fieldnames in RowTypeInfo#equals() > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)