mxm commented on code in PR #20485: URL: https://github.com/apache/flink/pull/20485#discussion_r952615289
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java: ########## @@ -0,0 +1,144 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.mocks.MockSourceReader; +import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.source.event.AddSplitEvent; +import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent; +import org.apache.flink.runtime.state.TestTaskStateManager; +import 
org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.operators.source.CollectingDataOutput; +import org.apache.flink.streaming.api.operators.source.TestingSourceOperator; +import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; + +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit test for split alignment in {@link SourceOperator}. */ +public class SourceOperatorSplitWatermarkAlignmentTest { + public static final WatermarkGenerator<Integer> WATERMARK_GENERATOR = + new WatermarkGenerator<Integer>() { + + private long maxWatermark = Long.MIN_VALUE; + + @Override + public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) { + if (eventTimestamp > maxWatermark) { + this.maxWatermark = eventTimestamp; + output.emitWatermark(new Watermark(maxWatermark)); + } + } + + @Override + public void onPeriodicEmit(WatermarkOutput output) { + output.emitWatermark(new Watermark(maxWatermark)); + } + }; + + @Test + public void testSplitWatermarkAlignment() throws Exception { + + final SplitAligningSourceReader sourceReader = new SplitAligningSourceReader(); + SourceOperator<Integer, MockSourceSplit> operator = + new TestingSourceOperator<>( + sourceReader, + WatermarkStrategy.forGenerator(ctx -> WATERMARK_GENERATOR) + .withTimestampAssigner((r, l) -> r) + .withWatermarkAlignment("group-1", Duration.ofMillis(1)), + new TestProcessingTimeService(), + new MockOperatorEventGateway(), + 1, + 5, + true); + Environment env = getTestingEnvironment(); 
+ operator.setup( + new SourceOperatorStreamTask<Integer>(env), + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>())); + operator.initializeState(new StreamTaskStateInitializerImpl(env, new MemoryStateBackend())); + + operator.open(); + final MockSourceSplit split1 = new MockSourceSplit(0, 0, 10); + final MockSourceSplit split2 = new MockSourceSplit(1, 10, 20); + split1.addRecord(5); + split1.addRecord(6); + split1.addRecord(11); + operator.handleOperatorEvent( + new AddSplitEvent<>( + Arrays.asList(split1, split2), new MockSourceSplitSerializer())); + final CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>(); + + operator.emitNext(dataOutput); // 5 + operator.handleOperatorEvent( + new WatermarkAlignmentEvent(4)); // pause by coordinator message + assertThat(sourceReader.pausedSplits).containsExactly("0"); + operator.handleOperatorEvent(new WatermarkAlignmentEvent(5)); + assertThat(sourceReader.pausedSplits).isEmpty(); + operator.emitNext(dataOutput); // 6, pause by increasing watermark + assertThat(sourceReader.pausedSplits).containsExactly("0"); + } Review Comment: Should we test resuming as well? ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/PauseOrResumeSplitsTask.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader.fetcher; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Changes the paused splits of a n{@link SplitReader}. The task is used by default in {@link + * SplitFetcherManager} and assumes that a {@link SplitFetcher} has multiple splits. For {@code + * SplitFetchers} with single splits, it's instead recommended to subclass {@link + * SplitFetcherManager} and pause the whole {@code SplitFetcher}. 
+ * + * @param <SplitT> the type of the split + */ +@Internal +class PauseOrResumeSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask { + private static final Logger LOG = LoggerFactory.getLogger(PauseOrResumeSplitsTask.class); + private final SplitReader<?, SplitT> splitReader; + private final Collection<SplitT> splitsToPause; + private final Collection<SplitT> splitsToResume; + private final boolean allowUnalignedSourceSplits; + + PauseOrResumeSplitsTask( + SplitReader<?, SplitT> splitReader, + Collection<SplitT> splitsToPause, + Collection<SplitT> splitsToResume, + boolean allowUnalignedSourceSplits) { + this.splitReader = checkNotNull(splitReader); + this.splitsToPause = checkNotNull(splitsToPause); + this.splitsToResume = checkNotNull(splitsToResume); + this.allowUnalignedSourceSplits = allowUnalignedSourceSplits; + } + + @Override + public boolean run() throws IOException { + try { + splitReader.pauseOrResumeSplits(splitsToPause, splitsToResume); + } catch (UnsupportedOperationException e) { + if (!allowUnalignedSourceSplits) { + throw new UnsupportedOperationException("", e); Review Comment: +1 ########## flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java: ########## @@ -227,6 +227,20 @@ public void close() throws Exception { consumer.close(); } + @Override + public void pauseOrResumeSplits( + Collection<KafkaPartitionSplit> splitsToPause, + Collection<KafkaPartitionSplit> splitsToResume) { + consumer.resume( + splitsToResume.stream() + .map(KafkaPartitionSplit::getTopicPartition) + .collect(Collectors.toList())); + consumer.pause( + splitsToPause.stream() + .map(KafkaPartitionSplit::getTopicPartition) + .collect(Collectors.toList())); Review Comment: Should the order be the other way around, in case resuming blocks for any reason? 
########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java: ########## @@ -171,17 +230,54 @@ void runOnce() { * @param splitsToAdd the splits to add. */ public void addSplits(List<SplitT> splitsToAdd) { - enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits)); - wakeUp(true); + lock.lock(); + try { + enqueueTaskUnsafe(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits)); + wakeUpUnsafe(true); + } finally { + lock.unlock(); + } + } + + /** + * Called when some splits of this source instance progressed too much beyond the global + * watermark of all subtasks. If the split reader implements {@link SplitReader}, it will relay + * the information asynchronously through the split fetcher thread. + * + * @param splitsToPause the splits to pause + * @param splitsToResume the splits to resume + */ + public void pauseOrResumeSplits( + Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume) { + lock.lock(); + try { + enqueueTaskUnsafe( + new PauseOrResumeSplitsTask<>( + splitReader, + splitsToPause, + splitsToResume, + allowUnalignedSourceSplits)); + wakeUpUnsafe(true); + } finally { + lock.unlock(); + } } public void enqueueTask(SplitFetcherTask task) { - synchronized (lock) { - taskQueue.offer(task); - isIdle = false; + lock.lock(); + try { + enqueueTaskUnsafe(task); + } finally { + lock.unlock(); } } + private void enqueueTaskUnsafe(SplitFetcherTask task) { + assert lock.isHeldByCurrentThread(); Review Comment: Since this is a ReentrantLock, we could also lock here, which would make this method safer. The assert won't run in production, where assertions are disabled by default.
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java: ########## @@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> newSplits) { private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) { currentMaxDesiredWatermark = event.getMaxWatermark(); + checkSplitWatermarkAlignment(); sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark); } - private void onWatermarkEmitted(long emittedWatermark) { - lastEmittedWatermark = emittedWatermark; + @Override + public void updateCurrentEffectiveWatermark(long watermark) { + lastEmittedWatermark = watermark; checkWatermarkAlignment(); } + @Override + public void updateCurrentSplitWatermark(String splitId, long watermark) { + splitCurrentWatermarks.put(splitId, watermark); + if (currentMaxDesiredWatermark < watermark && !currentlyPausedSplits.contains(splitId)) { + pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList()); + currentlyPausedSplits.add(splitId); + } + } + + /** + * Finds the splits that are beyond the current max watermark and pauses them. At the same time, + * splits that have been paused and where the global watermark caught up are resumed. + * + * <p>Note: This takes effect only if there are multiple splits, otherwise it does nothing. + */ + private void checkSplitWatermarkAlignment() { + if (numSplits <= 1) { + return; // If there is only a single split, we do not pause the split but the source. 
+ } + Collection<String> splitsToPause = new ArrayList<>(); + Collection<String> splitsToResume = new ArrayList<>(); + splitCurrentWatermarks.forEach( + (splitId, splitWatermark) -> { + if (splitWatermark > currentMaxDesiredWatermark) { + splitsToPause.add(splitId); + } else if (currentlyPausedSplits.contains(splitId)) { + splitsToResume.add(splitId); + } + }); + splitsToPause.removeAll(currentlyPausedSplits); + if (!splitsToPause.isEmpty() || !splitsToResume.isEmpty()) { + pauseOrResumeSplits(splitsToPause, splitsToResume); + currentlyPausedSplits.addAll(splitsToPause); + splitsToResume.forEach(currentlyPausedSplits::remove); + } + } + + private void pauseOrResumeSplits( + Collection<String> splitsToPause, Collection<String> splitsToResume) { + if (!allowUnalignedSourceSplits || sourceReaderSupportsPauseOrResumeSplits) { + try { + sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume); + } catch (UnsupportedOperationException e) { + sourceReaderSupportsPauseOrResumeSplits = false; + if (!allowUnalignedSourceSplits) { + throw new UnsupportedOperationException("", e); Review Comment: +1 ########## flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java: ########## @@ -179,6 +180,21 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges this.pulsarConsumer = consumer; } + @Override + public void pauseOrResumeSplits( + Collection<PulsarPartitionSplit> splitsToPause, + Collection<PulsarPartitionSplit> splitsToResume) { + if (splitsToPause.size() > 1 || splitsToResume.size() > 1) { + throw new IllegalStateException("This pulsar split reader only support one split."); Review Comment: I don't understand why we can't pause single splits. Even if so, I don't know why that should leak to the Pulsar reader. 
########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java: ########## @@ -210,138 +313,73 @@ Map<String, SplitT> assignedSplits() { /** * Package private for unit test. * - * @return true if task queue is not empty, false otherwise. + * @return true if task queue is empty, false otherwise. */ boolean isIdle() { - return isIdle; - } - - /** - * Check whether the fetch task should run. The fetch task should only run when all the - * following conditions are met. 1. there is no task in the task queue to run. 2. there are - * assigned splits Package private for testing purpose. - * - * @return whether the fetch task should be run. - */ - boolean shouldRunFetchTask() { - return taskQueue.isEmpty() && !assignedSplits.isEmpty(); + lock.lock(); + try { + return assignedSplits.isEmpty() && taskQueue.isEmpty() && runningTask == null; + } finally { + lock.unlock(); + } } /** * Wake up the fetcher thread. There are only two blocking points in a running fetcher. 1. - * Taking the next task out of the task queue. 2. Running a task. + * Waiting for the next task in an idle fetcher. 2. Running a task. * * <p>They need to be waken up differently. If the fetcher is blocking waiting on the next task - * in the task queue, we should just interrupt the fetcher thread. If the fetcher is running the - * user split reader, we should call SplitReader.wakeUp() instead of naively interrupt the - * thread. - * - * <p>The correctness can be think of in the following way. The purpose of wake up is to let the - * fetcher thread go to the very beginning of the running loop. There are three major events in - * each run of the loop. - * - * <ol> - * <li>pick a task (blocking) - * <li>assign the task to runningTask variable. - * <li>run the runningTask. (blocking) - * </ol> - * - * <p>We don't need to worry about things after step 3 because there is no blocking point - * anymore. 
- * - * <p>We always first set the wakeup flag when waking up the fetcher, then use the value of - * running task to determine where the fetcher thread is. - * - * <ul> - * <li>If runningThread is null, it is before step 2, so we should interrupt fetcher. This - * interruption will not be propagated to the split reader, because the wakeUp flag will - * prevent the fetchTask from running. - * <li>If runningThread is not null, it is after step 2. so we should wakeUp the split reader - * instead of interrupt the fetcher. - * </ul> + * in the task queue, we should just notify that a task is available. If the fetcher is running + * the user split reader, we should call SplitReader.wakeUp() instead. * - * <p>The above logic only works in the same {@link #runOnce()} invocation. So we need to - * synchronize to ensure the wake up logic do not touch a different invocation. + * <p>The correctness can be thought of in the following way. The purpose of wake up is to let + * the fetcher thread go to the very beginning of the running loop. */ void wakeUp(boolean taskOnly) { // Synchronize to make sure the wake up only works for the current invocation of runOnce(). - synchronized (wakeUp) { - // Do not wake up repeatedly. - wakeUp.set(true); - // Now the wakeUp flag is set. - SplitFetcherTask currentTask = runningTask; - if (isRunningTask(currentTask)) { - // The running task may have missed our wakeUp flag and running, wake it up. - LOG.debug("Waking up running task {}", currentTask); - currentTask.wakeUp(); - } else if (!taskOnly) { - // The task has not started running yet, and it will not run for this - // runOnce() invocation due to the wakeUp flag. But we might have to - // wake up the fetcher thread in case it is blocking on the task queue. - // Only wake up when the thread has started and there is no running task. 
- LOG.debug("Waking up fetcher thread."); - taskQueue.add(WAKEUP_TASK); - } + lock.lock(); + try { + wakeUpUnsafe(taskOnly); + } finally { + lock.unlock(); } } - private void maybeEnqueueTask(SplitFetcherTask task) { - // Only enqueue unfinished non-fetch task. - if (!closed.get() - && isRunningTask(task) - && task != fetchTask - && !taskQueue.offerFirst(task)) { - throw new RuntimeException( - "The task queue is full. This is only theoretically possible when really bad thing happens."); - } - if (task != null) { - LOG.debug("Enqueued task {}", task); + private void wakeUpUnsafe(boolean taskOnly) { + assert lock.isHeldByCurrentThread(); Review Comment: Same here, locking would be an option because of ReentrantLock. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java: ########## @@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> newSplits) { private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) { currentMaxDesiredWatermark = event.getMaxWatermark(); + checkSplitWatermarkAlignment(); sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark); } - private void onWatermarkEmitted(long emittedWatermark) { - lastEmittedWatermark = emittedWatermark; + @Override + public void updateCurrentEffectiveWatermark(long watermark) { + lastEmittedWatermark = watermark; checkWatermarkAlignment(); } + @Override + public void updateCurrentSplitWatermark(String splitId, long watermark) { + splitCurrentWatermarks.put(splitId, watermark); + if (currentMaxDesiredWatermark < watermark && !currentlyPausedSplits.contains(splitId)) { + pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList()); + currentlyPausedSplits.add(splitId); + } + } + + /** + * Finds the splits that are beyond the current max watermark and pauses them. At the same time, + * splits that have been paused and where the global watermark caught up are resumed. 
+ * + * <p>Note: This takes effect only if there are multiple splits, otherwise it does nothing. + */ + private void checkSplitWatermarkAlignment() { + if (numSplits <= 1) { + return; // If there is only a single split, we do not pause the split but the source. + } + Collection<String> splitsToPause = new ArrayList<>(); + Collection<String> splitsToResume = new ArrayList<>(); + splitCurrentWatermarks.forEach( + (splitId, splitWatermark) -> { + if (splitWatermark > currentMaxDesiredWatermark) { + splitsToPause.add(splitId); + } else if (currentlyPausedSplits.contains(splitId)) { + splitsToResume.add(splitId); + } + }); + splitsToPause.removeAll(currentlyPausedSplits); + if (!splitsToPause.isEmpty() || !splitsToResume.isEmpty()) { + pauseOrResumeSplits(splitsToPause, splitsToResume); + currentlyPausedSplits.addAll(splitsToPause); + splitsToResume.forEach(currentlyPausedSplits::remove); + } + } + + private void pauseOrResumeSplits( + Collection<String> splitsToPause, Collection<String> splitsToResume) { + if (!allowUnalignedSourceSplits || sourceReaderSupportsPauseOrResumeSplits) { + try { + sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume); + } catch (UnsupportedOperationException e) { + sourceReaderSupportsPauseOrResumeSplits = false; + if (!allowUnalignedSourceSplits) { + throw new UnsupportedOperationException("", e); Review Comment: In my understanding, readers which do not support pausing splits will fail the job, unless the configuration to allow unaligned splits is set. Hence, I don't understand the `sourceReaderSupportsPauseOrResumeSplits` flag. 
I suggest the following: ```suggestion try { sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume); } catch (UnsupportedOperationException e) { sourceReaderSupportsPauseOrResumeSplits = false; if (!allowUnalignedSourceSplits) { throw e; ``` ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java: ########## @@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> newSplits) { private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) { currentMaxDesiredWatermark = event.getMaxWatermark(); + checkSplitWatermarkAlignment(); sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark); } - private void onWatermarkEmitted(long emittedWatermark) { - lastEmittedWatermark = emittedWatermark; + @Override + public void updateCurrentEffectiveWatermark(long watermark) { + lastEmittedWatermark = watermark; checkWatermarkAlignment(); } + @Override + public void updateCurrentSplitWatermark(String splitId, long watermark) { + splitCurrentWatermarks.put(splitId, watermark); + if (currentMaxDesiredWatermark < watermark && !currentlyPausedSplits.contains(splitId)) { + pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList()); + currentlyPausedSplits.add(splitId); + } + } + + /** + * Finds the splits that are beyond the current max watermark and pauses them. At the same time, + * splits that have been paused and where the global watermark caught up are resumed. + * + * <p>Note: This takes effect only if there are multiple splits, otherwise it does nothing. + */ + private void checkSplitWatermarkAlignment() { + if (numSplits <= 1) { + return; // If there is only a single split, we do not pause the split but the source. + } Review Comment: It would be great if we didn't have to special case a single split. I understand that the `SourceReaderBase` will stop polling in this case, but it would be beneficial if there was only one path. 
########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java: ########## @@ -111,57 +121,106 @@ public void run() { splitReader.close(); } catch (Exception e) { errorHandler.accept(e); + } finally { + LOG.info("Split fetcher {} exited.", id); + // This executes after possible errorHandler.accept(t). If these operations bear + // a happens-before relation, then we can checking side effect of + // errorHandler.accept(t) + // to know whether it happened after observing side effect of shutdownHook.run(). + shutdownHook.run(); } - LOG.info("Split fetcher {} exited.", id); - // This executes after possible errorHandler.accept(t). If these operations bear - // a happens-before relation, then we can checking side effect of errorHandler.accept(t) - // to know whether it happened after observing side effect of shutdownHook.run(). - shutdownHook.run(); } } /** Package private method to help unit test. */ - void runOnce() { + boolean runOnce() { + // first blocking call = get next task. blocks only if there are no active splits and queued + // tasks. + SplitFetcherTask task; + lock.lock(); try { - // The fetch task should run if the split assignment is not empty or there is a split - // change. - if (shouldRunFetchTask()) { - runningTask = fetchTask; - } else { - runningTask = taskQueue.take(); + if (closed) { + return false; } - // Now the running task is not null. If wakeUp() is called after this point, - // task.wakeUp() will be called. On the other hand, if the wakeUp() call was make before - // this point, the wakeUp flag must have already been set. The code hence checks the - // wakeUp - // flag first to avoid an unnecessary task run. - // Note that the runningTask may still encounter the case that the task is waken up - // before - // the it starts running. 
- LOG.debug("Prepare to run {}", runningTask); - if (!wakeUp.get() && runningTask.run()) { - LOG.debug("Finished running task {}", runningTask); - // the task has finished running. Set it to null so it won't be enqueued. - runningTask = null; - checkAndSetIdle(); + + task = getNextTaskUnsafe(); + if (task == null) { + // (spurious) wakeup, so just repeat + return true; } + + LOG.debug("Prepare to run {}", task); + // store task for #wakeUp + this.runningTask = task; + } finally { + lock.unlock(); + } + + // execute the task outside of lock, so that it can be woken up + boolean taskFinished; + try { + taskFinished = task.run(); } catch (Exception e) { throw new RuntimeException( String.format( "SplitFetcher thread %d received unexpected exception while polling the records", id), e); } - // If the task is not null that means this task needs to be re-executed. This only - // happens when the task is the fetching task or the task was interrupted. - maybeEnqueueTask(runningTask); - synchronized (wakeUp) { - // Set the running task to null. It is necessary for the shutdown method to avoid - // unnecessarily interrupt the running task. - runningTask = null; - // Set the wakeUp flag to false. - wakeUp.set(false); - LOG.debug("Cleaned wakeup flag."); + + // re-acquire lock as all post-processing steps, need it + lock.lock(); + try { + this.runningTask = null; + processTaskResultUnsafe(task, taskFinished); + } finally { + lock.unlock(); + } + return true; + } + + private void processTaskResultUnsafe(SplitFetcherTask task, boolean taskFinished) { + assert lock.isHeldByCurrentThread(); Review Comment: Same as for the other lock asserts. 
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java: ########## @@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> newSplits) { private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) { currentMaxDesiredWatermark = event.getMaxWatermark(); + checkSplitWatermarkAlignment(); sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark); } - private void onWatermarkEmitted(long emittedWatermark) { - lastEmittedWatermark = emittedWatermark; + @Override + public void updateCurrentEffectiveWatermark(long watermark) { + lastEmittedWatermark = watermark; checkWatermarkAlignment(); } + @Override + public void updateCurrentSplitWatermark(String splitId, long watermark) { + splitCurrentWatermarks.put(splitId, watermark); + if (currentMaxDesiredWatermark < watermark && !currentlyPausedSplits.contains(splitId)) { + pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList()); + currentlyPausedSplits.add(splitId); + } + } + + /** + * Finds the splits that are beyond the current max watermark and pauses them. At the same time, + * splits that have been paused and where the global watermark caught up are resumed. + * + * <p>Note: This takes effect only if there are multiple splits, otherwise it does nothing. + */ + private void checkSplitWatermarkAlignment() { + if (numSplits <= 1) { + return; // If there is only a single split, we do not pause the split but the source. 
+ } + Collection<String> splitsToPause = new ArrayList<>(); + Collection<String> splitsToResume = new ArrayList<>(); + splitCurrentWatermarks.forEach( + (splitId, splitWatermark) -> { + if (splitWatermark > currentMaxDesiredWatermark) { + splitsToPause.add(splitId); + } else if (currentlyPausedSplits.contains(splitId)) { + splitsToResume.add(splitId); + } + }); + splitsToPause.removeAll(currentlyPausedSplits); + if (!splitsToPause.isEmpty() || !splitsToResume.isEmpty()) { + pauseOrResumeSplits(splitsToPause, splitsToResume); + currentlyPausedSplits.addAll(splitsToPause); + splitsToResume.forEach(currentlyPausedSplits::remove); + } Review Comment: What if all splits are paused, do we signal this back to the user like when we only have a single split? ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java: ########## @@ -210,138 +313,73 @@ Map<String, SplitT> assignedSplits() { /** * Package private for unit test. * - * @return true if task queue is not empty, false otherwise. + * @return true if task queue is empty, false otherwise. */ boolean isIdle() { - return isIdle; - } - - /** - * Check whether the fetch task should run. The fetch task should only run when all the - * following conditions are met. 1. there is no task in the task queue to run. 2. there are - * assigned splits Package private for testing purpose. - * - * @return whether the fetch task should be run. - */ - boolean shouldRunFetchTask() { - return taskQueue.isEmpty() && !assignedSplits.isEmpty(); + lock.lock(); + try { + return assignedSplits.isEmpty() && taskQueue.isEmpty() && runningTask == null; + } finally { + lock.unlock(); + } } /** * Wake up the fetcher thread. There are only two blocking points in a running fetcher. 1. - * Taking the next task out of the task queue. 2. Running a task. + * Waiting for the next task in an idle fetcher. 2. Running a task. * * <p>They need to be waken up differently. 
If the fetcher is blocking waiting on the next task - * in the task queue, we should just interrupt the fetcher thread. If the fetcher is running the - * user split reader, we should call SplitReader.wakeUp() instead of naively interrupt the - * thread. - * - * <p>The correctness can be think of in the following way. The purpose of wake up is to let the - * fetcher thread go to the very beginning of the running loop. There are three major events in - * each run of the loop. - * - * <ol> - * <li>pick a task (blocking) - * <li>assign the task to runningTask variable. - * <li>run the runningTask. (blocking) - * </ol> - * - * <p>We don't need to worry about things after step 3 because there is no blocking point - * anymore. - * - * <p>We always first set the wakeup flag when waking up the fetcher, then use the value of - * running task to determine where the fetcher thread is. - * - * <ul> - * <li>If runningThread is null, it is before step 2, so we should interrupt fetcher. This - * interruption will not be propagated to the split reader, because the wakeUp flag will - * prevent the fetchTask from running. - * <li>If runningThread is not null, it is after step 2. so we should wakeUp the split reader - * instead of interrupt the fetcher. Review Comment: Do we want to explain how the new logic works? ########## flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java: ########## @@ -282,4 +282,21 @@ public enum VertexDescriptionMode { .withDescription( "Whether name of vertex includes topological index or not. " + "When it is true, the name will have a prefix of index of the vertex, like '[vertex-0]Source: source'. It is false by default"); + + @PublicEvolving + public static final ConfigOption<Boolean> ALLOW_UNALIGNED_SOURCE_SPLITS = + key("pipeline.watermark-alignment.allow-unaligned-source-splits") + .booleanType() + .defaultValue(false) Review Comment: Just a thought: Do we have to have another configuration option? 
I think we could just keep the default behavior prior to adding this feature and not do split alignment. We could print a WARN when this happens. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java: ########## @@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> newSplits) { private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) { currentMaxDesiredWatermark = event.getMaxWatermark(); + checkSplitWatermarkAlignment(); sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark); } - private void onWatermarkEmitted(long emittedWatermark) { - lastEmittedWatermark = emittedWatermark; + @Override + public void updateCurrentEffectiveWatermark(long watermark) { + lastEmittedWatermark = watermark; checkWatermarkAlignment(); } + @Override + public void updateCurrentSplitWatermark(String splitId, long watermark) { + splitCurrentWatermarks.put(splitId, watermark); + if (currentMaxDesiredWatermark < watermark && !currentlyPausedSplits.contains(splitId)) { + pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList()); + currentlyPausedSplits.add(splitId); + } Review Comment: We should consider whether we need the eager pausing logic here. There is the periodic `checkSplitWatermarkAlignment` method which achieves the same. The default with one split was to pause at the alignment interval only, so maybe we should keep that behavior. ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/PauseOrResumeSplitsTask.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader.fetcher; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Changes the paused splits of a n{@link SplitReader}. The task is used by default in {@link Review Comment: "a n" typo? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java: ########## @@ -175,6 +188,9 @@ private enum OperatingMode { private @Nullable LatencyMarkerEmitter<OUT> latencyMarkerEmitter; + private final boolean allowUnalignedSourceSplits; + private boolean sourceReaderSupportsPauseOrResumeSplits = true; Review Comment: IMHO this flag is not needed. ```suggestion ``` ########## flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java: ########## @@ -171,17 +230,54 @@ void runOnce() { * @param splitsToAdd the splits to add. 
*/ public void addSplits(List<SplitT> splitsToAdd) { - enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits)); - wakeUp(true); + lock.lock(); + try { + enqueueTaskUnsafe(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits)); + wakeUpUnsafe(true); + } finally { + lock.unlock(); + } + } + + /** + * Called when some splits of this source instance progressed too much beyond the global + * watermark of all subtasks. If the split reader implements {@link SplitReader}, it will relay + * the information asynchronously through the split fetcher thread. Review Comment: This JavaDoc leaks information about potential callers but does not explain what this method actually does. ########## flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java: ########## @@ -68,7 +69,7 @@ public KafkaSourceFetcherManager( Supplier<SplitReader<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit>> splitReaderSupplier, Consumer<Collection<String>> splitFinishedHook) { - super(elementsQueue, splitReaderSupplier, splitFinishedHook); + super(elementsQueue, splitReaderSupplier, new Configuration(), splitFinishedHook); Review Comment: Why do we need to pass in the correct configuration?
########## flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java: ########## @@ -179,6 +180,21 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges this.pulsarConsumer = consumer; } + @Override + public void pauseOrResumeSplits( + Collection<PulsarPartitionSplit> splitsToPause, + Collection<PulsarPartitionSplit> splitsToResume) { + if (splitsToPause.size() > 1 || splitsToResume.size() > 1) { + throw new IllegalStateException("This pulsar split reader only support one split."); Review Comment: ```suggestion throw new IllegalStateException("This pulsar split reader only supports one split."); ``` ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java: ########## @@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> newSplits) { private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) { currentMaxDesiredWatermark = event.getMaxWatermark(); + checkSplitWatermarkAlignment(); sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark); } - private void onWatermarkEmitted(long emittedWatermark) { - lastEmittedWatermark = emittedWatermark; + @Override + public void updateCurrentEffectiveWatermark(long watermark) { + lastEmittedWatermark = watermark; checkWatermarkAlignment(); } + @Override + public void updateCurrentSplitWatermark(String splitId, long watermark) { + splitCurrentWatermarks.put(splitId, watermark); + if (currentMaxDesiredWatermark < watermark && !currentlyPausedSplits.contains(splitId)) { + pauseOrResumeSplits(Collections.singletonList(splitId), Collections.emptyList()); + currentlyPausedSplits.add(splitId); Review Comment: Is this operation safe? 
We are adding split ids to `currentlyPausedSplits` here while at the same time this set is cleared in `checkSplitWatermarkAlignment` (see https://github.com/apache/flink/pull/20485/files#diff-eba14821fb3e96f6f20e3116ceab893c2f18cff605b177dad485aadc43ac4f56R619 line 619). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org