aljoscha commented on a change in pull request #13529: URL: https://github.com/apache/flink/pull/13529#discussion_r501619194
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInputs.java ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import 
org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; +import org.apache.flink.streaming.runtime.io.StreamTaskInput; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MutableObjectIterator; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.PriorityQueue; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An entry class for creating coupled, sorting inputs. The inputs are sorted independently and afterwards + * the inputs are being merged in order. It is done, by reporting availability only for the input which is + * current head for the sorted inputs. + */ +public class MultiInputSortingDataInputs<K> { + private final StreamTaskInput<?>[] inputs; + private final PushSorter<Tuple2<byte[], StreamRecord<Object>>>[] sorters; + private final PriorityQueue<HeadElement> queueOfHeads = new PriorityQueue<>(); + private final KeySelector<Object, K>[] keySelectors; + private final TypeSerializer<K> keySerializer; + private final DataOutputSerializer dataOutputSerializer; + private final SortingPhaseDataOutput sortingPhaseDataOutput = new SortingPhaseDataOutput(); + private final long[] seenWatermarks; + private final AvailabilityProvider.AvailabilityHelper allFinished = new AvailabilityProvider.AvailabilityHelper(); + + private long notFinishedSortingMask = 0; + private long finishedEmitting = 0; + + @SuppressWarnings("unchecked") + public MultiInputSortingDataInputs( + StreamTaskInput<Object>[] inputs, + KeySelector<Object, K>[] keySelectors, + TypeSerializer<Object>[] inputSerializers, + TypeSerializer<K> keySerializer, + MemoryManager 
memoryManager, + IOManager ioManager, + boolean objectReuse, + double managedMemoryFraction, + Configuration jobConfiguration, + AbstractInvokable containingTask) { + this.inputs = inputs; + this.keySelectors = keySelectors; + this.keySerializer = checkNotNull(keySerializer); + int keyLength = keySerializer.getLength(); + final TypeComparator<Tuple2<byte[], StreamRecord<Object>>> comparator; + if (keyLength > 0) { + this.dataOutputSerializer = new DataOutputSerializer(keyLength); + comparator = new FixedLengthByteKeyComparator<>(keyLength); + } else { + this.dataOutputSerializer = new DataOutputSerializer(64); + comparator = new VariableLengthByteKeyComparator<>(); + } + int numberOfInputs = inputs.length; + sorters = new PushSorter[numberOfInputs]; + seenWatermarks = new long[numberOfInputs]; + Arrays.fill(seenWatermarks, Long.MIN_VALUE); + for (int i = 0; i < numberOfInputs; i++) { + notFinishedSortingMask = setBitMask(notFinishedSortingMask, i); + KeyAndValueSerializer<Object> keyAndValueSerializer = new KeyAndValueSerializer<>( + inputSerializers[i], + keyLength); + try { + sorters[i] = ExternalSorter.newBuilder( + memoryManager, + containingTask, + keyAndValueSerializer, + comparator) + .memoryFraction(managedMemoryFraction / numberOfInputs) + .enableSpilling( + ioManager, + jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD)) + .maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN) / numberOfInputs) + .objectReuse(objectReuse) + .largeRecords(true) + .build(); + } catch (MemoryAllocationException e) { + throw new RuntimeException(); + } + } + } + + public <IN> StreamTaskInput<IN> getInput(int idx) { + return new IndexedInput<>(idx); + } + + /** + * A thin wrapper that represents a head of a sorted input. Additionally it keeps the id of + * the input it belongs to. + * + * <p>The class is mutable and we only ever have a single instance per input. 
+ */ + private static final class HeadElement implements Comparable<HeadElement> { + final int inputIndex; + Tuple2<byte[], StreamRecord<Object>> streamElement; + + private HeadElement(int inputIndex) { + this.inputIndex = inputIndex; + } + + @Override + public int compareTo(HeadElement o) { + int keyCmp = compare(streamElement.f0, o.streamElement.f0); + if (keyCmp != 0) { + return keyCmp; + } + return Long.compare( + streamElement.f1.asRecord().getTimestamp(), + o.streamElement.f1.asRecord().getTimestamp()); + } + + private int compare(byte[] first, byte[] second) { + int firstLength = first.length; + int secondLength = second.length; + int minLength = Math.min(firstLength, secondLength); + for (int i = 0; i < minLength; i++) { + int cmp = Byte.compare(first[i], second[i]); + + if (cmp != 0) { + return cmp; + } + } + + return Integer.compare(firstLength, secondLength); + } + } + + /** + * An input that wraps an underlying input and sorts the incoming records. It starts emitting records + * downstream only when all the other inputs coupled with this {@link MultiInputSortingDataInputs} have + * finished sorting as well. + * + * <p>Moreover it will report it is {@link #isAvailable() available} or + * {@link #isApproximatelyAvailable() approximately available} if it has some records pending only if the head + * of the {@link #queueOfHeads} belongs to the input. That way there is only ever one input that reports it is + * available. 
+ */ + private class IndexedInput<IN> implements StreamTaskInput<IN> { + + private final int idx; + private MutableObjectIterator<Tuple2<byte[], StreamRecord<Object>>> sortedInput; + + private IndexedInput(int idx) { + this.idx = idx; + } + + @Override + public int getInputIndex() { + return idx; + } + + @Override + public CompletableFuture<Void> prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) { + throw new UnsupportedOperationException(); Review comment: Again, I'm not sure whether I caught this in the one-input version, but we should give some information about why it's not supported. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInputs.java ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; +import org.apache.flink.streaming.runtime.io.StreamTaskInput; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MutableObjectIterator; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.PriorityQueue; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An entry class for creating coupled, sorting inputs. 
The inputs are sorted independently and afterwards Review comment: I forget if we mentioned this in the one-input implementations but we should mention that this does sorting by key and timestamp (in that order). It's not some generic sorting utility. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInputs.java ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; +import org.apache.flink.streaming.runtime.io.StreamTaskInput; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MutableObjectIterator; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.PriorityQueue; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An entry class for creating coupled, sorting inputs. The inputs are sorted independently and afterwards + * the inputs are being merged in order. 
It is done, by reporting availability only for the input which is + * current head for the sorted inputs. + */ +public class MultiInputSortingDataInputs<K> { + private final StreamTaskInput<?>[] inputs; + private final PushSorter<Tuple2<byte[], StreamRecord<Object>>>[] sorters; + private final PriorityQueue<HeadElement> queueOfHeads = new PriorityQueue<>(); + private final KeySelector<Object, K>[] keySelectors; + private final TypeSerializer<K> keySerializer; + private final DataOutputSerializer dataOutputSerializer; + private final SortingPhaseDataOutput sortingPhaseDataOutput = new SortingPhaseDataOutput(); + private final long[] seenWatermarks; + private final AvailabilityProvider.AvailabilityHelper allFinished = new AvailabilityProvider.AvailabilityHelper(); + + private long notFinishedSortingMask = 0; + private long finishedEmitting = 0; + + @SuppressWarnings("unchecked") + public MultiInputSortingDataInputs( + StreamTaskInput<Object>[] inputs, + KeySelector<Object, K>[] keySelectors, + TypeSerializer<Object>[] inputSerializers, + TypeSerializer<K> keySerializer, + MemoryManager memoryManager, + IOManager ioManager, + boolean objectReuse, + double managedMemoryFraction, + Configuration jobConfiguration, + AbstractInvokable containingTask) { + this.inputs = inputs; + this.keySelectors = keySelectors; + this.keySerializer = checkNotNull(keySerializer); + int keyLength = keySerializer.getLength(); + final TypeComparator<Tuple2<byte[], StreamRecord<Object>>> comparator; + if (keyLength > 0) { + this.dataOutputSerializer = new DataOutputSerializer(keyLength); + comparator = new FixedLengthByteKeyComparator<>(keyLength); + } else { + this.dataOutputSerializer = new DataOutputSerializer(64); + comparator = new VariableLengthByteKeyComparator<>(); + } + int numberOfInputs = inputs.length; + sorters = new PushSorter[numberOfInputs]; + seenWatermarks = new long[numberOfInputs]; + Arrays.fill(seenWatermarks, Long.MIN_VALUE); + for (int i = 0; i < numberOfInputs; i++) { + 
notFinishedSortingMask = setBitMask(notFinishedSortingMask, i); + KeyAndValueSerializer<Object> keyAndValueSerializer = new KeyAndValueSerializer<>( + inputSerializers[i], + keyLength); + try { + sorters[i] = ExternalSorter.newBuilder( + memoryManager, + containingTask, + keyAndValueSerializer, + comparator) + .memoryFraction(managedMemoryFraction / numberOfInputs) + .enableSpilling( + ioManager, + jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD)) + .maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN) / numberOfInputs) + .objectReuse(objectReuse) + .largeRecords(true) + .build(); + } catch (MemoryAllocationException e) { + throw new RuntimeException(); + } + } + } + + public <IN> StreamTaskInput<IN> getInput(int idx) { + return new IndexedInput<>(idx); + } + + /** + * A thin wrapper that represents a head of a sorted input. Additionally it keeps the id of + * the input it belongs to. + * + * <p>The class is mutable and we only ever have a single instance per input. + */ + private static final class HeadElement implements Comparable<HeadElement> { + final int inputIndex; + Tuple2<byte[], StreamRecord<Object>> streamElement; + + private HeadElement(int inputIndex) { + this.inputIndex = inputIndex; + } + + @Override + public int compareTo(HeadElement o) { + int keyCmp = compare(streamElement.f0, o.streamElement.f0); + if (keyCmp != 0) { + return keyCmp; + } + return Long.compare( + streamElement.f1.asRecord().getTimestamp(), + o.streamElement.f1.asRecord().getTimestamp()); + } + + private int compare(byte[] first, byte[] second) { + int firstLength = first.length; + int secondLength = second.length; + int minLength = Math.min(firstLength, secondLength); + for (int i = 0; i < minLength; i++) { + int cmp = Byte.compare(first[i], second[i]); + + if (cmp != 0) { + return cmp; + } + } + + return Integer.compare(firstLength, secondLength); + } + } + + /** + * An input that wraps an underlying input and sorts the incoming records. 
It starts emitting records + * downstream only when all the other inputs coupled with this {@link MultiInputSortingDataInputs} have + * finished sorting as well. + * + * <p>Moreover it will report it is {@link #isAvailable() available} or + * {@link #isApproximatelyAvailable() approximately available} if it has some records pending only if the head + * of the {@link #queueOfHeads} belongs to the input. That way there is only ever one input that reports it is + * available. + */ + private class IndexedInput<IN> implements StreamTaskInput<IN> { + + private final int idx; + private MutableObjectIterator<Tuple2<byte[], StreamRecord<Object>>> sortedInput; + + private IndexedInput(int idx) { + this.idx = idx; + } + + @Override + public int getInputIndex() { + return idx; + } + + @Override + public CompletableFuture<Void> prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + IOException ex = null; + try { + inputs[idx].close(); Review comment: Instead of using array accesses for `inputs` and `sorters`, could we just get them once in the constructor and store in fields? This is related to the question below about using one `SortingPhaseDataOutput` per input. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInputs.java ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; +import org.apache.flink.streaming.runtime.io.StreamTaskInput; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MutableObjectIterator; + +import javax.annotation.Nonnull; + +import 
java.io.IOException; +import java.util.Arrays; +import java.util.PriorityQueue; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An entry class for creating coupled, sorting inputs. The inputs are sorted independently and afterwards + * the inputs are being merged in order. It is done, by reporting availability only for the input which is + * current head for the sorted inputs. + */ +public class MultiInputSortingDataInputs<K> { + private final StreamTaskInput<?>[] inputs; + private final PushSorter<Tuple2<byte[], StreamRecord<Object>>>[] sorters; + private final PriorityQueue<HeadElement> queueOfHeads = new PriorityQueue<>(); + private final KeySelector<Object, K>[] keySelectors; + private final TypeSerializer<K> keySerializer; + private final DataOutputSerializer dataOutputSerializer; + private final SortingPhaseDataOutput sortingPhaseDataOutput = new SortingPhaseDataOutput(); + private final long[] seenWatermarks; + private final AvailabilityProvider.AvailabilityHelper allFinished = new AvailabilityProvider.AvailabilityHelper(); + + private long notFinishedSortingMask = 0; + private long finishedEmitting = 0; + + @SuppressWarnings("unchecked") + public MultiInputSortingDataInputs( + StreamTaskInput<Object>[] inputs, + KeySelector<Object, K>[] keySelectors, + TypeSerializer<Object>[] inputSerializers, + TypeSerializer<K> keySerializer, + MemoryManager memoryManager, + IOManager ioManager, + boolean objectReuse, + double managedMemoryFraction, + Configuration jobConfiguration, + AbstractInvokable containingTask) { + this.inputs = inputs; + this.keySelectors = keySelectors; + this.keySerializer = checkNotNull(keySerializer); + int keyLength = keySerializer.getLength(); + final TypeComparator<Tuple2<byte[], StreamRecord<Object>>> comparator; + if (keyLength > 0) { + this.dataOutputSerializer = new DataOutputSerializer(keyLength); + comparator = new FixedLengthByteKeyComparator<>(keyLength); 
+ } else { + this.dataOutputSerializer = new DataOutputSerializer(64); + comparator = new VariableLengthByteKeyComparator<>(); + } + int numberOfInputs = inputs.length; + sorters = new PushSorter[numberOfInputs]; + seenWatermarks = new long[numberOfInputs]; + Arrays.fill(seenWatermarks, Long.MIN_VALUE); + for (int i = 0; i < numberOfInputs; i++) { + notFinishedSortingMask = setBitMask(notFinishedSortingMask, i); + KeyAndValueSerializer<Object> keyAndValueSerializer = new KeyAndValueSerializer<>( + inputSerializers[i], + keyLength); + try { + sorters[i] = ExternalSorter.newBuilder( + memoryManager, + containingTask, + keyAndValueSerializer, + comparator) + .memoryFraction(managedMemoryFraction / numberOfInputs) + .enableSpilling( + ioManager, + jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD)) + .maxNumFileHandles(jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN) / numberOfInputs) + .objectReuse(objectReuse) + .largeRecords(true) + .build(); + } catch (MemoryAllocationException e) { + throw new RuntimeException(); + } + } + } + + public <IN> StreamTaskInput<IN> getInput(int idx) { + return new IndexedInput<>(idx); + } + + /** + * A thin wrapper that represents a head of a sorted input. Additionally it keeps the id of + * the input it belongs to. + * + * <p>The class is mutable and we only ever have a single instance per input. 
+ */ + private static final class HeadElement implements Comparable<HeadElement> { + final int inputIndex; + Tuple2<byte[], StreamRecord<Object>> streamElement; + + private HeadElement(int inputIndex) { + this.inputIndex = inputIndex; + } + + @Override + public int compareTo(HeadElement o) { + int keyCmp = compare(streamElement.f0, o.streamElement.f0); + if (keyCmp != 0) { + return keyCmp; + } + return Long.compare( + streamElement.f1.asRecord().getTimestamp(), + o.streamElement.f1.asRecord().getTimestamp()); + } + + private int compare(byte[] first, byte[] second) { + int firstLength = first.length; + int secondLength = second.length; + int minLength = Math.min(firstLength, secondLength); + for (int i = 0; i < minLength; i++) { + int cmp = Byte.compare(first[i], second[i]); + + if (cmp != 0) { + return cmp; + } + } + + return Integer.compare(firstLength, secondLength); + } + } + + /** + * An input that wraps an underlying input and sorts the incoming records. It starts emitting records + * downstream only when all the other inputs coupled with this {@link MultiInputSortingDataInputs} have + * finished sorting as well. + * + * <p>Moreover it will report it is {@link #isAvailable() available} or + * {@link #isApproximatelyAvailable() approximately available} if it has some records pending only if the head + * of the {@link #queueOfHeads} belongs to the input. That way there is only ever one input that reports it is + * available. 
+ */ + private class IndexedInput<IN> implements StreamTaskInput<IN> { + + private final int idx; + private MutableObjectIterator<Tuple2<byte[], StreamRecord<Object>>> sortedInput; + + private IndexedInput(int idx) { + this.idx = idx; + } + + @Override + public int getInputIndex() { + return idx; + } + + @Override + public CompletableFuture<Void> prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + IOException ex = null; + try { + inputs[idx].close(); + } catch (IOException e) { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } + + try { + sorters[idx].close(); + } catch (IOException e) { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } + + if (ex != null) { + throw ex; + } + } + + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) + public InputStatus emitNext(DataOutput<IN> output) throws Exception { + if (sortedInput != null) { + return emitNextAfterSorting(output); + } + + sortingPhaseDataOutput.currentIdx = idx; + InputStatus inputStatus = inputs[idx].emitNext((DataOutput) sortingPhaseDataOutput); + if (inputStatus == InputStatus.END_OF_INPUT) { + endSorting(); + return addNextToQueue(new HeadElement(idx), output); + } + + return inputStatus; + } + + @Nonnull + @SuppressWarnings({"unchecked"}) + private InputStatus emitNextAfterSorting(DataOutput<IN> output) throws Exception { + if (checkBitMask(finishedEmitting, idx)) { + return InputStatus.END_OF_INPUT; + } else if (allSorted()) { + HeadElement head = queueOfHeads.peek(); + if (head != null && head.inputIndex == idx) { + HeadElement headElement = queueOfHeads.poll(); + output.emitRecord((StreamRecord<IN>) headElement.streamElement.f1); + return addNextToQueue(headElement, output); + } else { + return InputStatus.NOTHING_AVAILABLE; + } + } else { + return InputStatus.NOTHING_AVAILABLE; + } + } + + private void endSorting() throws Exception { + 
sorters[idx].finishReading(); + notFinishedSortingMask = unsetBitMask(notFinishedSortingMask, idx); + sortedInput = sorters[idx].getIterator(); + if (allSorted()) { + allFinished.getUnavailableToResetAvailable().complete(null); + } + } + + @Nonnull + private InputStatus addNextToQueue(HeadElement reuse, DataOutput<IN> output) throws Exception { + Tuple2<byte[], StreamRecord<Object>> next = sortedInput.next(); + if (next != null) { + reuse.streamElement = next; + queueOfHeads.add(reuse); + } else { + finishedEmitting = setBitMask(finishedEmitting, idx); + if (seenWatermarks[idx] > Long.MIN_VALUE) { + output.emitWatermark(new Watermark(seenWatermarks[idx])); + } + return InputStatus.END_OF_INPUT; + } + + if (allSorted()) { + HeadElement headElement = queueOfHeads.peek(); + if (headElement != null) { + if (headElement.inputIndex == idx) { + return InputStatus.MORE_AVAILABLE; + } + } + } + + return InputStatus.NOTHING_AVAILABLE; + } + + @Override + public boolean isApproximatelyAvailable() { + if (sortedInput != null) { + return isHeadAvailable(); + } else { + return StreamTaskInput.super.isApproximatelyAvailable(); + } + } + + @Override + public boolean isAvailable() { + if (sortedInput != null) { + return isHeadAvailable(); + } else { + return StreamTaskInput.super.isAvailable(); + } + } + + private boolean isHeadAvailable() { + if (!allSorted()) { + return false; + } + HeadElement headElement = queueOfHeads.peek(); + return headElement != null && headElement.inputIndex == idx; + } + + @Override + public CompletableFuture<?> getAvailableFuture() { + if (sortedInput != null) { + return allFinished.getAvailableFuture(); + } else { + return inputs[idx].getAvailableFuture(); + } + } + } + + /** + * A simple {@link PushingAsyncDataInput.DataOutput} used in the sorting phase when we have not seen all the + * records from the underlying input yet. It forwards the records to a corresponding sorter. 
+ */ + private class SortingPhaseDataOutput implements PushingAsyncDataInput.DataOutput<Object> { + + int currentIdx; Review comment: Would it be better to have one `SortingPhaseDataOutput` per indexed input? It could also hold direct references ("hard links") to the things it needs, such as the key selector, sorter, etc. Then we also wouldn't need this mutable field. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java ########## @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; +import org.apache.flink.streaming.runtime.io.StreamTaskInput; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MutableObjectIterator; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.PriorityQueue; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; + +/** + * An input that wraps an underlying input and sorts the incoming records. It starts emitting records + * downstream only when all the other inputs coupled with this {@link MultiInputSortingDataInput} have + * finished sorting as well. 
+ * + * <p>Moreover it will report it is {@link #isAvailable() available} or + * {@link #isApproximatelyAvailable() approximately available} if it has some records pending only if the head + * of the {@link CommonContext#getQueueOfHeads()} belongs to the input. That way there is only ever one input + * that reports it is available. + * + * <p>The sorter uses binary comparison of keys, which are extracted and serialized when received + * from the chained input. Moreover the timestamps of incoming records are used for secondary ordering. + * For the comparison it uses either {@link FixedLengthByteKeyComparator} if the length of the + * serialized key is constant, or {@link VariableLengthByteKeyComparator} otherwise. + * + * <p>Watermarks, stream statuses, nor latency markers are propagated downstream as they do not make + * sense with buffered records. The input emits a MAX_WATERMARK after all records. Review comment: It emits the maximum watermark it has seen; MAX_WATERMARK could be interpreted as `Long.MAX_VALUE`. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java ########## @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; +import org.apache.flink.runtime.io.AvailabilityProvider; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; +import org.apache.flink.streaming.runtime.io.StreamTaskInput; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MutableObjectIterator; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.PriorityQueue; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; + +/** + * An input that wraps an underlying input and sorts the incoming records. 
It starts emitting records + * downstream only when all the other inputs coupled with this {@link MultiInputSortingDataInput} have + * finished sorting as well. + * + * <p>Moreover it will report it is {@link #isAvailable() available} or + * {@link #isApproximatelyAvailable() approximately available} if it has some records pending only if the head + * of the {@link CommonContext#getQueueOfHeads()} belongs to the input. That way there is only ever one input + * that reports it is available. + * + * <p>The sorter uses binary comparison of keys, which are extracted and serialized when received + * from the chained input. Moreover the timestamps of incoming records are used for secondary ordering. + * For the comparison it uses either {@link FixedLengthByteKeyComparator} if the length of the + * serialized key is constant, or {@link VariableLengthByteKeyComparator} otherwise. + * + * <p>Watermarks, stream statuses, nor latency markers are propagated downstream as they do not make + * sense with buffered records. The input emits a MAX_WATERMARK after all records. 
+ */ +public final class MultiInputSortingDataInput<IN, K> implements StreamTaskInput<IN> { + private final int idx; + private final StreamTaskInput<IN> wrappedInput; + private final PushSorter<Tuple2<byte[], StreamRecord<IN>>> sorter; + private final CommonContext commonContext; + private final SortingPhaseDataOutput sortingPhaseDataOutput = new SortingPhaseDataOutput(); + + private final KeySelector<IN, K> keySelector; + private final TypeSerializer<K> keySerializer; + private final DataOutputSerializer dataOutputSerializer; + + private MutableObjectIterator<Tuple2<byte[], StreamRecord<IN>>> sortedInput; + private long seenWatermark = Long.MIN_VALUE; + + private MultiInputSortingDataInput( + CommonContext commonContext, + StreamTaskInput<IN> wrappedInput, + int inputIdx, + PushSorter<Tuple2<byte[], StreamRecord<IN>>> sorter, + KeySelector<IN, K> keySelector, + TypeSerializer<K> keySerializer, + DataOutputSerializer dataOutputSerializer) { + this.wrappedInput = wrappedInput; + this.idx = inputIdx; + this.commonContext = commonContext; + this.sorter = sorter; + this.keySelector = keySelector; + this.keySerializer = keySerializer; + this.dataOutputSerializer = dataOutputSerializer; + } + + public static <K> StreamTaskInput<?>[] wrapInputs( + AbstractInvokable containingTask, + StreamTaskInput<Object>[] inputs, + KeySelector<Object, K>[] keySelectors, + TypeSerializer<Object>[] inputSerializers, + TypeSerializer<K> keySerializer, + MemoryManager memoryManager, + IOManager ioManager, + boolean objectReuse, + double managedMemoryFraction, + Configuration jobConfiguration) { + int keyLength = keySerializer.getLength(); + final TypeComparator<Tuple2<byte[], StreamRecord<Object>>> comparator; + DataOutputSerializer dataOutputSerializer; + if (keyLength > 0) { + dataOutputSerializer = new DataOutputSerializer(keyLength); + comparator = new FixedLengthByteKeyComparator<>(keyLength); + } else { + dataOutputSerializer = new DataOutputSerializer(64); + comparator = new 
VariableLengthByteKeyComparator<>(); + } + + int numberOfInputs = inputs.length; + CommonContext commonContext = new CommonContext(numberOfInputs); + return IntStream.range(0, numberOfInputs) + .mapToObj( + idx -> { + try { + KeyAndValueSerializer<Object> keyAndValueSerializer = new KeyAndValueSerializer<>( Review comment: Indentation seems off here. (Another point for having an automatic formatting tool... 😅) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org