You can not do that in Flink yet, Flink partition column must be mapped to 
columns from the table schema which you can select from. The syntax is a little 
different from Hive’s =>

create table table_name (
  id                int,
  dtDontQuery       string,
  name              string
)
partitioned by (date string)

In which you can declare the partition column name & type at the same time.

Best,
Danny Chan
在 2020年7月21日 +0800 PM11:30,Dongwon Kim <eastcirc...@gmail.com>,写道:
> Thanks Jark for the update.
>
> However, getting back to the original question, can I use a nested column 
> directly for CREATE TABLE PARTITIONED BY like below without declaring an 
> additional column?
>
> > CREATE TABLE output
> > PARTITIONED BY (`location.transId`)
> > WITH (
> >   'connector' = 'filesystem',
> >   'path' = 'east-out',
> >   'format' = 'json'
> > ) LIKE navi (EXCLUDING ALL)
>
> I tried (`location`.transId) as well but it fails with an exception:
> > Exception in thread "main" org.apache.flink.table.api.SqlParserException: 
> > SQL parse failed. Encountered "." at line 3, column 27.
> > Was expecting one of:
> >     ")" ...
> >     "," ...
> >
> > at 
> > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
> > at 
> > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
> > at 
> > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> > at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
> > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." 
> > at line 3, column 27.
> > Was expecting one of:
> >     ")" ...
> >     "," ...
> >
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
> > at 
> > org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148)
> > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163)
> > at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188)
> > at 
> > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
> > ... 3 more
> > Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." 
> > at line 3, column 27.
> > Was expecting one of:
> >     ")" ...
> >     "," ...
> >
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21398)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5292)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6269)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:19047)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3308)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
> > at 
> > org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
> > at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
> > ... 5 more
>
> Best,
>
> Dongwon
>
> > On Wed, Jul 22, 2020 at 12:09 AM Jark Wu <imj...@gmail.com> wrote:
> > > Hi Dongwon,
> > >
> > > I think this is a bug in the Filesystem connector which doesn't exclude 
> > > the computed columns when building the TableSource.
> > > I created an issue [1] to track this problem.
> > >
> > > Best,
> > > Jark
> > >
> > > [1]: https://issues.apache.org/jira/browse/FLINK-18665
> > >
> > > > On Tue, 21 Jul 2020 at 17:31, Dongwon Kim <eastcirc...@gmail.com> wrote:
> > > > > Hi Danny,
> > > > >
> > > > > >  Which version did you use
> > > > > I use Flink 1.11.0.
> > > > >
> > > > > >  what SQL context throws the error ?
> > > > > I think the declaration itself is not a problem.
> > > > > The exception occurs when I tried to execute the following which I 
> > > > > didn't show you in the previous email:
> > > > > > tEnv.sqlQuery("SELECT type, location FROM 
> > > > > > navi").executeInsert("output")
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Dongwon
> > > > >
> > > > > > On Tue, Jul 21, 2020 at 6:16 PM Danny Chan <yuzhao....@gmail.com> 
> > > > > > wrote:
> > > > > > > Hi, I execute the sql below
> > > > > > >
> > > > > > > """
> > > > > > >    |create table navi (
> > > > > > >    |  a STRING,
> > > > > > >    |  location ROW<lastUpdateTime BIGINT, transId STRING>
> > > > > > >    |) with (
> > > > > > >    |  'connector' = 'filesystem',
> > > > > > >    |  'path' = 'east-out',
> > > > > > >    |  'format' = 'json'
> > > > > > >    |)
> > > > > > >    |""".stripMargin
> > > > > > > tableEnv.executeSql(sql0)
> > > > > > > val sql =
> > > > > > > """
> > > > > > >    |CREATE TABLE output (
> > > > > > >    |  `partition` AS location.transId
> > > > > > >    |) PARTITIONED BY (`partition`)
> > > > > > >    |WITH (
> > > > > > >    |  'connector' = 'filesystem',
> > > > > > >    |  'path' = 'east-out',
> > > > > > >    |  'format' = 'json'
> > > > > > >    |) LIKE navi (EXCLUDING ALL)
> > > > > > >    |""".stripMargin
> > > > > > > tableEnv.executeSql(sql)
> > > > > > >
> > > > > > > In master branch, both are correct, can you share you stack trace 
> > > > > > > detail ? Which version did you use and what SQL context throws 
> > > > > > > the error ?
> > > > > > >
> > > > > > > Best,
> > > > > > > Danny Chan
> > > > > > > 在 2020年7月21日 +0800 PM4:55,Dongwon Kim <eastcirc...@gmail.com>,写道:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I want to create subdirectories named after values of a nested 
> > > > > > > > column, location.transId.
> > > > > > > >
> > > > > > > > This is my first attempt:
> > > > > > > > > CREATE TABLE output
> > > > > > > > > PARTITIONED BY (`location.transId`)
> > > > > > > > > WITH (
> > > > > > > > >   'connector' = 'filesystem',
> > > > > > > > >   'path' = 'east-out',
> > > > > > > > >   'format' = 'json'
> > > > > > > > > ) LIKE navi (EXCLUDING ALL)
> > > > > > > >
> > > > > > > > It fails with the following errors:
> > > > > > > > > Exception in thread "main" 
> > > > > > > > > org.apache.flink.table.api.ValidationException: Partition 
> > > > > > > > > column 'location.transId' not defined in the table schema. 
> > > > > > > > > Available columns: ['type', 'location']
> > > > > > > > > at 
> > > > > > > > > org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
> > > > > > > > > at 
> > > > > > > > > org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
> > > > > > > > > at 
> > > > > > > > > org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
> > > > > > > > > at 
> > > > > > > > > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
> > > > > > > > > at 
> > > > > > > > > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> > > > > > > > > at 
> > > > > > > > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> > > > > > > > > at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
> > > > > > > >
> > > > > > > > As It seems like nested columns are not recognized as a 
> > > > > > > > eligible column for PARTITIONED BY, I tried the following:
> > > > > > > > > CREATE TABLE output (
> > > > > > > > >   `partition` AS location.transId
> > > > > > > > > ) PARTITIONED BY (`partition`)
> > > > > > > > > WITH (
> > > > > > > > >   'connector' = 'filesystem',
> > > > > > > > >   'path' = 'east-out',
> > > > > > > > >   'format' = 'json'
> > > > > > > > > ) LIKE navi (EXCLUDING ALL)
> > > > > > > > It also fails:
> > > > > > > > >  Exception in thread "main" 
> > > > > > > > > org.apache.flink.table.api.ValidationException: The field 
> > > > > > > > > count of logical schema of the table does not match with the 
> > > > > > > > > field count of physical schema
> > > > > > > > . The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, 
> > > > > > > > `transId` STRING>]
> > > > > > > > The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, 
> > > > > > > > `transId` STRING>,STRING].
> > > > > > > >
> > > > > > > > Thanks in advance,
> > > > > > > >
> > > > > > > > Dongwon

Reply via email to