Hi Dongwon,

I think this is a bug in the Filesystem connector, which doesn't exclude
computed columns when building the TableSource.
I created an issue [1] to track this problem.
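
Until that is fixed, one possible workaround (an untested sketch, assuming `navi`
has the columns `type STRING, location ROW<lastUpdateTime BIGINT, transId STRING>`
shown in your error message, and reusing the 'east-out' path from your DDL) is to
declare the partition column as a regular physical column on the sink and compute
location.transId in the INSERT query instead of with a computed column:

  -- sink with `partition` as a physical column, so the filesystem
  -- connector never has to handle a computed column
  CREATE TABLE output (
    `type` STRING,
    `location` ROW<lastUpdateTime BIGINT, transId STRING>,
    `partition` STRING
  ) PARTITIONED BY (`partition`)
  WITH (
    'connector' = 'filesystem',
    'path' = 'east-out',
    'format' = 'json'
  );

  -- compute the partition value in the query instead
  INSERT INTO output
  SELECT `type`, `location`, location.transId AS `partition`
  FROM navi;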

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-18665

On Tue, 21 Jul 2020 at 17:31, Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Danny,
>
>  Which version did you use
>
> I use Flink 1.11.0.
>
>
>>  which SQL statement throws the error?
>
> I think the declaration itself is not the problem.
> The exception occurred when I tried to execute the following statement, which
> I didn't show you in the previous email:
>
>> tEnv.sqlQuery("SELECT type, location FROM navi").executeInsert("output")
>
>
> Thanks,
>
> Dongwon
>
> On Tue, Jul 21, 2020 at 6:16 PM Danny Chan <yuzhao....@gmail.com> wrote:
>
>> Hi, I executed the SQL below:
>>
>> val sql0 =
>>   """
>>     |create table navi (
>>     |  a STRING,
>>     |  location ROW<lastUpdateTime BIGINT, transId STRING>
>>     |) with (
>>     |  'connector' = 'filesystem',
>>     |  'path' = 'east-out',
>>     |  'format' = 'json'
>>     |)
>>     |""".stripMargin
>> tableEnv.executeSql(sql0)
>> val sql =
>>   """
>>     |CREATE TABLE output (
>>     |  `partition` AS location.transId
>>     |) PARTITIONED BY (`partition`)
>>     |WITH (
>>     |  'connector' = 'filesystem',
>>     |  'path' = 'east-out',
>>     |  'format' = 'json'
>>     |) LIKE navi (EXCLUDING ALL)
>>     |""".stripMargin
>> tableEnv.executeSql(sql)
>>
>>
>> On the master branch, both statements are correct. Can you share your detailed
>> stack trace? Which version did you use, and which SQL statement throws the error?
>>
>> Best,
>> Danny Chan
>> On Tue, Jul 21, 2020 at 4:55 PM +0800, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>> Hi,
>>
>> I want to create subdirectories named after values of a nested column,
>> location.transId.
>>
>> This is my first attempt:
>>
>>> CREATE TABLE output
>>> PARTITIONED BY (`location.transId`)
>>> WITH (
>>>   'connector' = 'filesystem',
>>>   'path' = 'east-out',
>>>   'format' = 'json'
>>> ) LIKE navi (EXCLUDING ALL)
>>>
>>
>> It fails with the following error:
>>
>>> Exception in thread "main"
>>> org.apache.flink.table.api.ValidationException: Partition column
>>> 'location.transId' not defined in the table schema. Available columns:
>>> ['type', 'location']
>>> at
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
>>> at
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
>>> at
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
>>> at
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
>>> at
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>> at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
>>>
>>
>> Since nested columns don't seem to be recognized as eligible columns
>> for PARTITIONED BY, I tried the following:
>>
>>> CREATE TABLE output (
>>>   `partition` AS location.transId
>>> ) PARTITIONED BY (`partition`)
>>> WITH (
>>>   'connector' = 'filesystem',
>>>   'path' = 'east-out',
>>>   'format' = 'json'
>>> ) LIKE navi (EXCLUDING ALL)
>>>
>> It also fails:
>>
>>> Exception in thread "main"
>>> org.apache.flink.table.api.ValidationException: The field count of logical
>>> schema of the table does not match with the field count of physical schema.
>>> The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>]
>>> The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>,STRING].
>>
>> Thanks in advance,
>>
>> Dongwon
>>
>>
