[ https://issues.apache.org/jira/browse/FLINK-12848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873903#comment-16873903 ]

aloyszhang commented on FLINK-12848:
------------------------------------

Hi [~jark]

I made a mistake in the flink-1.9 test because I only paid attention to the
DEBUG output of FlinkTypeFactory#buildLogicalRowType.
Actually, there is no problem with flink-1.9.

So let's look at the flink-1.7 test below.

The code needed for my test is the following:

SimpleProcessionTimeSource.java

 
{code:java}
import javax.annotation.Nullable;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class SimpleProcessionTimeSource
        implements StreamTableSource<Row>, DefinedProctimeAttribute {

    public static final String PROCTIME_NAME = "timestamp";

    private final String[] fieldNames;
    private final TypeInformation<?>[] typeInformations;
    private final RowTypeInfo typeInfo;
    private final TableSchema tableSchema;

    public SimpleProcessionTimeSource(String[] fieldNames,
            TypeInformation<?>[] typeInformations) {
        this.fieldNames = fieldNames;
        this.typeInformations = typeInformations;
        this.typeInfo = new RowTypeInfo(typeInformations, fieldNames);
        // The table schema carries one extra field for the proctime attribute.
        String[] schemaFields = new String[fieldNames.length + 1];
        TypeInformation<?>[] schemaTypes = new TypeInformation[typeInformations.length + 1];
        System.arraycopy(fieldNames, 0, schemaFields, 0, fieldNames.length);
        System.arraycopy(typeInformations, 0, schemaTypes, 0, typeInformations.length);
        schemaFields[fieldNames.length] = PROCTIME_NAME;
        schemaTypes[typeInformations.length] = Types.SQL_TIMESTAMP;
        this.tableSchema = new TableSchema(schemaFields, schemaTypes);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new SimpleSourceFunction(), "pbSource", typeInfo);
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return typeInfo;
    }

    @Override
    public TableSchema getTableSchema() {
        return tableSchema;
    }

    @Override
    public String explainSource() {
        return "";
    }

    @Nullable
    @Override
    public String getProctimeAttribute() {
        return PROCTIME_NAME;
    }

    /** A no-op source; the test fails during plan validation, so no records are needed. */
    private static class SimpleSourceFunction implements SourceFunction<Row> {

        @Override
        public void run(SourceContext<Row> sourceContext) throws Exception {
        }

        @Override
        public void cancel() {
        }
    }
}
{code}
 

 

Test code:

 
{code:java}
@Test
public void test001() {
    String[] fields = new String[]{"first", "second"};
    TypeInformation<?>[] types = new TypeInformation[]{
        Types.ROW_NAMED(new String[]{"first001"}, Types.INT),
        Types.ROW_NAMED(new String[]{"second002"}, Types.INT)
    };

    // Build the Flink program.
    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment env = StreamTableEnvironment.getTableEnvironment(execEnv);
    SimpleProcessionTimeSource streamTableSource = new SimpleProcessionTimeSource(fields, types);
    env.registerTableSource("testSource", streamTableSource);
    Table sourceTable = env.scan("testSource");
    System.out.println("Source table schema : ");
    sourceTable.printSchema();

    Table table = sourceTable.select("first.get('first001'), second.get('second002')");
    table.printSchema();

    try {
        execEnv.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
{code}
Test result:
{code:java}
Source table schema :
root
|-- first: Row(first001: Integer)
|-- second: Row(first001: Integer)
|-- timestamp: TimeIndicatorTypeInfo(proctime)

org.apache.flink.table.api.ValidationException: Expression 'second.get(second002) failed on input check: Field name 'second002' could not be found.

at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:156)
at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:97)
at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:84)
at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
at org.apache.flink.table.plan.TreeNode$$anonfun$1.apply(TreeNode.scala:46)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
at org.apache.flink.table.plan.TreeNode.childrenTransform$1(TreeNode.scala:66)
at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:70)
at org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:132)
at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7$$anonfun$apply$1.apply(LogicalNode.scala:145)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:144)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
at org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:150)
at org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:84)
at org.apache.flink.table.plan.logical.Project.validate(operators.scala:73)
at org.apache.flink.table.api.Table.select(table.scala:138)
at org.apache.flink.table.api.Table.select(table.scala:156)
at org.apache.flink.streaming.test.ScanTypeTest.test001(ScanTypeTest.java:48)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
{code}
 

This problem is caused by FlinkTypeFactory#buildLogicalRowType, which calls
FlinkTypeFactory#createTypeFromTypeInfo.

FlinkTypeFactory#createTypeFromTypeInfo maintains a cache `seenTypes` of type
`mutable.HashMap[(TypeInformation[_], Boolean), RelDataType]`, keyed by a
Tuple2 of (TypeInformation[_], Boolean).

Now suppose the row has nested RowTypeInfo fields, as in the test code above.

After processing the first type `Types.ROW_NAMED(new String[]{"first001"},
Types.INT)`, `seenTypes` holds an entry keyed by Tuple2(Row(first001: Integer), true).

When the second type `Types.ROW_NAMED(new String[]{"second002"}, Types.INT)` is
processed, its key Tuple2(Row(second002: Integer), true) matches that existing
entry, because RowTypeInfo#equals() ignores field names, and the cached
RelDataType of the first type is returned. This produces a RelDataType with the
wrong field name for the second type.
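
To make the root cause concrete, here is a minimal sketch (my own test harness,
not Flink code; the class name RowTypeInfoEqualsDemo is made up for
illustration) of how the two nested row types collide in a hash map on
flink-1.7.2, just like they do in the `seenTypes` cache:

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class RowTypeInfoEqualsDemo {

    public static void main(String[] args) {
        // The same two nested row types as in the test above.
        RowTypeInfo first = new RowTypeInfo(
                new TypeInformation<?>[]{Types.INT}, new String[]{"first001"});
        RowTypeInfo second = new RowTypeInfo(
                new TypeInformation<?>[]{Types.INT}, new String[]{"second002"});

        // On 1.7.2 both print true, because field names are not compared.
        System.out.println(first.equals(second));
        System.out.println(first.hashCode() == second.hashCode());

        // Any map keyed by the type info therefore conflates the two types,
        // which is what happens to `seenTypes` in FlinkTypeFactory.
        Map<TypeInformation<?>, String> seenTypes = new HashMap<>();
        seenTypes.put(first, "RelDataType of Row(first001: Integer)");
        // Looking up the second type hits the entry of the first one.
        System.out.println(seenTypes.get(second));
    }
}
{code}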

 
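The fix direction suggested by this issue's title would be to make
RowTypeInfo#equals() consider the field names, and to keep hashCode()
consistent with it. Below is only a sketch of that idea, not the actual pull
request; it assumes the field-name array is accessible inside RowTypeInfo as
`fieldNames`:

{code:java}
// Sketch only: inside org.apache.flink.api.java.typeutils.RowTypeInfo,
// assuming a String[] fieldNames member is accessible.

@Override
public boolean equals(Object obj) {
    if (obj instanceof RowTypeInfo) {
        final RowTypeInfo other = (RowTypeInfo) obj;
        // super.equals() already compares the field types;
        // additionally require identical field names.
        return other.canEqual(this)
                && super.equals(other)
                && java.util.Arrays.equals(fieldNames, other.fieldNames);
    }
    return false;
}

@Override
public int hashCode() {
    // Mix the field names into the hash so equal objects hash equally.
    return 31 * super.hashCode() + java.util.Arrays.hashCode(fieldNames);
}
{code}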

 

> Method equals() in RowTypeInfo should consider fieldsNames
> ----------------------------------------------------------
>
>                 Key: FLINK-12848
>                 URL: https://issues.apache.org/jira/browse/FLINK-12848
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.7.2
>            Reporter: aloyszhang
>            Assignee: aloyszhang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Since `RowTypeInfo#equals()` does not consider the fieldNames, processing
> data with a RowTypeInfo type may produce errors in the field names.
> {code:java}
> String[] fields = new String[]{"first", "second"};
> TypeInformation<?>[] types = new TypeInformation[]{
>     Types.ROW_NAMED(new String[]{"first001"}, Types.INT),
>     Types.ROW_NAMED(new String[]{"second002"}, Types.INT)};
> StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment env = StreamTableEnvironment.getTableEnvironment(execEnv);
> SimpleProcessionTimeSource streamTableSource = new SimpleProcessionTimeSource(fields, types);
> env.registerTableSource("testSource", streamTableSource);
> Table sourceTable = env.scan("testSource");
> System.out.println("Source table schema : ");
> sourceTable.printSchema();
> {code}
> The table schema will be
> {code:java}
> Source table schema :
> root
> |-- first: Row(first001: Integer)
> |-- second: Row(first001: Integer)
> |-- timestamp: TimeIndicatorTypeInfo(proctime)
> {code}
> The second field has the same name as the first field.
> So, we should consider the field names in RowTypeInfo#equals().



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
