aljoscha commented on a change in pull request #13529:
URL: https://github.com/apache/flink/pull/13529#discussion_r501619194



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInputs.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.PriorityQueue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An entry class for creating coupled, sorting inputs. The inputs are sorted 
independently and afterwards
+ * the inputs are being merged in order. It is done, by reporting availability 
only for the input which is
+ * current head for the sorted inputs.
+ */
+public class MultiInputSortingDataInputs<K> {
+       private final StreamTaskInput<?>[] inputs;
+       private final PushSorter<Tuple2<byte[], StreamRecord<Object>>>[] 
sorters;
+       private final PriorityQueue<HeadElement> queueOfHeads = new 
PriorityQueue<>();
+       private final KeySelector<Object, K>[] keySelectors;
+       private final TypeSerializer<K> keySerializer;
+       private final DataOutputSerializer dataOutputSerializer;
+       private final SortingPhaseDataOutput sortingPhaseDataOutput = new 
SortingPhaseDataOutput();
+       private final long[] seenWatermarks;
+       private final AvailabilityProvider.AvailabilityHelper allFinished = new 
AvailabilityProvider.AvailabilityHelper();
+
+       private long notFinishedSortingMask = 0;
+       private long finishedEmitting = 0;
+
+       @SuppressWarnings("unchecked")
+       public MultiInputSortingDataInputs(
+                       StreamTaskInput<Object>[] inputs,
+                       KeySelector<Object, K>[] keySelectors,
+                       TypeSerializer<Object>[] inputSerializers,
+                       TypeSerializer<K> keySerializer,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       boolean objectReuse,
+                       double managedMemoryFraction,
+                       Configuration jobConfiguration,
+                       AbstractInvokable containingTask) {
+               this.inputs = inputs;
+               this.keySelectors = keySelectors;
+               this.keySerializer = checkNotNull(keySerializer);
+               int keyLength = keySerializer.getLength();
+               final TypeComparator<Tuple2<byte[], StreamRecord<Object>>> 
comparator;
+               if (keyLength > 0) {
+                       this.dataOutputSerializer = new 
DataOutputSerializer(keyLength);
+                       comparator = new 
FixedLengthByteKeyComparator<>(keyLength);
+               } else {
+                       this.dataOutputSerializer = new 
DataOutputSerializer(64);
+                       comparator = new VariableLengthByteKeyComparator<>();
+               }
+               int numberOfInputs = inputs.length;
+               sorters = new PushSorter[numberOfInputs];
+               seenWatermarks = new long[numberOfInputs];
+               Arrays.fill(seenWatermarks, Long.MIN_VALUE);
+               for (int i = 0; i < numberOfInputs; i++) {
+                       notFinishedSortingMask = 
setBitMask(notFinishedSortingMask, i);
+                       KeyAndValueSerializer<Object> keyAndValueSerializer = 
new KeyAndValueSerializer<>(
+                               inputSerializers[i],
+                               keyLength);
+                       try {
+                               sorters[i] = ExternalSorter.newBuilder(
+                                       memoryManager,
+                                       containingTask,
+                                       keyAndValueSerializer,
+                                       comparator)
+                                       .memoryFraction(managedMemoryFraction / 
numberOfInputs)
+                                       .enableSpilling(
+                                               ioManager,
+                                               
jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+                                       
.maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN) / 
numberOfInputs)
+                                       .objectReuse(objectReuse)
+                                       .largeRecords(true)
+                                       .build();
+                       } catch (MemoryAllocationException e) {
+                               throw new RuntimeException();
+                       }
+               }
+       }
+
+       public <IN> StreamTaskInput<IN> getInput(int idx) {
+               return new IndexedInput<>(idx);
+       }
+
+       /**
+        * A thin wrapper that represents a head of a sorted input. 
Additionally it keeps the id of
+        * the input it belongs to.
+        *
+        * <p>The class is mutable and we only ever have a single instance per 
input.
+        */
+       private static final class HeadElement implements 
Comparable<HeadElement> {
+               final int inputIndex;
+               Tuple2<byte[], StreamRecord<Object>> streamElement;
+
+               private HeadElement(int inputIndex) {
+                       this.inputIndex = inputIndex;
+               }
+
+               @Override
+               public int compareTo(HeadElement o) {
+                       int keyCmp = compare(streamElement.f0, 
o.streamElement.f0);
+                       if (keyCmp != 0) {
+                               return keyCmp;
+                       }
+                       return Long.compare(
+                               streamElement.f1.asRecord().getTimestamp(),
+                               o.streamElement.f1.asRecord().getTimestamp());
+               }
+
+               private int compare(byte[] first, byte[] second) {
+                       int firstLength = first.length;
+                       int secondLength = second.length;
+                       int minLength = Math.min(firstLength, secondLength);
+                       for (int i = 0; i < minLength; i++) {
+                               int cmp = Byte.compare(first[i], second[i]);
+
+                               if (cmp != 0) {
+                                       return cmp;
+                               }
+                       }
+
+                       return Integer.compare(firstLength, secondLength);
+               }
+       }
+
+       /**
+        * An input that wraps an underlying input and sorts the incoming 
records. It starts emitting records
+        * downstream only when all the other inputs coupled with this {@link 
MultiInputSortingDataInputs} have
+        * finished sorting as well.
+        *
+        * <p>Moreover it will report it is {@link #isAvailable() available} or
+        * {@link #isApproximatelyAvailable() approximately available} if it 
has some records pending only if the head
+        * of the {@link #queueOfHeads} belongs to the input. That way there is 
only ever one input that reports it is
+        * available.
+        */
+       private class IndexedInput<IN> implements StreamTaskInput<IN> {
+
+               private final int idx;
+               private MutableObjectIterator<Tuple2<byte[], 
StreamRecord<Object>>> sortedInput;
+
+               private IndexedInput(int idx) {
+                       this.idx = idx;
+               }
+
+               @Override
+               public int getInputIndex() {
+                       return idx;
+               }
+
+               @Override
+               public CompletableFuture<Void> prepareSnapshot(
+                               ChannelStateWriter channelStateWriter,
+                               long checkpointId) {
+                       throw new UnsupportedOperationException();

Review comment:
       Again, I'm not sure I caught this in the one-input version, but we
should give some information about why it's not supported.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInputs.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.PriorityQueue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An entry class for creating coupled, sorting inputs. The inputs are sorted 
independently and afterwards

Review comment:
       I forget whether we mentioned this in the one-input implementations,
but we should mention that this sorts by key and timestamp (in that order).
It's not some generic sorting utility.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInputs.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.PriorityQueue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An entry class for creating coupled, sorting inputs. The inputs are sorted 
independently and afterwards
+ * the inputs are being merged in order. It is done, by reporting availability 
only for the input which is
+ * current head for the sorted inputs.
+ */
+public class MultiInputSortingDataInputs<K> {
+       private final StreamTaskInput<?>[] inputs;
+       private final PushSorter<Tuple2<byte[], StreamRecord<Object>>>[] 
sorters;
+       private final PriorityQueue<HeadElement> queueOfHeads = new 
PriorityQueue<>();
+       private final KeySelector<Object, K>[] keySelectors;
+       private final TypeSerializer<K> keySerializer;
+       private final DataOutputSerializer dataOutputSerializer;
+       private final SortingPhaseDataOutput sortingPhaseDataOutput = new 
SortingPhaseDataOutput();
+       private final long[] seenWatermarks;
+       private final AvailabilityProvider.AvailabilityHelper allFinished = new 
AvailabilityProvider.AvailabilityHelper();
+
+       private long notFinishedSortingMask = 0;
+       private long finishedEmitting = 0;
+
+       @SuppressWarnings("unchecked")
+       public MultiInputSortingDataInputs(
+                       StreamTaskInput<Object>[] inputs,
+                       KeySelector<Object, K>[] keySelectors,
+                       TypeSerializer<Object>[] inputSerializers,
+                       TypeSerializer<K> keySerializer,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       boolean objectReuse,
+                       double managedMemoryFraction,
+                       Configuration jobConfiguration,
+                       AbstractInvokable containingTask) {
+               this.inputs = inputs;
+               this.keySelectors = keySelectors;
+               this.keySerializer = checkNotNull(keySerializer);
+               int keyLength = keySerializer.getLength();
+               final TypeComparator<Tuple2<byte[], StreamRecord<Object>>> 
comparator;
+               if (keyLength > 0) {
+                       this.dataOutputSerializer = new 
DataOutputSerializer(keyLength);
+                       comparator = new 
FixedLengthByteKeyComparator<>(keyLength);
+               } else {
+                       this.dataOutputSerializer = new 
DataOutputSerializer(64);
+                       comparator = new VariableLengthByteKeyComparator<>();
+               }
+               int numberOfInputs = inputs.length;
+               sorters = new PushSorter[numberOfInputs];
+               seenWatermarks = new long[numberOfInputs];
+               Arrays.fill(seenWatermarks, Long.MIN_VALUE);
+               for (int i = 0; i < numberOfInputs; i++) {
+                       notFinishedSortingMask = 
setBitMask(notFinishedSortingMask, i);
+                       KeyAndValueSerializer<Object> keyAndValueSerializer = 
new KeyAndValueSerializer<>(
+                               inputSerializers[i],
+                               keyLength);
+                       try {
+                               sorters[i] = ExternalSorter.newBuilder(
+                                       memoryManager,
+                                       containingTask,
+                                       keyAndValueSerializer,
+                                       comparator)
+                                       .memoryFraction(managedMemoryFraction / 
numberOfInputs)
+                                       .enableSpilling(
+                                               ioManager,
+                                               
jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+                                       
.maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN) / 
numberOfInputs)
+                                       .objectReuse(objectReuse)
+                                       .largeRecords(true)
+                                       .build();
+                       } catch (MemoryAllocationException e) {
+                               throw new RuntimeException();
+                       }
+               }
+       }
+
+       public <IN> StreamTaskInput<IN> getInput(int idx) {
+               return new IndexedInput<>(idx);
+       }
+
+       /**
+        * A thin wrapper that represents a head of a sorted input. 
Additionally it keeps the id of
+        * the input it belongs to.
+        *
+        * <p>The class is mutable and we only ever have a single instance per 
input.
+        */
+       private static final class HeadElement implements 
Comparable<HeadElement> {
+               final int inputIndex;
+               Tuple2<byte[], StreamRecord<Object>> streamElement;
+
+               private HeadElement(int inputIndex) {
+                       this.inputIndex = inputIndex;
+               }
+
+               @Override
+               public int compareTo(HeadElement o) {
+                       int keyCmp = compare(streamElement.f0, 
o.streamElement.f0);
+                       if (keyCmp != 0) {
+                               return keyCmp;
+                       }
+                       return Long.compare(
+                               streamElement.f1.asRecord().getTimestamp(),
+                               o.streamElement.f1.asRecord().getTimestamp());
+               }
+
+               private int compare(byte[] first, byte[] second) {
+                       int firstLength = first.length;
+                       int secondLength = second.length;
+                       int minLength = Math.min(firstLength, secondLength);
+                       for (int i = 0; i < minLength; i++) {
+                               int cmp = Byte.compare(first[i], second[i]);
+
+                               if (cmp != 0) {
+                                       return cmp;
+                               }
+                       }
+
+                       return Integer.compare(firstLength, secondLength);
+               }
+       }
+
+       /**
+        * An input that wraps an underlying input and sorts the incoming 
records. It starts emitting records
+        * downstream only when all the other inputs coupled with this {@link 
MultiInputSortingDataInputs} have
+        * finished sorting as well.
+        *
+        * <p>Moreover it will report it is {@link #isAvailable() available} or
+        * {@link #isApproximatelyAvailable() approximately available} if it 
has some records pending only if the head
+        * of the {@link #queueOfHeads} belongs to the input. That way there is 
only ever one input that reports it is
+        * available.
+        */
+       private class IndexedInput<IN> implements StreamTaskInput<IN> {
+
+               private final int idx;
+               private MutableObjectIterator<Tuple2<byte[], 
StreamRecord<Object>>> sortedInput;
+
+               private IndexedInput(int idx) {
+                       this.idx = idx;
+               }
+
+               @Override
+               public int getInputIndex() {
+                       return idx;
+               }
+
+               @Override
+               public CompletableFuture<Void> prepareSnapshot(
+                               ChannelStateWriter channelStateWriter,
+                               long checkpointId) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void close() throws IOException {
+                       IOException ex = null;
+                       try {
+                               inputs[idx].close();

Review comment:
       Instead of using array accesses for `inputs` and `sorters`, could we
just get them once in the constructor and store them in fields? This is
related to the question below about using one `SortingPhaseDataOutput` per
input.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInputs.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.PriorityQueue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An entry class for creating coupled, sorting inputs. The inputs are sorted 
independently and afterwards
+ * the inputs are being merged in order. It is done, by reporting availability 
only for the input which is
+ * current head for the sorted inputs.
+ */
+public class MultiInputSortingDataInputs<K> {
+       private final StreamTaskInput<?>[] inputs;
+       private final PushSorter<Tuple2<byte[], StreamRecord<Object>>>[] 
sorters;
+       private final PriorityQueue<HeadElement> queueOfHeads = new 
PriorityQueue<>();
+       private final KeySelector<Object, K>[] keySelectors;
+       private final TypeSerializer<K> keySerializer;
+       private final DataOutputSerializer dataOutputSerializer;
+       private final SortingPhaseDataOutput sortingPhaseDataOutput = new 
SortingPhaseDataOutput();
+       private final long[] seenWatermarks;
+       private final AvailabilityProvider.AvailabilityHelper allFinished = new 
AvailabilityProvider.AvailabilityHelper();
+
+       private long notFinishedSortingMask = 0;
+       private long finishedEmitting = 0;
+
+       @SuppressWarnings("unchecked")
+       public MultiInputSortingDataInputs(
+                       StreamTaskInput<Object>[] inputs,
+                       KeySelector<Object, K>[] keySelectors,
+                       TypeSerializer<Object>[] inputSerializers,
+                       TypeSerializer<K> keySerializer,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       boolean objectReuse,
+                       double managedMemoryFraction,
+                       Configuration jobConfiguration,
+                       AbstractInvokable containingTask) {
+               this.inputs = inputs;
+               this.keySelectors = keySelectors;
+               this.keySerializer = checkNotNull(keySerializer);
+               int keyLength = keySerializer.getLength();
+               final TypeComparator<Tuple2<byte[], StreamRecord<Object>>> 
comparator;
+               if (keyLength > 0) {
+                       this.dataOutputSerializer = new 
DataOutputSerializer(keyLength);
+                       comparator = new 
FixedLengthByteKeyComparator<>(keyLength);
+               } else {
+                       this.dataOutputSerializer = new 
DataOutputSerializer(64);
+                       comparator = new VariableLengthByteKeyComparator<>();
+               }
+               int numberOfInputs = inputs.length;
+               sorters = new PushSorter[numberOfInputs];
+               seenWatermarks = new long[numberOfInputs];
+               Arrays.fill(seenWatermarks, Long.MIN_VALUE);
+               for (int i = 0; i < numberOfInputs; i++) {
+                       notFinishedSortingMask = 
setBitMask(notFinishedSortingMask, i);
+                       KeyAndValueSerializer<Object> keyAndValueSerializer = 
new KeyAndValueSerializer<>(
+                               inputSerializers[i],
+                               keyLength);
+                       try {
+                               sorters[i] = ExternalSorter.newBuilder(
+                                       memoryManager,
+                                       containingTask,
+                                       keyAndValueSerializer,
+                                       comparator)
+                                       .memoryFraction(managedMemoryFraction / 
numberOfInputs)
+                                       .enableSpilling(
+                                               ioManager,
+                                               
jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+                                       
.maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN) / 
numberOfInputs)
+                                       .objectReuse(objectReuse)
+                                       .largeRecords(true)
+                                       .build();
+                       } catch (MemoryAllocationException e) {
+                               throw new RuntimeException();
+                       }
+               }
+       }
+
+       public <IN> StreamTaskInput<IN> getInput(int idx) {
+               return new IndexedInput<>(idx);
+       }
+
+       /**
+        * A thin wrapper that represents a head of a sorted input. 
Additionally it keeps the id of
+        * the input it belongs to.
+        *
+        * <p>The class is mutable and we only ever have a single instance per 
input.
+        */
+       private static final class HeadElement implements 
Comparable<HeadElement> {
+               final int inputIndex;
+               Tuple2<byte[], StreamRecord<Object>> streamElement;
+
+               private HeadElement(int inputIndex) {
+                       this.inputIndex = inputIndex;
+               }
+
+               @Override
+               public int compareTo(HeadElement o) {
+                       int keyCmp = compare(streamElement.f0, 
o.streamElement.f0);
+                       if (keyCmp != 0) {
+                               return keyCmp;
+                       }
+                       return Long.compare(
+                               streamElement.f1.asRecord().getTimestamp(),
+                               o.streamElement.f1.asRecord().getTimestamp());
+               }
+
+               private int compare(byte[] first, byte[] second) {
+                       int firstLength = first.length;
+                       int secondLength = second.length;
+                       int minLength = Math.min(firstLength, secondLength);
+                       for (int i = 0; i < minLength; i++) {
+                               int cmp = Byte.compare(first[i], second[i]);
+
+                               if (cmp != 0) {
+                                       return cmp;
+                               }
+                       }
+
+                       return Integer.compare(firstLength, secondLength);
+               }
+       }
+
+       /**
+        * An input that wraps an underlying input and sorts the incoming 
records. It starts emitting records
+        * downstream only when all the other inputs coupled with this {@link 
MultiInputSortingDataInputs} have
+        * finished sorting as well.
+        *
+        * <p>Moreover it will report it is {@link #isAvailable() available} or
+        * {@link #isApproximatelyAvailable() approximately available} if it 
has some records pending only if the head
+        * of the {@link #queueOfHeads} belongs to the input. That way there is 
only ever one input that reports it is
+        * available.
+        */
+       private class IndexedInput<IN> implements StreamTaskInput<IN> {
+
+               private final int idx;
+               private MutableObjectIterator<Tuple2<byte[], 
StreamRecord<Object>>> sortedInput;
+
+               private IndexedInput(int idx) {
+                       this.idx = idx;
+               }
+
+               @Override
+               public int getInputIndex() {
+                       return idx;
+               }
+
+               @Override
+               public CompletableFuture<Void> prepareSnapshot(
+                               ChannelStateWriter channelStateWriter,
+                               long checkpointId) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void close() throws IOException {
+                       IOException ex = null;
+                       try {
+                               inputs[idx].close();
+                       } catch (IOException e) {
+                               ex = ExceptionUtils.firstOrSuppressed(e, ex);
+                       }
+
+                       try {
+                               sorters[idx].close();
+                       } catch (IOException e) {
+                               ex = ExceptionUtils.firstOrSuppressed(e, ex);
+                       }
+
+                       if (ex != null) {
+                               throw ex;
+                       }
+               }
+
+               @Override
+               @SuppressWarnings({"rawtypes", "unchecked"})
+               public InputStatus emitNext(DataOutput<IN> output) throws 
Exception {
+                       if (sortedInput != null) {
+                               return emitNextAfterSorting(output);
+                       }
+
+                       sortingPhaseDataOutput.currentIdx = idx;
+                       InputStatus inputStatus = 
inputs[idx].emitNext((DataOutput) sortingPhaseDataOutput);
+                       if (inputStatus == InputStatus.END_OF_INPUT) {
+                               endSorting();
+                               return addNextToQueue(new HeadElement(idx), 
output);
+                       }
+
+                       return inputStatus;
+               }
+
+               @Nonnull
+               @SuppressWarnings({"unchecked"})
+               private InputStatus emitNextAfterSorting(DataOutput<IN> output) 
throws Exception {
+                       if (checkBitMask(finishedEmitting, idx)) {
+                               return InputStatus.END_OF_INPUT;
+                       } else if (allSorted()) {
+                               HeadElement head = queueOfHeads.peek();
+                               if (head != null && head.inputIndex == idx) {
+                                       HeadElement headElement = 
queueOfHeads.poll();
+                                       output.emitRecord((StreamRecord<IN>) 
headElement.streamElement.f1);
+                                       return addNextToQueue(headElement, 
output);
+                               } else {
+                                       return InputStatus.NOTHING_AVAILABLE;
+                               }
+                       } else {
+                               return InputStatus.NOTHING_AVAILABLE;
+                       }
+               }
+
+               private void endSorting() throws Exception {
+                       sorters[idx].finishReading();
+                       notFinishedSortingMask = 
unsetBitMask(notFinishedSortingMask, idx);
+                       sortedInput = sorters[idx].getIterator();
+                       if (allSorted()) {
+                               
allFinished.getUnavailableToResetAvailable().complete(null);
+                       }
+               }
+
+               @Nonnull
+               private InputStatus addNextToQueue(HeadElement reuse, 
DataOutput<IN> output) throws Exception {
+                       Tuple2<byte[], StreamRecord<Object>> next = 
sortedInput.next();
+                       if (next != null) {
+                               reuse.streamElement = next;
+                               queueOfHeads.add(reuse);
+                       } else {
+                               finishedEmitting = setBitMask(finishedEmitting, 
idx);
+                               if (seenWatermarks[idx] > Long.MIN_VALUE) {
+                                       output.emitWatermark(new 
Watermark(seenWatermarks[idx]));
+                               }
+                               return InputStatus.END_OF_INPUT;
+                       }
+
+                       if (allSorted()) {
+                               HeadElement headElement = queueOfHeads.peek();
+                               if (headElement != null) {
+                                       if (headElement.inputIndex == idx) {
+                                               return 
InputStatus.MORE_AVAILABLE;
+                                       }
+                               }
+                       }
+
+                       return InputStatus.NOTHING_AVAILABLE;
+               }
+
+               @Override
+               public boolean isApproximatelyAvailable() {
+                       if (sortedInput != null) {
+                               return isHeadAvailable();
+                       } else {
+                               return 
StreamTaskInput.super.isApproximatelyAvailable();
+                       }
+               }
+
+               @Override
+               public boolean isAvailable() {
+                       if (sortedInput != null) {
+                               return isHeadAvailable();
+                       } else {
+                               return StreamTaskInput.super.isAvailable();
+                       }
+               }
+
+               private boolean isHeadAvailable() {
+                       if (!allSorted()) {
+                               return false;
+                       }
+                       HeadElement headElement = queueOfHeads.peek();
+                       return headElement != null && headElement.inputIndex == 
idx;
+               }
+
+               @Override
+               public CompletableFuture<?> getAvailableFuture() {
+                       if (sortedInput != null) {
+                               return allFinished.getAvailableFuture();
+                       } else {
+                               return inputs[idx].getAvailableFuture();
+                       }
+               }
+       }
+
+       /**
+        * A simple {@link PushingAsyncDataInput.DataOutput} used in the 
sorting phase when we have not seen all the
+        * records from the underlying input yet. It forwards the records to a 
corresponding sorter.
+        */
+       private class SortingPhaseDataOutput implements 
PushingAsyncDataInput.DataOutput<Object> {
+
+               int currentIdx;

Review comment:
       Would it maybe be better to have one `SortingPhaseDataOutput` per 
indexed input? It could also have "hard links" to the things it needs, like the key 
selector, sorter, etc.
   
   And then we also wouldn't need this mutable field.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+
+/**
+ * An input that wraps an underlying input and sorts the incoming records. It 
starts emitting records
+ * downstream only when all the other inputs coupled with this {@link 
MultiInputSortingDataInput} have
+ * finished sorting as well.
+ *
+ * <p>Moreover it will report it is {@link #isAvailable() available} or
+ * {@link #isApproximatelyAvailable() approximately available} if it has some 
records pending only if the head
+ * of the {@link CommonContext#getQueueOfHeads()} belongs to the input. That 
way there is only ever one input
+ * that reports it is available.
+ *
+ * <p>The sorter uses binary comparison of keys, which are extracted and 
serialized when received
+ * from the chained input. Moreover the timestamps of incoming records are 
used for secondary ordering.
+ * For the comparison it uses either {@link FixedLengthByteKeyComparator} if 
the length of the
+ * serialized key is constant, or {@link VariableLengthByteKeyComparator} 
otherwise.
+ *
+ * <p>Watermarks, stream statuses, nor latency markers are propagated 
downstream as they do not make
+ * sense with buffered records. The input emits a MAX_WATERMARK after all 
records.

Review comment:
       It emits the maximum watermark it has seen; MAX_WATERMARK could be 
interpreted as `Long.MAX_VALUE`.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+
+/**
+ * An input that wraps an underlying input and sorts the incoming records. It 
starts emitting records
+ * downstream only when all the other inputs coupled with this {@link 
MultiInputSortingDataInput} have
+ * finished sorting as well.
+ *
+ * <p>Moreover it will report it is {@link #isAvailable() available} or
+ * {@link #isApproximatelyAvailable() approximately available} if it has some 
records pending only if the head
+ * of the {@link CommonContext#getQueueOfHeads()} belongs to the input. That 
way there is only ever one input
+ * that reports it is available.
+ *
+ * <p>The sorter uses binary comparison of keys, which are extracted and 
serialized when received
+ * from the chained input. Moreover the timestamps of incoming records are 
used for secondary ordering.
+ * For the comparison it uses either {@link FixedLengthByteKeyComparator} if 
the length of the
+ * serialized key is constant, or {@link VariableLengthByteKeyComparator} 
otherwise.
+ *
+ * <p>Watermarks, stream statuses, nor latency markers are propagated 
downstream as they do not make
+ * sense with buffered records. The input emits a MAX_WATERMARK after all 
records.
+ */
+public final class MultiInputSortingDataInput<IN, K> implements 
StreamTaskInput<IN> {
+       private final int idx;
+       private final StreamTaskInput<IN> wrappedInput;
+       private final PushSorter<Tuple2<byte[], StreamRecord<IN>>> sorter;
+       private final CommonContext commonContext;
+       private final SortingPhaseDataOutput sortingPhaseDataOutput = new 
SortingPhaseDataOutput();
+
+       private final KeySelector<IN, K> keySelector;
+       private final TypeSerializer<K> keySerializer;
+       private final DataOutputSerializer dataOutputSerializer;
+
+       private MutableObjectIterator<Tuple2<byte[], StreamRecord<IN>>> 
sortedInput;
+       private long seenWatermark = Long.MIN_VALUE;
+
+       private MultiInputSortingDataInput(
+                       CommonContext commonContext,
+                       StreamTaskInput<IN> wrappedInput,
+                       int inputIdx,
+                       PushSorter<Tuple2<byte[], StreamRecord<IN>>> sorter,
+                       KeySelector<IN, K> keySelector,
+                       TypeSerializer<K> keySerializer,
+                       DataOutputSerializer dataOutputSerializer) {
+               this.wrappedInput = wrappedInput;
+               this.idx = inputIdx;
+               this.commonContext = commonContext;
+               this.sorter = sorter;
+               this.keySelector = keySelector;
+               this.keySerializer = keySerializer;
+               this.dataOutputSerializer = dataOutputSerializer;
+       }
+
+       public static <K> StreamTaskInput<?>[] wrapInputs(
+                       AbstractInvokable containingTask,
+                       StreamTaskInput<Object>[] inputs,
+                       KeySelector<Object, K>[] keySelectors,
+                       TypeSerializer<Object>[] inputSerializers,
+                       TypeSerializer<K> keySerializer,
+                       MemoryManager memoryManager,
+                       IOManager ioManager,
+                       boolean objectReuse,
+                       double managedMemoryFraction,
+                       Configuration jobConfiguration) {
+               int keyLength = keySerializer.getLength();
+               final TypeComparator<Tuple2<byte[], StreamRecord<Object>>> 
comparator;
+               DataOutputSerializer dataOutputSerializer;
+               if (keyLength > 0) {
+                       dataOutputSerializer = new 
DataOutputSerializer(keyLength);
+                       comparator = new 
FixedLengthByteKeyComparator<>(keyLength);
+               } else {
+                       dataOutputSerializer = new DataOutputSerializer(64);
+                       comparator = new VariableLengthByteKeyComparator<>();
+               }
+
+               int numberOfInputs = inputs.length;
+               CommonContext commonContext = new CommonContext(numberOfInputs);
+               return IntStream.range(0, numberOfInputs)
+                       .mapToObj(
+                               idx -> {
+                                       try {
+                                       KeyAndValueSerializer<Object> 
keyAndValueSerializer = new KeyAndValueSerializer<>(

Review comment:
       Indentation seems off here. (Another point for having an automatic 
formatting tool... 😅)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to