Hi, I executed the SQL below:

val sql0 =
"""
   |create table navi (
   |  a STRING,
   |  location ROW<lastUpdateTime BIGINT, transId STRING>
   |) with (
   |  'connector' = 'filesystem',
   |  'path' = 'east-out',
   |  'format' = 'json'
   |)
   |""".stripMargin
tableEnv.executeSql(sql0)
val sql =
"""
   |CREATE TABLE output (
   |  `partition` AS location.transId
   |) PARTITIONED BY (`partition`)
   |WITH (
   |  'connector' = 'filesystem',
   |  'path' = 'east-out',
   |  'format' = 'json'
   |) LIKE navi (EXCLUDING ALL)
   |""".stripMargin
tableEnv.executeSql(sql)

On the master branch, both statements execute correctly. Can you share the full stack trace?
Which version are you using, and which SQL statement throws the error?
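
If the release you are on rejects the computed column, one possible workaround is to declare the partition key as an ordinary physical column and fill it in the INSERT query. The following is only a sketch under assumptions, not something verified against your setup: the column name transId on output, the input path 'east-in', the batch-mode settings, and the object name are all made up for illustration.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Hypothetical entry point; the object name is an assumption for illustration.
object PartitionByNestedFieldSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: batch mode, so the filesystem sink commits its files when the job ends.
    val settings = EnvironmentSettings.newInstance().inBatchMode().build()
    val tableEnv = TableEnvironment.create(settings)

    // Source table as in the snippet above, except for a hypothetical input path.
    tableEnv.executeSql(
      """
        |CREATE TABLE navi (
        |  a STRING,
        |  location ROW<lastUpdateTime BIGINT, transId STRING>
        |) WITH (
        |  'connector' = 'filesystem',
        |  'path' = 'east-in',
        |  'format' = 'json'
        |)
        |""".stripMargin)

    // The partition key is a physical column here (no computed column, no LIKE),
    // so the logical and physical schemas have the same number of fields.
    tableEnv.executeSql(
      """
        |CREATE TABLE output (
        |  a STRING,
        |  location ROW<lastUpdateTime BIGINT, transId STRING>,
        |  transId STRING
        |) PARTITIONED BY (transId)
        |WITH (
        |  'connector' = 'filesystem',
        |  'path' = 'east-out',
        |  'format' = 'json'
        |)
        |""".stripMargin)

    // Extract the nested field in the query and write it to the partition column.
    // executeSql submits the INSERT asynchronously; depending on the Flink version
    // you may need to wait on the returned TableResult for the job to finish.
    tableEnv.executeSql(
      "INSERT INTO output SELECT a, location, location.transId FROM navi")
  }
}
```

With this layout the sink should create one subdirectory per distinct transId value under 'east-out', named in the transId=<value> style. The trade-off is that the schema of output has to be spelled out instead of being inherited through LIKE.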

Best,
Danny Chan
On July 21, 2020 at 4:55 PM +0800, Dongwon Kim <eastcirc...@gmail.com> wrote:
> Hi,
>
> I want to create subdirectories named after values of a nested column, 
> location.transId.
>
> This is my first attempt:
> > CREATE TABLE output
> > PARTITIONED BY (`location.transId`)
> > WITH (
> >   'connector' = 'filesystem',
> >   'path' = 'east-out',
> >   'format' = 'json'
> > ) LIKE navi (EXCLUDING ALL)
>
> It fails with the following errors:
> > Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> > Partition column 'location.transId' not defined in the table schema. 
> > Available columns: ['type', 'location']
> > at org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
> > at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
> > at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
> > at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
> > at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> > at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> > at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
>
> Since nested columns do not seem to be recognized as eligible columns for 
> PARTITIONED BY, I tried the following:
> > CREATE TABLE output (
> >   `partition` AS location.transId
> > ) PARTITIONED BY (`partition`)
> > WITH (
> >   'connector' = 'filesystem',
> >   'path' = 'east-out',
> >   'format' = 'json'
> > ) LIKE navi (EXCLUDING ALL)
> It also fails:
> > Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> > The field count of logical schema of the table does not match with the 
> > field count of physical schema. The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>]
> > The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>,STRING].
>
> Thanks in advance,
>
> Dongwon
