[ https://issues.apache.org/jira/browse/FLINK-6070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Hutchison updated FLINK-6070: ---------------------------------- Description: Since Java doesn't have built-in tuple types, I find myself using Flink tuples for a lot of tasks in Flink programs. One downside is that these tuples are not inherently comparable, so when you want to sort a collection of tuples, you have to provide a custom comparator. I created a tuple sorting class, as follows. (Only the methods for Tuple2 are defined at the bottom, similar methods could be added for other tuple types.) I wanted to get feedback on whether something like this would be considered useful for Flink before I submitted a PR. {code} import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; /** A class for sorting collections of tuples. */ public class TupleSorter { /** Produce a Tuple comparator for the given number of fields, with the requested field priority and sort order. 
*/ private static Comparator<Tuple> newComparator(final int tupleLen, final int[] fieldPriority, final int[] sortDescendingIndices) { if (fieldPriority == null || fieldPriority.length != tupleLen) { throw new IllegalArgumentException("Invalid sort order"); } boolean[] idxUsed = new boolean[tupleLen]; for (int i = 0; i < fieldPriority.length; i++) { int idx = fieldPriority[i]; if (idx < 0 || idx >= tupleLen) { throw new IllegalArgumentException("fieldPriority entry out of range: " + idx); } if (idxUsed[idx]) { throw new IllegalArgumentException("fieldPriority entry duplicated: " + idx); } idxUsed[idx] = true; } boolean[] sortDescending = new boolean[tupleLen]; for (int i = 0; i < sortDescendingIndices.length; i++) { int idx = sortDescendingIndices[i]; if (idx < 0 || idx >= tupleLen) { throw new IllegalArgumentException("sortDescendingIndices entry out of range: " + idx); } sortDescending[idx] = true; } return (tuple0, tuple1) -> { for (int i = 0; i < tupleLen; i++) { int idx = fieldPriority[i]; @SuppressWarnings({ "rawtypes", "unchecked" }) int diff = ((Comparable) tuple0.getField(idx)).compareTo((Comparable) tuple1.getField(idx)); if (sortDescending[i]) { diff = -diff; } if (diff != 0) { return diff; } } return 0; }; } /** * Sort a list of tuples. * * @param list * The list of tuples. * @param fieldPriority * The sort priority for the fields (primary and secondary sort key): a permutation of the field indices 0 * and 1. The default sort order within a field is ascending. * @param sortDescendingIndices * If provided, inverts the sort order for a given field index from ascending to descending order. */ public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> void sort(final List<Tuple2<T0, T1>> list, final int[] fieldPriority, final int... sortDescendingIndices) { list.sort(newComparator(/* tupleLen = */ 2, fieldPriority, sortDescendingIndices)); } /** * Produce a sorted copy of a collection of tuples. * * @param collection * The collection of tuples. 
* @param fieldPriority * The sort priority for the fields (primary and secondary sort key): a permutation of the field indices 0 * and 1. The default sort order within a field is ascending. * @param sortDescendingIndices * If provided, inverts the sort order for a given field index from ascending to descending order. */ public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> ArrayList<Tuple2<T0, T1>> sortCopy( final Collection<Tuple2<T0, T1>> collection, final int[] fieldPriority, final int... sortDescendingIndices) { ArrayList<Tuple2<T0, T1>> list = new ArrayList<>(collection); Collections.sort(list, newComparator(/* tupleLen = */ 2, fieldPriority, sortDescendingIndices)); return list; } } {code} was: Since Java doesn't have built-in tuple types, I find myself using Flink tuples for a lot of tasks in Flink programs. One downside is that these tuples are not inherently comparable, so when you want to sort a collection of tuples, you have to provide a custom comparator. I created a ComparableTuple2 type, as follows. I wanted to get feedback on whether something like this would be considered useful for Flink before I submitted a PR. Also, I don't know how high I should go with the field arity for a ComparableTuple -- presumably not as high as for non-comparable tuples? {code} import org.apache.flink.api.java.tuple.Tuple2; /** A comparable tuple, consisting of comparable fields that act as primary and secondary sort keys. */ public class ComparableTuple2<T0 extends Comparable<T0>, T1 extends Comparable<T1>> extends Tuple2<T0, T1> implements Comparable<Tuple2<T0, T1>> { private static final long serialVersionUID = 1L; private boolean invertSortOrder0; private boolean invertSortOrder1; public ComparableTuple2() { } /** * Create a 2-tuple of comparable elements. * * @param f0 * The first element, which is also the primary sort key, and sorts in ascending order. 
* @param f1 * The second element, which is also the secondary sort key, and sorts in ascending order. * @param invertSortOrder0 * If true, invert the sort order for the first field (i.e. sort in descending order). * @param invertSortOrder1 * If true, invert the sort order for the second field (i.e. sort in descending order). */ public ComparableTuple2(T0 f0, T1 f1) { super(f0, f1); } /** * Create a comparable 2-tuple out of comparable elements. * * @param f0 * The first element, which is also the primary sort key, and sorts in ascending order if * invertSortOrder0 == false, else sorts in descending order. * @param f1 * The second element, which is also the secondary sort key, and sorts in ascending order if * invertSortOrder1 == false, else sorts in descending order. * @param invertSortOrder0 * If true, invert the sort order for the first field (i.e. sort in descending order). * @param invertSortOrder1 * If true, invert the sort order for the second field (i.e. sort in descending order). */ public ComparableTuple2(final T0 f0, final T1 f1, final boolean invertSortOrder0, final boolean invertSortOrder1) { super(f0, f1); this.invertSortOrder0 = invertSortOrder0; this.invertSortOrder1 = invertSortOrder1; } /** * Comparison function that compares first the primary sort key, f0, and then if equal, compares the secondary sort * key, f1. 
*/ @Override public int compareTo(final Tuple2<T0, T1> o) { int diff = this.f0.compareTo(o.f0); if (invertSortOrder0) { diff = -diff; } if (diff != 0) { return diff; } diff = this.f1.compareTo(o.f1); if (invertSortOrder1) { diff = -diff; } return diff; } } {code} > Suggestion: add ComparableTuple types > ------------------------------------- > > Key: FLINK-6070 > URL: https://issues.apache.org/jira/browse/FLINK-6070 > Project: Flink > Issue Type: Improvement > Components: Core > Affects Versions: 1.2.0 > Reporter: Luke Hutchison > Priority: Minor > > Since Java doesn't have built-in tuple types, I find myself using Flink > tuples for a lot of tasks in Flink programs. One downside is that these > tuples are not inherently comparable, so when you want to sort a collection > of tuples, you have to provide a custom comparator. > I created a tuple sorting class, as follows. (Only the methods for Tuple2 are > defined at the bottom, similar methods could be added for other tuple types.) > I wanted to get feedback on whether something like this would be considered > useful for Flink before I submitted a PR. > {code} > import java.util.ArrayList; > import java.util.Collection; > import java.util.Collections; > import java.util.Comparator; > import java.util.List; > import org.apache.flink.api.java.tuple.Tuple; > import org.apache.flink.api.java.tuple.Tuple2; > /** A class for sorting collections of tuples. */ > public class TupleSorter { > /** Produce a Tuple comparator for the given number of fields, with the > requested field priority and sort order. 
*/ > private static Comparator<Tuple> newComparator(final int tupleLen, final > int[] fieldPriority, > final int[] sortDescendingIndices) { > if (fieldPriority == null || fieldPriority.length != tupleLen) { > throw new IllegalArgumentException("Invalid sort order"); > } > boolean[] idxUsed = new boolean[tupleLen]; > for (int i = 0; i < fieldPriority.length; i++) { > int idx = fieldPriority[i]; > if (idx < 0 || idx >= tupleLen) { > throw new IllegalArgumentException("fieldPriority entry out > of range: " + idx); > } > if (idxUsed[idx]) { > throw new IllegalArgumentException("fieldPriority entry > duplicated: " + idx); > } > idxUsed[idx] = true; > } > boolean[] sortDescending = new boolean[tupleLen]; > for (int i = 0; i < sortDescendingIndices.length; i++) { > int idx = sortDescendingIndices[i]; > if (idx < 0 || idx >= tupleLen) { > throw new IllegalArgumentException("sortDescendingIndices > entry out of range: " + idx); > } > sortDescending[idx] = true; > } > return (tuple0, tuple1) -> { > for (int i = 0; i < tupleLen; i++) { > int idx = fieldPriority[i]; > @SuppressWarnings({ "rawtypes", "unchecked" }) > int diff = ((Comparable) > tuple0.getField(idx)).compareTo((Comparable) tuple1.getField(idx)); > if (sortDescending[i]) { > diff = -diff; > } > if (diff != 0) { > return diff; > } > } > return 0; > }; > } > /** > * Sort a list of tuples. > * > * @param list > * The list of tuples. > * @param fieldPriority > * The sort priority for the fields (primary and secondary sort > key): a permutation of the field indices 0 > * and 1. The default sort order within a field is ascending. > * @param sortDescendingIndices > * If provided, inverts the sort order for a given field index > from ascending to descending order. > */ > public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> void > sort(final List<Tuple2<T0, T1>> list, > final int[] fieldPriority, final int... 
sortDescendingIndices) { > list.sort(newComparator(/* tupleLen = */ 2, fieldPriority, > sortDescendingIndices)); > } > /** > * Produce a sorted copy of a collection of tuples. > * > * @param collection > * The collection of tuples. > * @param fieldPriority > * The sort priority for the fields (primary and secondary sort > key): a permutation of the field indices 0 > * and 1. The default sort order within a field is ascending. > * @param sortDescendingIndices > * If provided, inverts the sort order for a given field index > from ascending to descending order. > */ > public static <T0 extends Comparable<T0>, T1 extends Comparable<T1>> > ArrayList<Tuple2<T0, T1>> sortCopy( > final Collection<Tuple2<T0, T1>> collection, final int[] > fieldPriority, > final int... sortDescendingIndices) { > ArrayList<Tuple2<T0, T1>> list = new ArrayList<>(collection); > Collections.sort(list, newComparator(/* tupleLen = */ 2, > fieldPriority, sortDescendingIndices)); > return list; > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)