You cannot do that in Flink yet. A Flink partition column must map to a column in the table schema that you can select from. The syntax is a little different from Hive's:

    create table table_name (
      id int,
      dtDontQuery string,
      name string
    ) partitioned by (date string)

In Hive's syntax, you declare the partition column's name and type at the same time.

Best,
Danny Chan

On July 21, 2020 at 11:30 PM +0800, Dongwon Kim <eastcirc...@gmail.com> wrote:
> Thanks Jark for the update.
>
> However, getting back to the original question, can I use a nested column
> directly for CREATE TABLE PARTITIONED BY like below without declaring an
> additional column?
>
> > CREATE TABLE output
> > PARTITIONED BY (`location.transId`)
> > WITH (
> >   'connector' = 'filesystem',
> >   'path' = 'east-out',
> >   'format' = 'json'
> > ) LIKE navi (EXCLUDING ALL)
>
> I tried (`location`.transId) as well but it fails with an exception:
> > Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> > SQL parse failed. Encountered "." at line 3, column 27.
> > Was expecting one of:
> >     ")" ...
> >     "," ...
> >
> >     at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
> >     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
> >     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> >     at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
> > Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "."
> > at line 3, column 27.
> > Was expecting one of:
> >     ")" ...
> >     "," ...
> >
> >     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
> >     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
> >     at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148)
> >     at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163)
> >     at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188)
> >     at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
> >     ... 3 more
> > Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "."
> > at line 3, column 27.
> > Was expecting one of:
> >     ")" ...
> >     "," ...
> >
> >     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
> >     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900)
> >     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21398)
> >     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5292)
> >     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6269)
> >     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:19047)
> >     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3308)
> >     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
> >     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
> >     at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
> >     ... 5 more
>
> Best,
>
> Dongwon
>
>
> On Wed, Jul 22, 2020 at 12:09 AM Jark Wu <imj...@gmail.com> wrote:
>
> > > Hi Dongwon,
> > >
> > > I think this is a bug in the Filesystem connector which doesn't exclude
> > > the computed columns when building the TableSource.
> > > I created an issue [1] to track this problem.
> > >
> > > Best,
> > > Jark
> > >
> > > [1]: https://issues.apache.org/jira/browse/FLINK-18665
> > >
> > > > On Tue, 21 Jul 2020 at 17:31, Dongwon Kim <eastcirc...@gmail.com> wrote:
> > > > > Hi Danny,
> > > > >
> > > > > > Which version did you use
> > > > > I use Flink 1.11.0.
> > > > >
> > > > > > what SQL context throws the error ?
> > > > > I think the declaration itself is not a problem.
> > > > > The exception occurred when I tried to execute the following, which I
> > > > > didn't show you in the previous email:
> > > > > > tEnv.sqlQuery("SELECT type, location FROM navi").executeInsert("output")
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Dongwon
> > > > >
> > > > > > On Tue, Jul 21, 2020 at 6:16 PM Danny Chan <yuzhao....@gmail.com> wrote:
> > > > > > > Hi, I executed the SQL below:
> > > > > > >
> > > > > > > val sql0 =
> > > > > > > """
> > > > > > > |create table navi (
> > > > > > > |  a STRING,
> > > > > > > |  location ROW<lastUpdateTime BIGINT, transId STRING>
> > > > > > > |) with (
> > > > > > > |  'connector' = 'filesystem',
> > > > > > > |  'path' = 'east-out',
> > > > > > > |  'format' = 'json'
> > > > > > > |)
> > > > > > > |""".stripMargin
> > > > > > > tableEnv.executeSql(sql0)
> > > > > > > val sql =
> > > > > > > """
> > > > > > > |CREATE TABLE output (
> > > > > > > |  `partition` AS location.transId
> > > > > > > |) PARTITIONED BY (`partition`)
> > > > > > > |WITH (
> > > > > > > |  'connector' = 'filesystem',
> > > > > > > |  'path' = 'east-out',
> > > > > > > |  'format' = 'json'
> > > > > > > |) LIKE navi (EXCLUDING ALL)
> > > > > > > |""".stripMargin
> > > > > > > tableEnv.executeSql(sql)
> > > > > > >
> > > > > > > On the master branch, both statements are accepted. Can you share
> > > > > > > your stack trace details? Which version did you use, and what SQL
> > > > > > > context throws the error?
> > > > > > >
> > > > > > > Best,
> > > > > > > Danny Chan
> > > > > > > On July 21, 2020 at 4:55 PM +0800, Dongwon Kim <eastcirc...@gmail.com> wrote:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I want to create subdirectories named after values of a nested
> > > > > > > > column, location.transId.
> > > > > > > >
> > > > > > > > This is my first attempt:
> > > > > > > > > CREATE TABLE output
> > > > > > > > > PARTITIONED BY (`location.transId`)
> > > > > > > > > WITH (
> > > > > > > > >   'connector' = 'filesystem',
> > > > > > > > >   'path' = 'east-out',
> > > > > > > > >   'format' = 'json'
> > > > > > > > > ) LIKE navi (EXCLUDING ALL)
> > > > > > > >
> > > > > > > > It fails with the following error:
> > > > > > > > > Exception in thread "main"
> > > > > > > > > org.apache.flink.table.api.ValidationException: Partition
> > > > > > > > > column 'location.transId' not defined in the table schema.
> > > > > > > > > Available columns: ['type', 'location']
> > > > > > > > >     at org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
> > > > > > > > >     at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
> > > > > > > > >     at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
> > > > > > > > >     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
> > > > > > > > >     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> > > > > > > > >     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> > > > > > > > >     at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
> > > > > > > >
> > > > > > > > Since nested columns do not seem to be recognized as eligible
> > > > > > > > columns for PARTITIONED BY, I tried the following:
> > > > > > > > > CREATE TABLE output (
> > > > > > > > >   `partition` AS location.transId
> > > > > > > > > ) PARTITIONED BY (`partition`)
> > > > > > > > > WITH (
> > > > > > > > >   'connector' = 'filesystem',
> > > > > > > > >   'path' = 'east-out',
> > > > > > > > >   'format' = 'json'
> > > > > > > > > ) LIKE navi (EXCLUDING ALL)
> > > > > > > > It also fails:
> > > > > > > > > Exception in thread "main"
> > > > > > > > > org.apache.flink.table.api.ValidationException: The field
> > > > > > > > > count of logical schema of the table does not match with the
> > > > > > > > > field count of physical schema.
> > > > > > > > > The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>]
> > > > > > > > > The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>,STRING].
> > > > > > > >
> > > > > > > > Thanks in advance,
> > > > > > > >
> > > > > > > > Dongwon