swuferhong commented on code in PR #22978: URL: https://github.com/apache/flink/pull/22978#discussion_r1264829057
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java: ########## @@ -80,45 +81,60 @@ public WrapJsonAggFunctionArgumentsRule(Config config) { @Override public void onMatch(RelOptRuleCall call) { final LogicalAggregate aggregate = call.rel(0); - final AggregateCall aggCall = aggregate.getAggCallList().get(0); - final RelNode aggInput = aggregate.getInput(); final RelBuilder relBuilder = call.builder().push(aggInput); - final List<Integer> affectedArgs = getAffectedArgs(aggCall); - addProjections(aggregate.getCluster(), relBuilder, affectedArgs); - - final TargetMapping argsMapping = - getAggArgsMapping(aggInput.getRowType().getFieldCount(), affectedArgs); - - final AggregateCall newAggregateCall = aggCall.transform(argsMapping); - final LogicalAggregate newAggregate = - aggregate.copy( - aggregate.getTraitSet(), - relBuilder.build(), - aggregate.getGroupSet(), - aggregate.getGroupSets(), - Collections.singletonList(newAggregateCall)); - call.transformTo(newAggregate.withHints(Collections.singletonList(MARKER_HINT))); + final LogicalAggregate wrappedAggregate = wrapJsonAggregate(aggregate, relBuilder); + call.transformTo(wrappedAggregate.withHints(Collections.singletonList(MARKER_HINT))); } - /** - * Returns the aggregation's arguments which need to be wrapped. - * - * <p>This list is a subset of {@link AggregateCall#getArgList()} as not every argument may need - * to be wrapped into a {@link BuiltInFunctionDefinitions#JSON_STRING} call. - * - * <p>Duplicates (e.g. for {@code JSON_OBJECTAGG(f0 VALUE f0)}) are removed as we only need to - * wrap them once. - */ - private List<Integer> getAffectedArgs(AggregateCall aggCall) { - if (aggCall.getAggregation() instanceof SqlJsonObjectAggAggFunction) { - // For JSON_OBJECTAGG we only need to wrap its second (= value) argument - final int valueIndex = aggCall.getArgList().get(1); - return Collections.singletonList(valueIndex); + private LogicalAggregate wrapJsonAggregate(LogicalAggregate aggregate, RelBuilder relBuilder) { + int inputCount = aggregate.getInput().getRowType().getFieldCount(); + List<AggregateCall> aggCallList = new ArrayList<>(aggregate.getAggCallList()); + // This map is a mapping relationship between jsonObjectAggCall and the argument index + // need to be wrapped into a BuiltInFunctionDefinitions#JSON_STRING. This map will be used + // to create newWrappedArgCallList after creating a new Project. + Map<Integer, Integer> wrapIndicesMap = new HashMap<>(); + for (int i = 0; i < aggCallList.size(); i++) { + AggregateCall currentCall = aggCallList.get(i); + if (currentCall.getAggregation() instanceof SqlJsonObjectAggAggFunction) { + // For JSON_OBJECTAGG we only need to wrap its second (= value) argument + final int valueIndex = currentCall.getArgList().get(1); + wrapIndicesMap.put(i, valueIndex); + } else if (currentCall.getAggregation() instanceof SqlJsonArrayAggAggFunction) { + final int valueIndex = currentCall.getArgList().get(0); + wrapIndicesMap.put(i, valueIndex); + } + } + + if (wrapIndicesMap.isEmpty()) { Review Comment: > IIUC, the `wrapIndicesMap` should never be empty here since the rule only matches aggregate which contains json agg(s), so we should remove this check Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org