[ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15747806#comment-15747806 ]
Ivan Mushketyk commented on FLINK-5280:
---------------------------------------

Hi Fabian, Jark,

Thank you for all your comments and for your patience. Let me propose a solution and see if it works.

I performed a simple test using TableSource, and it seems that we can already access nested fields. Here is my test *TableSource* that returns POJOs: https://gist.github.com/mushketyk/acffb701a1f71a6e9bd661c781d7b18c

And here is the test code that uses it:

{code:java}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

tableEnv.registerTableSource("MyTable", new TestBatchTableSource());

Table result = tableEnv
    .sql("SELECT MyTable.amount * MyTable.id, MyTable.name, MyTable.childPojo.child.str FROM MyTable WHERE amount < 4");

DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
{code}

The result of the test looks correct:

{code}
[0,pojo16,mostChildPojo16, 0,pojo32,mostChildPojo32, 1,pojo1,mostChildPojo1, 17,pojo17,mostChildPojo17, 33,pojo33,mostChildPojo33, 36,pojo18,mostChildPojo18, 4,pojo2,mostChildPojo2, 57,pojo19,mostChildPojo19, 9,pojo3,mostChildPojo3]
{code}

Since we can access nested fields, it looks like we only need to convert the first level of fields into a *Row*. The resulting *Row* will potentially contain nested POJOs, but this does not seem to be an issue. I don't see why we would need to go beyond one level of unpacking when we create a *Row*, so I will assume this is all we need.

To do this, we need to specify how each field of a result *Row* should be extracted from the *TableSource*'s type T. We can add a new method called *getFieldMapping* that returns an array of strings. The string at position *i* is the name of the field that should be accessed to get the i-th *Row* field value.
For example, for the TableSource in this comment it could be implemented simply like this:

{code:java}
@Override
public String[] getFieldMapping() {
    return new String[]{"amount", "childPojo", "id", "name"};
}
{code}

This means that to get the value of the 0-th field in the result *Row* we access the field *amount*, to get the 1-st field we access the field *childPojo*, and so on.

If we need to convert an indexable type like a tuple or an array, we do not need this mapping. In that case, we can return *null* or an empty array. *Optional* would be a better choice, but I think Flink should still work with both Java 7 and Java 8.

The only problem with this approach is that the *FlinkTable* class accepts an array of field indexes that is used to convert values from the original type into a *Row*:

{code:scala}
abstract class FlinkTable[T](
    val typeInfo: TypeInformation[T],
    val fieldIndexes: Array[Int],
    val fieldNames: Array[String])
  extends AbstractTable {
  ...
}
{code}

To work around this, I propose to change it to:

{code:scala}
abstract class FlinkTable[T](
    val typeInfo: TypeInformation[T],
    val fieldIndexes: Array[Int],
    val fieldMappings: Optional[Array[String]], // <--- New argument
    val fieldNames: Array[String])
  extends AbstractTable {
  ...
}
{code}

We can then use this fieldMappings in the *CodeGenerator* to generate a proper mapper.

This would technically make it possible to convert a *GenericRecord* into a *Row*. But since *GenericRecord* implements Avro's interfaces, do we need to add a dependency on Avro in flink-table to access these fields? Or should we use reflection to access these methods? Or should we ignore the *GenericRecord* case altogether and simply return *Row* from *KafkaTableSource*?

I also wonder why we need this method in the TableSource interface:

{code:scala}
/** Returns the number of fields of the table. */
def getNumberOfFields: Int
{code}

and whether we can drop it.

What do you think? Am I missing something?
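To make the mapping idea more concrete, here is a minimal, Flink-independent sketch of how a *getFieldMapping*-style array could drive a reflection-based extraction of first-level POJO fields into a flat row. The class, method, and POJO names here are hypothetical illustrations, not actual Flink API; a generated mapper in the *CodeGenerator* would emit direct field accesses instead of using reflection.

{code:java}
import java.lang.reflect.Field;
import java.util.Arrays;

// Hypothetical helper (not Flink API): unpacks only the first level of a
// POJO's fields into a flat Object[] "row", driven by a field-name mapping.
public class FieldMappingExtractor {

    // fieldMapping[i] names the POJO field whose value becomes row position i,
    // mirroring the proposed getFieldMapping() contract. Nested POJOs are
    // copied as-is; no deeper unpacking is performed.
    public static Object[] extract(Object pojo, String[] fieldMapping) throws Exception {
        Object[] row = new Object[fieldMapping.length];
        for (int i = 0; i < fieldMapping.length; i++) {
            Field field = pojo.getClass().getDeclaredField(fieldMapping[i]);
            field.setAccessible(true);
            row[i] = field.get(pojo);
        }
        return row;
    }

    // Example POJO loosely resembling the one used in the test above.
    public static class MyPojo {
        public int amount = 3;
        public int id = 1;
        public String name = "pojo1";
    }

    public static void main(String[] args) throws Exception {
        Object[] row = extract(new MyPojo(), new String[]{"amount", "id", "name"});
        System.out.println(Arrays.toString(row)); // prints [3, 1, pojo1]
    }
}
{code}

The same reflective approach would in principle answer the Avro question as well: field values of a *GenericRecord* could be read without a compile-time Avro dependency, at the cost of per-record reflection overhead.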
> Extend TableSource to support nested data
> -----------------------------------------
>
>                 Key: FLINK-5280
>                 URL: https://issues.apache.org/jira/browse/FLINK-5280
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of
> flat rows.
> However, there are several storage formats for nested data that should be
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in
> Calcite's schema need to be extended to support nested data.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)