[ https://issues.apache.org/jira/browse/FLINK-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Anton Solovev updated FLINK-5592: --------------------------------- Description: {code} @Test def testNestedRowTypes(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) tEnv.registerTableSource("rows", new MockSource) val table: Table = tEnv.scan("rows") val nestedTable: Table = tEnv.scan("rows").select('person) val collect: Seq[Row] = nestedTable.collect() print(collect) } class MockSource extends BatchTableSource[Row] { import org.apache.flink.api.java.ExecutionEnvironment import org.apache.flink.api.java.DataSet override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { val data = List( Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")), Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")), Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub"))) execEnv.fromCollection(data.asJava, getReturnType) } override def getReturnType: TypeInformation[Row] = { new RowTypeInfo( Array[TypeInformation[_]]( new RowTypeInfo( Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Array("name", "age"))), Array("person") ) } } {code} throws {{java.lang.RuntimeException: Row arity of from does not match serializers}} stacktrace {code} at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36) at org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234) at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130) at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114) at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35) at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47) at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42) at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672) at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) {code} was: {code} @Test def testNestedRowTypes(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) tEnv.registerTableSource("rows", new MockSource) val table: Table = tEnv.scan("rows") val nestedTable: Table = tEnv.scan("rows").select('person) table.printSchema() nestedTable.printSchema() val collect: Seq[Row] = nestedTable.collect() print(collect) } class MockSource extends BatchTableSource[Row] { import org.apache.flink.api.java.ExecutionEnvironment import org.apache.flink.api.java.DataSet override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { val data = List( Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")), Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")), Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub"))) execEnv.fromCollection(data.asJava, getReturnType) } override def getReturnType: TypeInformation[Row] = { new RowTypeInfo( Array[TypeInformation[_]]( new RowTypeInfo( Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Array("name", "age"))), Array("person") ) } } {code} throws {{java.lang.RuntimeException: Row arity of from does not match serializers}} stacktrace {code} at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36) at org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234) at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130) at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114) at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35) at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47) at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42) at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672) at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) {code} > Wrong number of RowSerializers with nested Rows in Collection mode > ------------------------------------------------------------------ > > Key: FLINK-5592 > URL: https://issues.apache.org/jira/browse/FLINK-5592 > Project: Flink > Issue Type: Bug > Reporter: Anton Solovev > Assignee: Anton Solovev > > {code} > @Test > def testNestedRowTypes(): Unit = { > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env, config) > tEnv.registerTableSource("rows", new MockSource) > val table: Table = tEnv.scan("rows") > val nestedTable: Table = tEnv.scan("rows").select('person) > val collect: Seq[Row] = nestedTable.collect() > print(collect) > } > class MockSource extends BatchTableSource[Row] { > import org.apache.flink.api.java.ExecutionEnvironment > import org.apache.flink.api.java.DataSet > override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { > val data = List( > Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")), > Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")), > Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub"))) > execEnv.fromCollection(data.asJava, getReturnType) > } > override def getReturnType: TypeInformation[Row] = { > new RowTypeInfo( > Array[TypeInformation[_]]( > new RowTypeInfo( > Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO), > Array("name", "age"))), > Array("person") > ) > } > } > {code} > throws {{java.lang.RuntimeException: Row arity of from does not match > serializers}} > stacktrace > {code} > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82) > at > org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36) > at > org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130) > at > org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130) > at > org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114) > at > org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35) > at > org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47) > at > org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42) > at > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672) > at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)