[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15747806#comment-15747806
 ] 

Ivan Mushketyk commented on FLINK-5280:
---------------------------------------

Hi Fabian, Jark,

Thank you for all your comments and for your patience.

Let me try to propose a solution and see if this will work.

I performed a simple test using TableSource, and it seems that we can access 
nested fields. Here is my test *TableSource* that returns POJOs:

https://gist.github.com/mushketyk/acffb701a1f71a6e9bd661c781d7b18c

And here is the test code that uses it:

{code:java}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());

tableEnv.registerTableSource("MyTable", new TestBatchTableSource());

Table result = tableEnv
        .sql("SELECT MyTable.amount * MyTable.id, MyTable.name, "
                + "MyTable.childPojo.child.str FROM MyTable WHERE amount < 4");

DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
{code}


And the result of the test seems plausible:


{code}
[0,pojo16,mostChildPojo16, 0,pojo32,mostChildPojo32, 1,pojo1,mostChildPojo1, 
17,pojo17,mostChildPojo17, 33,pojo33,mostChildPojo33, 
36,pojo18,mostChildPojo18, 4,pojo2,mostChildPojo2, 57,pojo19,mostChildPojo19, 
9,pojo3,mostChildPojo3]
{code}

Since we can access nested fields, it looks like we only need to convert the 
first level of fields into a *Row*. The resulting *Row* may contain nested 
POJOs, but this does not seem to be an issue. I don't see why we would need to 
go beyond one level of unpacking when we create a *Row*, so I will assume this 
is all we need.

To do this, we need to specify how each field of the result *Row* should be 
extracted from the *TableSource*'s type T. We can add a new method called 
*getFieldMapping* that returns an array of strings. The string at position 
*i* is the name of the field that should be accessed to get the value of the 
i-th *Row* field. For the example in this comment, it can be implemented simply 
like this:

{code:java}
@Override
public String[] getFieldMapping() {
        return new String[]{"amount", "childPojo", "id", "name"};
}
{code}

This means that to get the value of field 0 in the result *Row* we need to 
access the field *amount*, to get field 1 we need to access the field 
*childPojo*, and so on.
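
To make the idea more concrete, here is a minimal sketch of how such a mapping could be applied with plain reflection. This is not Flink code: the class *FieldMappingSketch*, the two POJOs and the method *toRowValues* are hypothetical names for illustration, and the real conversion would presumably be produced by the code generator rather than by reflection.

```java
import java.lang.reflect.Field;

final class FieldMappingSketch {

    // Hypothetical POJOs standing in for the ones returned by the test TableSource.
    static class ChildPojo {
        public String str;

        ChildPojo(String str) {
            this.str = str;
        }
    }

    static class ParentPojo {
        public int amount;
        public ChildPojo childPojo;
        public int id;
        public String name;

        ParentPojo(int amount, ChildPojo childPojo, int id, String name) {
            this.amount = amount;
            this.childPojo = childPojo;
            this.id = id;
            this.name = name;
        }
    }

    /**
     * Reads the public field named mapping[i] from the record and stores it at
     * position i of the result. Only the first level is unpacked, so nested
     * POJOs are copied as they are.
     */
    static Object[] toRowValues(Object record, String[] mapping) {
        Object[] values = new Object[mapping.length];
        for (int i = 0; i < mapping.length; i++) {
            try {
                Field field = record.getClass().getField(mapping[i]);
                values[i] = field.get(record);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException(
                        "Cannot access field '" + mapping[i] + "'", e);
            }
        }
        return values;
    }
}
```

With the mapping {"amount", "childPojo", "id", "name"}, position 1 of the returned array holds the *ChildPojo* instance itself, which matches the assumption that one level of unpacking is enough.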

If we need to convert an indexable type such as a tuple or an array, we do not 
need this mapping. In this case, the method can return *null* or an empty 
array. *Optional* would be a better option, but I think that Flink should work 
with both Java 7 and Java 8.
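
A rough sketch of how the "no mapping" case could be detected and handled for an array-like record is shown here. The class *PositionalMappingSketch* and its methods are hypothetical and only illustrate the convention that *null* or an empty array means "use positional access".

```java
final class PositionalMappingSketch {

    /** A null or empty mapping means the source type is accessed by position. */
    static boolean isPositional(String[] fieldMapping) {
        return fieldMapping == null || fieldMapping.length == 0;
    }

    /**
     * Positional conversion for an array-like record: output position i takes
     * the value at fieldIndexes[i] of the input.
     */
    static Object[] project(Object[] record, int[] fieldIndexes) {
        Object[] values = new Object[fieldIndexes.length];
        for (int i = 0; i < fieldIndexes.length; i++) {
            values[i] = record[fieldIndexes[i]];
        }
        return values;
    }
}
```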

The only problem with this approach is that the *FlinkTable* class accepts an 
array of field indexes that is used to convert values from the original type 
into a *Row*:

{code:scala}
abstract class FlinkTable[T](
    val typeInfo: TypeInformation[T],
    val fieldIndexes: Array[Int],
    val fieldNames: Array[String])
  extends AbstractTable {
  ...
}
{code}

To work around this, I propose changing it to:

{code:scala}
abstract class FlinkTable[T](
    val typeInfo: TypeInformation[T],
    val fieldIndexes: Array[Int],
    val fieldMappings: Option[Array[String]], // <--- New argument
    val fieldNames: Array[String])
  extends AbstractTable {
  ...
}
{code}

We can then use *fieldMappings* in the *CodeGenerator* to generate a proper 
mapper.

This will technically make it possible to convert a *GenericRecord* into a 
*Row*. But since *GenericRecord* implements Avro's interfaces, do we need to 
add a dependency on Avro in flink-table to access these fields? Or should we 
use reflection to access these methods? Or should we ignore the 
*GenericRecord* case altogether and simply return a *Row* from 
*KafkaTableSource*?
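
For the reflection option, a minimal sketch could look like the following. It assumes only that the record class exposes a public *get(String)* method, as Avro's *GenericRecord* does. *StandInRecord* is a hypothetical substitute used so the example has no Avro dependency, and *ReflectiveRecordAccessSketch* is likewise an invented name.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

final class ReflectiveRecordAccessSketch {

    // Hypothetical stand-in for a record type with a get(String) accessor.
    static class StandInRecord {
        private final Map<String, Object> values = new HashMap<String, Object>();

        StandInRecord put(String key, Object value) {
            values.put(key, value);
            return this;
        }

        public Object get(String key) {
            return values.get(key);
        }
    }

    /**
     * Looks up a public get(String) method on the runtime class of the record
     * and invokes it, so no compile-time dependency on the record type is needed.
     */
    static Object getField(Object record, String fieldName) {
        try {
            Method getter = record.getClass().getMethod("get", String.class);
            return getter.invoke(record, fieldName);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Record type has no usable get(String) method", e);
        }
    }
}
```

The trade-off is the usual one for reflection: no extra dependency in flink-table, but no compile-time checking and some overhead per call unless the *Method* lookup is cached.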

I also wonder why we need this method in the TableSource interface:

{code:scala}
  /** Returns the number of fields of the table. */
  def getNumberOfFields: Int
{code}

and I wonder if we can drop it.

What do you think about it? Am I missing something?

> Extend TableSource to support nested data
> -----------------------------------------
>
>                 Key: FLINK-5280
>                 URL: https://issues.apache.org/jira/browse/FLINK-5280
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



