WencongLiu commented on code in PR #24398:
URL: https://github.com/apache/flink/pull/24398#discussion_r1510733360


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/AbstractSortPartitionOperator.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorAttributes;
+import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * The {@link AbstractSortPartitionOperator} is the base class of the sort partition operators,
+ * which provides shared construction methods and utility functions.
+ *
+ * @param <INPUT_TYPE> The type of input record.
+ * @param <SORT_TYPE> The type used to sort the records, which may be different from the
+ *     INPUT_TYPE. For example, if the input record is sorted according to the key selected by a
+ *     {@link KeySelector}, the selected key should also be written to the {@link ExternalSorter}
+ *     together with the input record to avoid repeated key selections. In this case, the type used
+ *     to sort the records will be a tuple containing both the selected key and the record.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+public abstract class AbstractSortPartitionOperator<INPUT_TYPE, SORT_TYPE>
+        extends AbstractStreamOperator<INPUT_TYPE>
+        implements OneInputStreamOperator<INPUT_TYPE, INPUT_TYPE>, 
BoundedOneInput {
+
+    /** The default managed memory weight of the sort partition operator. */
+    public static final int DEFAULT_MANAGE_MEMORY_WEIGHT = 128;
+
+    /** The type information of input records. */
+    protected final TypeInformation<INPUT_TYPE> inputType;
+
+    /**
+     * The type information used to sort the records. It is the input type itself when sorting by a
+     * field position or a field expression, and a tuple of (selected key, input record) when
+     * sorting by a {@link KeySelector}.
+     */
+    protected final TypeInformation<SORT_TYPE> sortType;
+
+    /**
+     * The selector to create the sort key for records, or {@code null} if the records are sorted by
+     * a field position or a field expression.
+     */
+    protected final KeySelector<INPUT_TYPE, ?> sortFieldSelector;
+
+    /** The order to sort records. */
+    private final Order sortOrder;
+
+    /**
+     * The string field expression indicating the sort key for records of tuple or POJO type, or
+     * {@code null} if the records are sorted by a field position or a {@link KeySelector}.
+     */
+    private final String stringSortField;
+
+    /**
+     * The field position indicating the sort key for records of tuple type, or {@code -1} if the
+     * records are sorted by a field expression or a {@link KeySelector}.
+     */
+    private final int positionSortField;
+
+    /** The sorter to sort records; {@code null} until it is assigned. */
+    protected PushSorter<SORT_TYPE> sorter;
+
+    /**
+     * Creates a sort partition operator that sorts records by the field at the given position.
+     *
+     * @param inputType the type information of the input records.
+     * @param positionSortField the position of the field used as the sort key.
+     * @param sortOrder the order to sort records.
+     */
+    public AbstractSortPartitionOperator(
+            TypeInformation<INPUT_TYPE> inputType, int positionSortField, Order sortOrder) {
+        this.inputType = inputType;
+        // Invoked right after inputType is set and before any other field is assigned;
+        // presumably rejects positions that cannot be sorted.
+        ensureFieldSortable(positionSortField);
+        // The records themselves are sorted, so the sort type is the input type.
+        this.sortType = (TypeInformation<SORT_TYPE>) this.inputType;
+        this.sortOrder = sortOrder;
+        this.positionSortField = positionSortField;
+        this.stringSortField = null;
+        this.sortFieldSelector = null;
+    }
+
+    /**
+     * Creates a sort partition operator that sorts records by the field identified by the given
+     * field expression.
+     *
+     * @param inputType the type information of the input records.
+     * @param stringSortField the field expression of the sort key, for tuple or POJO records.
+     * @param sortOrder the order to sort records.
+     */
+    public AbstractSortPartitionOperator(
+            TypeInformation<INPUT_TYPE> inputType, String stringSortField, Order sortOrder) {
+        this.inputType = inputType;
+        // Invoked right after inputType is set and before any other field is assigned;
+        // presumably rejects field expressions that cannot be sorted.
+        ensureFieldSortable(stringSortField);
+        // The records themselves are sorted, so the sort type is the input type.
+        this.sortType = (TypeInformation<SORT_TYPE>) this.inputType;
+        this.sortOrder = sortOrder;
+        this.stringSortField = stringSortField;
+        this.positionSortField = -1;
+        this.sortFieldSelector = null;
+    }
+
+    /**
+     * Creates a sort partition operator that sorts records by the key extracted with a {@link
+     * KeySelector}.
+     *
+     * <p>The sort type is a tuple of (selected key, input record), so that the key is stored next
+     * to the record instead of being selected again.
+     *
+     * @param inputType the type information of the input records.
+     * @param sortFieldSelector the selector extracting the sort key from each record.
+     * @param sortOrder the order to sort records.
+     * @param <K> the type of the selected key.
+     */
+    public <K> AbstractSortPartitionOperator(
+            TypeInformation<INPUT_TYPE> inputType,
+            KeySelector<INPUT_TYPE, K> sortFieldSelector,
+            Order sortOrder) {
+        this.inputType = inputType;
+        // Invoked right after inputType is set and before the key type is extracted;
+        // presumably rejects selectors whose key cannot be sorted.
+        ensureFieldSortable(sortFieldSelector);
+        TypeInformation<K> keyType =
+                TypeExtractor.getKeySelectorTypes(sortFieldSelector, inputType);
+        this.sortType = (TypeInformation<SORT_TYPE>) Types.TUPLE(keyType, inputType);
+        this.sortOrder = sortOrder;
+        this.sortFieldSelector = sortFieldSelector;
+        this.positionSortField = -1;
+        this.stringSortField = null;
+    }
+
+    @Override
+    public OperatorAttributes getOperatorAttributes() {
+        return new 
OperatorAttributesBuilder().setOutputOnlyAfterEndOfStream(true).build();
+    }
+
+    /**
+     * Creates a new sorter backed by an {@link ExternalSorter}.
+     *
+     * <p>The sorter uses the managed memory fraction computed for the {@link
+     * ManagedMemoryUseCase#OPERATOR} use case. Spilling threshold, maximum number of file handles
+     * and large-record handling are read from the job configuration, and object reuse follows the
+     * execution config of the given task.
+     *
+     * @param typeSerializer the serializer of the sort type.
+     * @param typeComparator the comparator of the sort type.
+     * @param streamTask the current stream task.
+     * @param <TYPE> the sort type.
+     * @return the newly created sorter.
+     * @throws RuntimeException if the memory for the sorter cannot be allocated.
+     */
+    protected <TYPE> PushSorter<TYPE> getSorter(
+            TypeSerializer<TYPE> typeSerializer,
+            TypeComparator<TYPE> typeComparator,
+            StreamTask<?, ?> streamTask) {
+        ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader();
+        Configuration jobConfiguration = streamTask.getEnvironment().getJobConfiguration();
+        double managedMemoryFraction =
+                config.getManagedMemoryFractionOperatorUseCaseOfSlot(
+                        ManagedMemoryUseCase.OPERATOR,
+                        jobConfiguration,
+                        streamTask.getEnvironment().getTaskConfiguration(),
+                        userCodeClassLoader);
+        try {
+            return ExternalSorter.newBuilder(
+                            streamTask.getEnvironment().getMemoryManager(),
+                            streamTask,
+                            typeSerializer,
+                            typeComparator,
+                            streamTask.getExecutionConfig())
+                    .memoryFraction(managedMemoryFraction)
+                    .enableSpilling(
+                            streamTask.getEnvironment().getIOManager(),
+                            jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+                    .maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
+                    .objectReuse(streamTask.getExecutionConfig().isObjectReuseEnabled())
+                    .largeRecords(jobConfiguration.get(AlgorithmOptions.USE_LARGE_RECORDS_HANDLER))
+                    .build();
+        } catch (MemoryAllocationException e) {
+            // Keep the original cause, but say which component failed to get its memory.
+            throw new RuntimeException(
+                    "Failed to allocate memory for the sorter of the sort partition operator.", e);
+        }
+    }
+
+    /**
+     * Get the comparator for sort type.
+     *
+     * @return the comparator for sort type.
+     */
+    protected TypeComparator<SORT_TYPE> getSortTypeComparator() {
+        // 1.Initialize the logical sort field index.
+        int[] logicalSortFieldIndex = new int[1];
+        if (positionSortField != -1) {
+            logicalSortFieldIndex[0] =
+                    new Keys.ExpressionKeys<>(positionSortField, inputType)
+                            .computeLogicalKeyPositions()[0];
+        } else if (stringSortField != null) {
+            logicalSortFieldIndex[0] =
+                    new Keys.ExpressionKeys<>(stringSortField, inputType)
+                            .computeLogicalKeyPositions()[0];
+        }
+
+        // 2.Initialize the sort order.
+        boolean[] sortOrderIndicator = new boolean[1];
+        sortOrderIndicator[0] = this.sortOrder == Order.ASCENDING;
+
+        // 3.Create the type comparator.
+        if (sortType instanceof CompositeType) {
+            return ((CompositeType<SORT_TYPE>) sortType)
+                    .createComparator(
+                            logicalSortFieldIndex, sortOrderIndicator, 0, 
getExecutionConfig());
+        } else if (sortType instanceof AtomicType) {
+            return ((AtomicType<SORT_TYPE>) sortType)
+                    .createComparator(this.sortOrder == Order.ASCENDING, 
getExecutionConfig());
+        } else {
+            throw new UnsupportedOperationException(sortType + " doesn't 
support sorting.");

Review Comment:
   I double-checked the three SortPartition APIs in DataSet and found that the TypeInformation of the sorted data is a CompositeType in all of them:
   1. sortPartition(int field, Order order): the sorted data must be a Tuple, which is a CompositeType.
   2. sortPartition(String field, Order order): the sorted data must be a Flink POJO, which is a CompositeType.
   3. sortPartition(KeySelector<T, K> keySelector, Order order): the sorted data is a Tuple containing the selected key and the record, which is a CompositeType.
   
   The detailed code change is in the refactored implementations. PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to