swuferhong commented on code in PR #22978:
URL: https://github.com/apache/flink/pull/22978#discussion_r1264829057


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java:
##########
@@ -80,45 +81,60 @@ public WrapJsonAggFunctionArgumentsRule(Config config) {
     @Override
     public void onMatch(RelOptRuleCall call) {
         final LogicalAggregate aggregate = call.rel(0);
-        final AggregateCall aggCall = aggregate.getAggCallList().get(0);
-
         final RelNode aggInput = aggregate.getInput();
         final RelBuilder relBuilder = call.builder().push(aggInput);
 
-        final List<Integer> affectedArgs = getAffectedArgs(aggCall);
-        addProjections(aggregate.getCluster(), relBuilder, affectedArgs);
-
-        final TargetMapping argsMapping =
-                getAggArgsMapping(aggInput.getRowType().getFieldCount(), 
affectedArgs);
-
-        final AggregateCall newAggregateCall = aggCall.transform(argsMapping);
-        final LogicalAggregate newAggregate =
-                aggregate.copy(
-                        aggregate.getTraitSet(),
-                        relBuilder.build(),
-                        aggregate.getGroupSet(),
-                        aggregate.getGroupSets(),
-                        Collections.singletonList(newAggregateCall));
-        
call.transformTo(newAggregate.withHints(Collections.singletonList(MARKER_HINT)));
+        final LogicalAggregate wrappedAggregate = wrapJsonAggregate(aggregate, 
relBuilder);
+        
call.transformTo(wrappedAggregate.withHints(Collections.singletonList(MARKER_HINT)));
     }
 
-    /**
-     * Returns the aggregation's arguments which need to be wrapped.
-     *
-     * <p>This list is a subset of {@link AggregateCall#getArgList()} as not 
every argument may need
-     * to be wrapped into a {@link BuiltInFunctionDefinitions#JSON_STRING} 
call.
-     *
-     * <p>Duplicates (e.g. for {@code JSON_OBJECTAGG(f0 VALUE f0)}) are 
removed as we only need to
-     * wrap them once.
-     */
-    private List<Integer> getAffectedArgs(AggregateCall aggCall) {
-        if (aggCall.getAggregation() instanceof SqlJsonObjectAggAggFunction) {
-            // For JSON_OBJECTAGG we only need to wrap its second (= value) 
argument
-            final int valueIndex = aggCall.getArgList().get(1);
-            return Collections.singletonList(valueIndex);
+    private LogicalAggregate wrapJsonAggregate(LogicalAggregate aggregate, 
RelBuilder relBuilder) {
+        int inputCount = aggregate.getInput().getRowType().getFieldCount();
+        List<AggregateCall> aggCallList = new 
ArrayList<>(aggregate.getAggCallList());
+        // This map is a mapping relationship between jsonObjectAggCall and 
the argument index
+        // need to be wrapped into a BuiltInFunctionDefinitions#JSON_STRING. 
This map will be used
+        // to create newWrappedArgCallList after creating a new Project.
+        Map<Integer, Integer> wrapIndicesMap = new HashMap<>();
+        for (int i = 0; i < aggCallList.size(); i++) {
+            AggregateCall currentCall = aggCallList.get(i);
+            if (currentCall.getAggregation() instanceof 
SqlJsonObjectAggAggFunction) {
+                // For JSON_OBJECTAGG we only need to wrap its second (= 
value) argument
+                final int valueIndex = currentCall.getArgList().get(1);
+                wrapIndicesMap.put(i, valueIndex);
+            } else if (currentCall.getAggregation() instanceof 
SqlJsonArrayAggAggFunction) {
+                final int valueIndex = currentCall.getArgList().get(0);
+                wrapIndicesMap.put(i, valueIndex);
+            }
+        }
+
+        if (wrapIndicesMap.isEmpty()) {

Review Comment:
   > IIUC, the `wrapIndicesMap` should never be empty here since the rule only 
matches aggregate which contains json agg(s), so we should remove this check
   
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to