godfreyhe commented on a change in pull request #14502: URL: https://github.com/apache/flink/pull/14502#discussion_r551911087
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/SortSpec.java ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.utils; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** SortSpec describes how the data will be sort. */ Review comment: typo: be sorted ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/SortSpec.java ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.utils; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** SortSpec describes how the data will be sort. */ +public class SortSpec { + + /** SortSpec does not require sort. */ + public static final SortSpec ANY = new SortSpec(new SortFieldSpec[0]); + + private final SortFieldSpec[] fieldSpecs; + + public SortSpec(SortFieldSpec[] fieldSpecs) { + this.fieldSpecs = fieldSpecs; + } + + public SortSpec getSubSpec(int startIndex) { + return new SortSpec( + Arrays.stream(fieldSpecs, startIndex, fieldSpecs.length) + .toArray(SortFieldSpec[]::new)); + } + + public int[] getFieldIndices() { + return Arrays.stream(fieldSpecs).mapToInt(SortFieldSpec::getFieldIndex).toArray(); + } + + public boolean[] getFieldOrders() { + boolean[] orders = new boolean[fieldSpecs.length]; + IntStream.range(0, fieldSpecs.length) + .forEach(i -> orders[i] = fieldSpecs[i].isAscendingOrder); + return orders; + } + + public boolean[] getFieldNullIsLasts() { + boolean[] nullIsLasts = new boolean[fieldSpecs.length]; + IntStream.range(0, fieldSpecs.length) + .forEach(i -> nullIsLasts[i] = fieldSpecs[i].nullIsLast); + return nullIsLasts; + } + + public SortFieldSpec getFieldSpec(int index) { + return fieldSpecs[index]; + } + + public int getFieldSize() { + return fieldSpecs.length; + } + + public LogicalType[] 
getFieldTypes(RowType input) { + return Stream.of(fieldSpecs) + .map(field -> input.getTypeAt(field.fieldIndex)) + .toArray(LogicalType[]::new); + } + + public static SortSpecBuilder builder() { + return new SortSpecBuilder(); + } + + /** SortSpec builder. */ + public static class SortSpecBuilder { + + private List<SortFieldSpec> fieldSpecs = new LinkedList<>(); Review comment: nit: mark it as final ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/SortSpec.java ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.utils; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** SortSpec describes how the data will be sort. */ +public class SortSpec { + + /** SortSpec does not require sort. 
*/ + public static final SortSpec ANY = new SortSpec(new SortFieldSpec[0]); + + private final SortFieldSpec[] fieldSpecs; + + public SortSpec(SortFieldSpec[] fieldSpecs) { + this.fieldSpecs = fieldSpecs; + } + + public SortSpec getSubSpec(int startIndex) { + return new SortSpec( + Arrays.stream(fieldSpecs, startIndex, fieldSpecs.length) + .toArray(SortFieldSpec[]::new)); + } + + public int[] getFieldIndices() { + return Arrays.stream(fieldSpecs).mapToInt(SortFieldSpec::getFieldIndex).toArray(); + } + + public boolean[] getFieldOrders() { Review comment: rename to `getAscendingOrders` ? ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/SortSpec.java ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.utils; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** SortSpec describes how the data will be sort. 
*/ +public class SortSpec { + + /** SortSpec does not require sort. */ + public static final SortSpec ANY = new SortSpec(new SortFieldSpec[0]); + + private final SortFieldSpec[] fieldSpecs; + + public SortSpec(SortFieldSpec[] fieldSpecs) { + this.fieldSpecs = fieldSpecs; + } + + public SortSpec getSubSpec(int startIndex) { + return new SortSpec( + Arrays.stream(fieldSpecs, startIndex, fieldSpecs.length) + .toArray(SortFieldSpec[]::new)); + } + + public int[] getFieldIndices() { + return Arrays.stream(fieldSpecs).mapToInt(SortFieldSpec::getFieldIndex).toArray(); + } + + public boolean[] getFieldOrders() { + boolean[] orders = new boolean[fieldSpecs.length]; + IntStream.range(0, fieldSpecs.length) + .forEach(i -> orders[i] = fieldSpecs[i].isAscendingOrder); + return orders; + } + + public boolean[] getFieldNullIsLasts() { + boolean[] nullIsLasts = new boolean[fieldSpecs.length]; + IntStream.range(0, fieldSpecs.length) + .forEach(i -> nullIsLasts[i] = fieldSpecs[i].nullIsLast); + return nullIsLasts; + } + + public SortFieldSpec getFieldSpec(int index) { + return fieldSpecs[index]; + } + + public int getFieldSize() { + return fieldSpecs.length; + } + + public LogicalType[] getFieldTypes(RowType input) { + return Stream.of(fieldSpecs) + .map(field -> input.getTypeAt(field.fieldIndex)) + .toArray(LogicalType[]::new); + } + + public static SortSpecBuilder builder() { + return new SortSpecBuilder(); + } + + /** SortSpec builder. */ + public static class SortSpecBuilder { + + private List<SortFieldSpec> fieldSpecs = new LinkedList<>(); + + public SortSpecBuilder addField( + int fieldIndex, boolean isAscendingOrder, boolean nullIsLast) { + fieldSpecs.add(new SortFieldSpec(fieldIndex, isAscendingOrder, nullIsLast)); + return this; + } + + public SortSpec build() { + return new SortSpec(fieldSpecs.stream().toArray(SortFieldSpec[]::new)); + } + } + + /** Sort info for a Field. */ + public static class SortFieldSpec { + /** index of field. 
*/ Review comment: refer to [RelFieldCollation](https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/RelFieldCollation.java#L206): 0-based index of field being sorted. ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/SortSpec.java ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.utils; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** SortSpec describes how the data will be sort. */ +public class SortSpec { + + /** SortSpec does not require sort. 
*/ + public static final SortSpec ANY = new SortSpec(new SortFieldSpec[0]); + + private final SortFieldSpec[] fieldSpecs; + + public SortSpec(SortFieldSpec[] fieldSpecs) { + this.fieldSpecs = fieldSpecs; + } + + public SortSpec getSubSpec(int startIndex) { + return new SortSpec( + Arrays.stream(fieldSpecs, startIndex, fieldSpecs.length) + .toArray(SortFieldSpec[]::new)); + } Review comment: This method looks a little too specific. I would like to add a method named `getSortFieldSpecs` and pick the target in the specific class. ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/SortSpec.java ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.utils; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** SortSpec describes how the data will be sort. */ +public class SortSpec { + + /** SortSpec does not require sort.
*/ + public static final SortSpec ANY = new SortSpec(new SortFieldSpec[0]); + + private final SortFieldSpec[] fieldSpecs; + + public SortSpec(SortFieldSpec[] fieldSpecs) { + this.fieldSpecs = fieldSpecs; + } + + public SortSpec getSubSpec(int startIndex) { + return new SortSpec( + Arrays.stream(fieldSpecs, startIndex, fieldSpecs.length) + .toArray(SortFieldSpec[]::new)); + } + + public int[] getFieldIndices() { Review comment: It would be better to add some comments for each public method. ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/PartitionSpec.java ########## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.utils; + +/** PartitionSpec describes how data is partitioned in Sort or Rank. */ Review comment: I would like to remove `Sort` from the comment. `Sort` does not have a `partition` concept; it is mentioned only because the sort operator depends on the rank operator implementation.
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/SortSpec.java ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.utils; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** SortSpec describes how the data will be sort. */ +public class SortSpec { + + /** SortSpec does not require sort. 
*/ + public static final SortSpec ANY = new SortSpec(new SortFieldSpec[0]); + + private final SortFieldSpec[] fieldSpecs; + + public SortSpec(SortFieldSpec[] fieldSpecs) { + this.fieldSpecs = fieldSpecs; + } + + public SortSpec getSubSpec(int startIndex) { + return new SortSpec( + Arrays.stream(fieldSpecs, startIndex, fieldSpecs.length) + .toArray(SortFieldSpec[]::new)); + } + + public int[] getFieldIndices() { + return Arrays.stream(fieldSpecs).mapToInt(SortFieldSpec::getFieldIndex).toArray(); + } + + public boolean[] getFieldOrders() { + boolean[] orders = new boolean[fieldSpecs.length]; + IntStream.range(0, fieldSpecs.length) + .forEach(i -> orders[i] = fieldSpecs[i].isAscendingOrder); + return orders; + } + + public boolean[] getFieldNullIsLasts() { Review comment: rename to `getNullsIsLast` ? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org