[ https://issues.apache.org/jira/browse/FLINK-22082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17312872#comment-17312872 ]
Jark Wu commented on FLINK-22082:
---------------------------------

{{FileSystemTableSource}} doesn't support nested projection pushdown yet.

> Nested projection push down doesn't work for data such as row(array(row))
> -------------------------------------------------------------------------
>
>                 Key: FLINK-22082
>                 URL: https://issues.apache.org/jira/browse/FLINK-22082
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0, 1.13.0
>            Reporter: Dian Fu
>            Priority: Major
>
> For the following job:
> {code}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment
> config = TableConfig()
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env, config)
> source_ddl = """
>     CREATE TABLE InTable (
>         `ID` STRING,
>         `Timestamp` TIMESTAMP(3),
>         `Result` ROW(
>             `data` ROW(`value` BIGINT) ARRAY),
>         WATERMARK FOR `Timestamp` AS `Timestamp`
>     ) WITH (
>         'connector' = 'filesystem',
>         'format' = 'json',
>         'path' = '/tmp/1.txt'
>     )
> """
> sink_ddl = """
>     CREATE TABLE OutTable (
>         `ID` STRING,
>         `value` BIGINT
>     ) WITH (
>         'connector' = 'print'
>     )
> """
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> table = t_env.from_path('InTable')
> table \
>     .select(
>         table.ID,
>         table.Result.data.at(1).value) \
>     .execute_insert('OutTable') \
>     .wait()
> {code}
> It will throw the following exception:
> {code}
> : scala.MatchError: ITEM($2.data, 1) (of class org.apache.calcite.rex.RexCall)
>     at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
>     at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
>     at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
>     at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
>     at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
>     at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
>     at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
>     at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155)
>     at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65)
> {code}
> See https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array for more details

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
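The `scala.MatchError` in the quoted trace is raised in `NestedSchemaExtractor.internalVisit`, reached from `visitFieldAccess`, when the operand of a field access is the call `ITEM($2.data, 1)` (the array element access produced by `.at(1)`) rather than another field reference. The Python sketch below is a hypothetical, heavily simplified model of that failure mode, not Flink's planner code: `InputRef`, `FieldAccess`, `Literal`, `Call`, `field_path` and `field_path_with_fallback` are invented names, and the fallback (stop descending at `ITEM` and keep the index path of the array operand) is only one assumed way such an extractor could tolerate the expression.

```python
from dataclasses import dataclass
from typing import List, Tuple, Union


# Hypothetical toy stand-ins for Calcite's RexInputRef, RexFieldAccess,
# RexLiteral and RexCall; these are NOT the real planner classes.
@dataclass(frozen=True)
class InputRef:
    index: int  # position of the top-level input field, e.g. $2


@dataclass(frozen=True)
class FieldAccess:
    expr: "Expr"
    field_index: int  # position of the accessed field inside a ROW


@dataclass(frozen=True)
class Literal:
    value: int


@dataclass(frozen=True)
class Call:
    op: str  # e.g. "ITEM" for array element access
    operands: Tuple["Expr", ...]


Expr = Union[InputRef, FieldAccess, Literal, Call]


def field_path(expr: Expr) -> List[int]:
    """Strict extractor: only knows input refs and field accesses."""
    if isinstance(expr, InputRef):
        return [expr.index]
    if isinstance(expr, FieldAccess):
        return field_path(expr.expr) + [expr.field_index]
    # Analogue of the scala.MatchError: there is no case for ITEM(...).
    raise ValueError(f"unhandled expression: {expr!r}")


def field_path_with_fallback(expr: Expr) -> List[int]:
    """Assumed tolerant variant: stop descending at an ITEM call."""
    if isinstance(expr, InputRef):
        return [expr.index]
    if isinstance(expr, FieldAccess):
        parent = field_path_with_fallback(expr.expr)
        # A field of an array element cannot be addressed by an index path,
        # so keep the path of the array itself instead of extending it.
        return parent if isinstance(expr.expr, Call) else parent + [expr.field_index]
    if isinstance(expr, Call) and expr.op == "ITEM":
        return field_path_with_fallback(expr.operands[0])
    raise ValueError(f"unhandled expression: {expr!r}")


# Result.data.at(1).value, i.e. FieldAccess(ITEM($2.data, 1), value)
expr = FieldAccess(Call("ITEM", (FieldAccess(InputRef(2), 0), Literal(1))), 0)
print(field_path_with_fallback(expr))  # [2, 0]
```

Calling `field_path(expr)` on the same expression raises, mirroring the reported `MatchError`, while plain `$2.data` still resolves to `[2, 0]`.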
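For context on Jark Wu's comment: in Flink's `SupportsProjectionPushDown` source ability, a source receives the projected fields as `int[][]` index paths and reports through `supportsNestedProjection()` whether paths deeper than one level are allowed; a source that returns `false` only ever sees top-level paths. The Python sketch below mimics that shape for the `InTable` schema. The dict-based schema and the helpers `to_index_path` and `projection` are hypothetical, and the used-field list assumes the query needs `ID`, `Timestamp` (for the watermark) and `Result.data` (descent stops at the array).

```python
from typing import Dict, List, Sequence

# Hypothetical nested view of InTable: field name -> nested ROW fields,
# in declaration order ({} marks a leaf). Result.data is an ARRAY of ROW,
# so index paths are assumed not to go below it.
IN_TABLE: Dict[str, dict] = {
    "ID": {},
    "Timestamp": {},
    "Result": {"data": {}},
}


def to_index_path(schema: Dict[str, dict], dotted: str) -> List[int]:
    """Translate a dotted name such as 'Result.data' into [2, 0]."""
    path: List[int] = []
    level = schema
    for name in dotted.split("."):
        path.append(list(level).index(name))
        level = level[name]
    return path


def projection(schema: Dict[str, dict], used: Sequence[str],
               supports_nested: bool) -> List[List[int]]:
    """Build int[][]-style projected paths, truncated to top level if needed."""
    paths = [to_index_path(schema, name) for name in used]
    if not supports_nested:
        paths = [path[:1] for path in paths]
    unique: List[List[int]] = []
    for path in paths:  # drop duplicates, keep first-seen order
        if path not in unique:
            unique.append(path)
    return unique


used = ["ID", "Timestamp", "Result.data"]
print(projection(IN_TABLE, used, supports_nested=True))   # [[0], [1], [2, 0]]
print(projection(IN_TABLE, used, supports_nested=False))  # [[0], [1], [2]]
```

In the `supports_nested=False` case the whole `Result` column is requested even though only `Result.data` is used.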