zhangyue created FLINK-30201:
--------------------------------

Summary: Function "unnest" can't process nesting JSON properly
Key: FLINK-30201
URL: https://issues.apache.org/jira/browse/FLINK-30201
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.16.0
Reporter: zhangyue
Here is the CREATE TABLE DDL:
{code:java}
riskRuleEngineResultLevel2_3 = CREATE TABLE `riskRuleEngineResultLevel2_3`(\
  `data` ROW<\
    `flow_id` STRING, \
    `flow_name` STRING, \
    `flow_version` STRING, \
    `risk_id` BIGINT, \
    `uid` BIGINT, \
    `is_pass` INT, \
    `result` INT, \
    `country_id` INT, \
    `business` STRING, \
    `engine_scene_id` STRING, \
    `flow_type` STRING, \
    `source` STRING, \
    `rule_results` ARRAY<ROW<`rule_id` STRING, \
      `rule_name` STRING, \
      `rule_type` STRING, \
      `rule_type_name` STRING, \
      `node_id` STRING, \
      `result` INT, \
      `policy_name` STRING, \
      `in_path` BOOLEAN>>\
  >,\
  proctime as proctime()\
) WITH (\
  'connector' = 'kafka',\
  'topic' = 'riskRuleEngineResultLevel2_3',\
  'scan.startup.mode' = '%s',\
  'properties.bootstrap.servers' = '%s',\
  'properties.group.id' = '%s',\
  'format' = 'json'\
)
{code}
Flink SQL:
{code:java}
String executeSql = "select data.flow_id as flow_id,t.rule_id,t.rule_name,t.rule_type,t.rule_type_name,t.node_id,t.`result` from riskRuleEngineResultLevel2_3, unnest(data.rule_results) as t (rule_id,rule_name,rule_type,rule_type_name,node_id,`result`,policy_name,in_path)";
{code}
When the argument of the "unnest" function is "data.rule_results", which matches the declared structure, the error below occurs. When I use "rule_results" instead of "data.rule_results" in "unnest", the query works. I think this is weird.
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed.
From line 0, column 0 to line 1, column 149: Column 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
	at com.akulaku.flink_tasks_project.tasks.FlowsRuleResultRiskCalc.main(FlowsRuleResultRiskCalc.java:42)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 0, column 0 to line 1, column 149: Column 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
	at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5839)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5823)
	at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5431)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3101)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3082)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
	at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
	... 5 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
	at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
	... 28 more
{code}
If I change the DDL by adding a field "data2" that has the same structure as "data" and sits at the same level, an error occurs whether I use "rule_results" or "data.rule_results" in the "unnest" function:
{code:java}
riskRuleEngineResultLevel2_3 = CREATE TABLE `riskRuleEngineResultLevel2_3`(\
  `data` ROW<\
    `flow_id` STRING, \
    `flow_name` STRING, \
    `flow_version` STRING, \
    `risk_id` BIGINT, \
    `uid` BIGINT, \
    `is_pass` INT, \
    `result` INT, \
    `country_id` INT, \
    `business` STRING, \
    `engine_scene_id` STRING, \
    `flow_type` STRING, \
    `source` STRING, \
    `rule_results` ARRAY<ROW<`rule_id` STRING, \
      `rule_name` STRING, \
      `rule_type` STRING, \
      `rule_type_name` STRING, \
      `node_id` STRING, \
      `result` INT, \
      `policy_name` STRING, \
      `in_path` BOOLEAN>>\
  >,\
  `data2` ROW<\
    `flow_id` STRING, \
    `flow_name` STRING, \
    `flow_version` STRING, \
    `risk_id` BIGINT, \
    `uid` BIGINT, \
    `is_pass` INT, \
    `result` INT, \
    `country_id` INT, \
    `business` STRING, \
    `engine_scene_id` STRING, \
    `flow_type` STRING, \
    `source` STRING, \
    `rule_results` ARRAY<ROW<`rule_id` STRING, \
      `rule_name` STRING, \
      `rule_type` STRING, \
      `rule_type_name` STRING, \
      `node_id` STRING, \
      `result` INT, \
      `policy_name` STRING, \
      `in_path` BOOLEAN>>\
  >,\
  proctime as proctime()\
) WITH (\
  'connector' = 'kafka',\
  'topic' = 'riskRuleEngineResultLevel2_3',\
  'scan.startup.mode' = '%s',\
  'properties.bootstrap.servers' = '%s',\
  'properties.group.id' = '%s',\
  'format' = 'json'\
)
{code}
Error when using "rule_results" in "unnest":
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 146 to line 1, column 157: Column 'rule_results' is ambiguous
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
	at com.akulaku.flink_tasks_project.tasks.FlowsRuleResultRiskCalc.main(FlowsRuleResultRiskCalc.java:42)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 146 to line 1, column 157: Column 'rule_results' is ambiguous
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
	at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:467)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:2921)
	at org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:300)
	at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:419)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5404)
	at org.apache.calcite.sql.validate.UnnestNamespace.validateImpl(UnnestNamespace.java:64)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
	at org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
	at org.apache.calcite.sql.validate.AbstractNamespace.getRowTypeSansSystemColumns(AbstractNamespace.java:122)
	at org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:69)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
	at org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
	at org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:43)
	at org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
	at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:190)
	at org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:155)
	at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5839)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5823)
	at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5431)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3101)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3082)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
	at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
	... 5 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'rule_results' is ambiguous
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
	at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
	... 46 more
{code}
Error when using "data.rule_results" in "unnest" (the same error as in the first case):
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 0, column 0 to line 1, column 149: Column 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
	at com.akulaku.flink_tasks_project.tasks.FlowsRuleResultRiskCalc.main(FlowsRuleResultRiskCalc.java:42)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 0, column 0 to line 1, column 149: Column 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4867)
	at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5839)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5823)
	at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5431)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3101)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3082)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
	at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
	... 5 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'data.data' not found in table 'riskRuleEngineResultLevel2_3'
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
	at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
	... 28 more
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)