pnowojski commented on a change in pull request #11687: URL: https://github.com/apache/flink/pull/11687#discussion_r422021656
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -477,14 +477,18 @@ protected void beforeInvoke() throws Exception { // output can request more floating buffers from global firstly. InputGate[] inputGates = getEnvironment().getAllInputGates(); if (inputGates != null) { - for (InputGate inputGate : inputGates) { - inputGate.readRecoveredState(channelIOExecutor, getEnvironment().getTaskStateManager().getChannelStateReader()); + CompletableFuture[] futures = new CompletableFuture[inputGates.length]; + for (int i = 0; i < inputGates.length; i++) { + futures[i] = inputGates[i].readRecoveredState( + channelIOExecutor, getEnvironment().getTaskStateManager().getChannelStateReader()); } - // Note that we must request partition after all the single gate finishes recovery. - for (InputGate inputGate : inputGates) { - inputGate.requestPartitions(channelIOExecutor); - } + // Note that we must request partition after all the single gates finished recovery. + CompletableFuture.allOf(futures).thenRun(ThrowingRunnable.unchecked(() -> { + for (InputGate inputGate : inputGates) { + inputGate.requestPartitions(); + } + })); Review comment: Hmmm, this is a bit fragile, as it implicitly assumes futures are completed from the task thread? Maybe add a `checkState(...)` asserting a thread to document this assumption? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java ########## @@ -177,17 +178,18 @@ void sendTaskEvent(TaskEvent event) { @Override boolean isReleased() { - return isReleased; + return isReleased.get(); } void releaseAllResources() throws IOException { - ArrayDeque<Buffer> releasedBuffers = new ArrayDeque<>(); - synchronized (receivedBuffers) { - releasedBuffers.addAll(receivedBuffers); - receivedBuffers.clear(); - isReleased = true; + if (isReleased.compareAndSet(false, true)) { Review comment: Why atomic? 
It could easily be checked and set under the `synchronized (receivedBuffers)` lock, simplifying the threading model a bit, and it would also avoid an extra `AtomicBoolean` check on the hot path. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java ########## @@ -46,15 +49,19 @@ int maxBackoff, InputChannelMetrics metrics) { super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter()); + + bufferManager = new BufferManager(inputGate.getMemorySegmentProvider(), this, 0); } + public abstract InputChannel toInputChannel() throws IOException; + protected void readRecoveredState(ChannelStateReader reader) throws IOException, InterruptedException { ReadResult result = ReadResult.HAS_MORE_DATA; while (result == ReadResult.HAS_MORE_DATA) { - Buffer buffer = getBufferManager().requestBufferBlocking(); + Buffer buffer = bufferManager.requestBufferBlocking(); result = internalReaderRecoveredState(reader, buffer); } - getBufferManager().releaseFloatingBuffers(); + bufferManager.releaseFloatingBuffers(); Review comment: Maybe add ``` LOG.debug("{}/{} Finished recovering input.", inputGate.getOwningTaskName(), channelInfo); ``` ? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ########## @@ -63,6 +63,8 @@ protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) { } protected boolean getNextRecord(T target) throws IOException, InterruptedException { + inputGate.requestPartitions(); Review comment: Could you explain why you think: > also not sensitive for batch code path. ? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org