Hi Dongwon,

I think this is a bug in the filesystem connector: it doesn't exclude the computed columns when building the TableSource. I created an issue [1] to track this problem.
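
To make the mismatch concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code: SchemaSketch, Column, allTypes and physicalTypes are hypothetical names, and the column list is an assumption based on the second CREATE TABLE and the error message in the quoted thread below. It only shows how counting the computed column `partition` gives three fields where two are expected.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Toy model only: these types are hypothetical and are not Flink classes.
final class SchemaSketch {

    /** A column with its type and a flag for computed columns (declared with AS). */
    static final class Column {
        final String name;
        final String type;
        final boolean computed;

        Column(String name, String type, boolean computed) {
            this.name = name;
            this.type = type;
            this.computed = computed;
        }
    }

    /** Assumed schema of `output`: the columns of `navi` plus the computed `partition`. */
    static List<Column> outputTable() {
        return Arrays.asList(
                new Column("type", "STRING", false),
                new Column("location", "ROW<`lastUpdateTime` BIGINT, `transId` STRING>", false),
                new Column("partition", "STRING", true));
    }

    /** Types of every column, computed ones included. */
    static List<String> allTypes(List<Column> columns) {
        return columns.stream().map(c -> c.type).collect(Collectors.toList());
    }

    /** Types of the physical columns only, with computed columns excluded. */
    static List<String> physicalTypes(List<Column> columns) {
        return columns.stream()
                .filter(c -> !c.computed)
                .map(c -> c.type)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Column> cols = outputTable();
        System.out.println("computed columns included: " + allTypes(cols));
        System.out.println("computed columns excluded: " + physicalTypes(cols));
    }
}
```

In this model the two lists differ by exactly one STRING field, which is the same shape as the field count mismatch in the ValidationException.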

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-18665

On Tue, 21 Jul 2020 at 17:31, Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Danny,
>
>> Which version did you use
>
> I use Flink 1.11.0.
>
>> what SQL context throws the error ?
>
> I think the declaration itself is not a problem.
> The exception occurs when I try to execute the following, which I didn't
> show you in the previous email:
>
>> tEnv.sqlQuery("SELECT type, location FROM navi").executeInsert("output")
>
> Thanks,
>
> Dongwon
>
> On Tue, Jul 21, 2020 at 6:16 PM Danny Chan <yuzhao....@gmail.com> wrote:
>
>> Hi, I executed the SQL below:
>>
>> val sql0 =
>>   """
>>     |create table navi (
>>     |  a STRING,
>>     |  location ROW<lastUpdateTime BIGINT, transId STRING>
>>     |) with (
>>     |  'connector' = 'filesystem',
>>     |  'path' = 'east-out',
>>     |  'format' = 'json'
>>     |)
>>     |""".stripMargin
>> tableEnv.executeSql(sql0)
>> val sql =
>>   """
>>     |CREATE TABLE output (
>>     |  `partition` AS location.transId
>>     |) PARTITIONED BY (`partition`)
>>     |WITH (
>>     |  'connector' = 'filesystem',
>>     |  'path' = 'east-out',
>>     |  'format' = 'json'
>>     |) LIKE navi (EXCLUDING ALL)
>>     |""".stripMargin
>> tableEnv.executeSql(sql)
>>
>> In the master branch, both are correct. Can you share your stack trace
>> details? Which version did you use and what SQL context throws the error ?
>>
>> Best,
>> Danny Chan
>>
>> On 21 Jul 2020 at 4:55 PM +0800, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>> Hi,
>>
>> I want to create subdirectories named after values of a nested column,
>> location.transId.
>>
>> This is my first attempt:
>>
>>> CREATE TABLE output
>>> PARTITIONED BY (`location.transId`)
>>> WITH (
>>>   'connector' = 'filesystem',
>>>   'path' = 'east-out',
>>>   'format' = 'json'
>>> ) LIKE navi (EXCLUDING ALL)
>>
>> It fails with the following error:
>>
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: Partition column 'location.transId' not defined in the table schema. Available columns: ['type', 'location']
>>>   at org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
>>>   at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
>>>   at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
>>>   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
>>>   at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>   at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
>>
>> As it seems that nested columns are not recognized as eligible columns
>> for PARTITIONED BY, I tried the following:
>>
>>> CREATE TABLE output (
>>>   `partition` AS location.transId
>>> ) PARTITIONED BY (`partition`)
>>> WITH (
>>>   'connector' = 'filesystem',
>>>   'path' = 'east-out',
>>>   'format' = 'json'
>>> ) LIKE navi (EXCLUDING ALL)
>>
>> It also fails:
>>
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: The field count of logical schema of the table does not match with the field count of physical schema.
>>> The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>]
>>> The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>,STRING].
>>
>> Thanks in advance,
>>
>> Dongwon