mxm commented on code in PR #20485:
URL: https://github.com/apache/flink/pull/20485#discussion_r952615289


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.mocks.MockSourceReader;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
+import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for split alignment in {@link SourceOperator}. */
+public class SourceOperatorSplitWatermarkAlignmentTest {
+    public static final WatermarkGenerator<Integer> WATERMARK_GENERATOR =
+            new WatermarkGenerator<Integer>() {
+
+                private long maxWatermark = Long.MIN_VALUE;
+
+                @Override
+                public void onEvent(Integer event, long eventTimestamp, 
WatermarkOutput output) {
+                    if (eventTimestamp > maxWatermark) {
+                        this.maxWatermark = eventTimestamp;
+                        output.emitWatermark(new Watermark(maxWatermark));
+                    }
+                }
+
+                @Override
+                public void onPeriodicEmit(WatermarkOutput output) {
+                    output.emitWatermark(new Watermark(maxWatermark));
+                }
+            };
+
+    @Test
+    public void testSplitWatermarkAlignment() throws Exception {
+
+        final SplitAligningSourceReader sourceReader = new 
SplitAligningSourceReader();
+        SourceOperator<Integer, MockSourceSplit> operator =
+                new TestingSourceOperator<>(
+                        sourceReader,
+                        WatermarkStrategy.forGenerator(ctx -> 
WATERMARK_GENERATOR)
+                                .withTimestampAssigner((r, l) -> r)
+                                .withWatermarkAlignment("group-1", 
Duration.ofMillis(1)),
+                        new TestProcessingTimeService(),
+                        new MockOperatorEventGateway(),
+                        1,
+                        5,
+                        true);
+        Environment env = getTestingEnvironment();
+        operator.setup(
+                new SourceOperatorStreamTask<Integer>(env),
+                new MockStreamConfig(new Configuration(), 1),
+                new MockOutput<>(new ArrayList<>()));
+        operator.initializeState(new StreamTaskStateInitializerImpl(env, new 
MemoryStateBackend()));
+
+        operator.open();
+        final MockSourceSplit split1 = new MockSourceSplit(0, 0, 10);
+        final MockSourceSplit split2 = new MockSourceSplit(1, 10, 20);
+        split1.addRecord(5);
+        split1.addRecord(6);
+        split1.addRecord(11);
+        operator.handleOperatorEvent(
+                new AddSplitEvent<>(
+                        Arrays.asList(split1, split2), new 
MockSourceSplitSerializer()));
+        final CollectingDataOutput<Integer> dataOutput = new 
CollectingDataOutput<>();
+
+        operator.emitNext(dataOutput); // 5
+        operator.handleOperatorEvent(
+                new WatermarkAlignmentEvent(4)); // pause by coordinator 
message
+        assertThat(sourceReader.pausedSplits).containsExactly("0");
+        operator.handleOperatorEvent(new WatermarkAlignmentEvent(5));
+        assertThat(sourceReader.pausedSplits).isEmpty();
+        operator.emitNext(dataOutput); // 6, pause by increasing watermark
+        assertThat(sourceReader.pausedSplits).containsExactly("0");
+    }

Review Comment:
   Should we test resuming as well?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/PauseOrResumeSplitsTask.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.fetcher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Changes the paused splits of a n{@link SplitReader}. The task is used by 
default in {@link
+ * SplitFetcherManager} and assumes that a {@link SplitFetcher} has multiple 
splits. For {@code
+ * SplitFetchers} with single splits, it's instead recommended to subclass 
{@link
+ * SplitFetcherManager} and pause the whole {@code SplitFetcher}.
+ *
+ * @param <SplitT> the type of the split
+ */
+@Internal
+class PauseOrResumeSplitsTask<SplitT extends SourceSplit> implements 
SplitFetcherTask {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PauseOrResumeSplitsTask.class);
+    private final SplitReader<?, SplitT> splitReader;
+    private final Collection<SplitT> splitsToPause;
+    private final Collection<SplitT> splitsToResume;
+    private final boolean allowUnalignedSourceSplits;
+
+    PauseOrResumeSplitsTask(
+            SplitReader<?, SplitT> splitReader,
+            Collection<SplitT> splitsToPause,
+            Collection<SplitT> splitsToResume,
+            boolean allowUnalignedSourceSplits) {
+        this.splitReader = checkNotNull(splitReader);
+        this.splitsToPause = checkNotNull(splitsToPause);
+        this.splitsToResume = checkNotNull(splitsToResume);
+        this.allowUnalignedSourceSplits = allowUnalignedSourceSplits;
+    }
+
+    @Override
+    public boolean run() throws IOException {
+        try {
+            splitReader.pauseOrResumeSplits(splitsToPause, splitsToResume);
+        } catch (UnsupportedOperationException e) {
+            if (!allowUnalignedSourceSplits) {
+                throw new UnsupportedOperationException("", e);

Review Comment:
   +1 



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -227,6 +227,20 @@ public void close() throws Exception {
         consumer.close();
     }
 
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<KafkaPartitionSplit> splitsToPause,
+            Collection<KafkaPartitionSplit> splitsToResume) {
+        consumer.resume(
+                splitsToResume.stream()
+                        .map(KafkaPartitionSplit::getTopicPartition)
+                        .collect(Collectors.toList()));
+        consumer.pause(
+                splitsToPause.stream()
+                        .map(KafkaPartitionSplit::getTopicPartition)
+                        .collect(Collectors.toList()));

Review Comment:
   Should the order be the other way around (pause first, then resume), in case resuming blocks for any reason?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java:
##########
@@ -171,17 +230,54 @@ void runOnce() {
      * @param splitsToAdd the splits to add.
      */
     public void addSplits(List<SplitT> splitsToAdd) {
-        enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, 
assignedSplits));
-        wakeUp(true);
+        lock.lock();
+        try {
+            enqueueTaskUnsafe(new AddSplitsTask<>(splitReader, splitsToAdd, 
assignedSplits));
+            wakeUpUnsafe(true);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Called when some splits of this source instance progressed too much 
beyond the global
+     * watermark of all subtasks. If the split reader implements {@link 
SplitReader}, it will relay
+     * the information asynchronously through the split fetcher thread.
+     *
+     * @param splitsToPause the splits to pause
+     * @param splitsToResume the splits to resume
+     */
+    public void pauseOrResumeSplits(
+            Collection<SplitT> splitsToPause, Collection<SplitT> 
splitsToResume) {
+        lock.lock();
+        try {
+            enqueueTaskUnsafe(
+                    new PauseOrResumeSplitsTask<>(
+                            splitReader,
+                            splitsToPause,
+                            splitsToResume,
+                            allowUnalignedSourceSplits));
+            wakeUpUnsafe(true);
+        } finally {
+            lock.unlock();
+        }
     }
 
     public void enqueueTask(SplitFetcherTask task) {
-        synchronized (lock) {
-            taskQueue.offer(task);
-            isIdle = false;
+        lock.lock();
+        try {
+            enqueueTaskUnsafe(task);
+        } finally {
+            lock.unlock();
         }
     }
 
+    private void enqueueTaskUnsafe(SplitFetcherTask task) {
+        assert lock.isHeldByCurrentThread();

Review Comment:
   Since this is a ReentrantLock, we could also acquire the lock here, which would make this method safer. The assert won't run in production, because Java assertions are disabled unless the JVM is started with `-ea`.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> 
newSplits) {
 
     private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
         currentMaxDesiredWatermark = event.getMaxWatermark();
+        checkSplitWatermarkAlignment();
         
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
     }
 
-    private void onWatermarkEmitted(long emittedWatermark) {
-        lastEmittedWatermark = emittedWatermark;
+    @Override
+    public void updateCurrentEffectiveWatermark(long watermark) {
+        lastEmittedWatermark = watermark;
         checkWatermarkAlignment();
     }
 
+    @Override
+    public void updateCurrentSplitWatermark(String splitId, long watermark) {
+        splitCurrentWatermarks.put(splitId, watermark);
+        if (currentMaxDesiredWatermark < watermark && 
!currentlyPausedSplits.contains(splitId)) {
+            pauseOrResumeSplits(Collections.singletonList(splitId), 
Collections.emptyList());
+            currentlyPausedSplits.add(splitId);
+        }
+    }
+
+    /**
+     * Finds the splits that are beyond the current max watermark and pauses 
them. At the same time,
+     * splits that have been paused and where the global watermark caught up 
are resumed.
+     *
+     * <p>Note: This takes effect only if there are multiple splits, otherwise 
it does nothing.
+     */
+    private void checkSplitWatermarkAlignment() {
+        if (numSplits <= 1) {
+            return; // If there is only a single split, we do not pause the 
split but the source.
+        }
+        Collection<String> splitsToPause = new ArrayList<>();
+        Collection<String> splitsToResume = new ArrayList<>();
+        splitCurrentWatermarks.forEach(
+                (splitId, splitWatermark) -> {
+                    if (splitWatermark > currentMaxDesiredWatermark) {
+                        splitsToPause.add(splitId);
+                    } else if (currentlyPausedSplits.contains(splitId)) {
+                        splitsToResume.add(splitId);
+                    }
+                });
+        splitsToPause.removeAll(currentlyPausedSplits);
+        if (!splitsToPause.isEmpty() || !splitsToResume.isEmpty()) {
+            pauseOrResumeSplits(splitsToPause, splitsToResume);
+            currentlyPausedSplits.addAll(splitsToPause);
+            splitsToResume.forEach(currentlyPausedSplits::remove);
+        }
+    }
+
+    private void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
+        if (!allowUnalignedSourceSplits || 
sourceReaderSupportsPauseOrResumeSplits) {
+            try {
+                sourceReader.pauseOrResumeSplits(splitsToPause, 
splitsToResume);
+            } catch (UnsupportedOperationException e) {
+                sourceReaderSupportsPauseOrResumeSplits = false;
+                if (!allowUnalignedSourceSplits) {
+                    throw new UnsupportedOperationException("", e);

Review Comment:
   +1



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java:
##########
@@ -179,6 +180,21 @@ public void 
handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
         this.pulsarConsumer = consumer;
     }
 
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<PulsarPartitionSplit> splitsToPause,
+            Collection<PulsarPartitionSplit> splitsToResume) {
+        if (splitsToPause.size() > 1 || splitsToResume.size() > 1) {
+            throw new IllegalStateException("This pulsar split reader only 
support one split.");

Review Comment:
   I don't understand why we can't pause single splits. Even if that is the case, I don't see why that limitation should leak into the Pulsar reader.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java:
##########
@@ -210,138 +313,73 @@ Map<String, SplitT> assignedSplits() {
     /**
      * Package private for unit test.
      *
-     * @return true if task queue is not empty, false otherwise.
+     * @return true if task queue is empty, false otherwise.
      */
     boolean isIdle() {
-        return isIdle;
-    }
-
-    /**
-     * Check whether the fetch task should run. The fetch task should only run 
when all the
-     * following conditions are met. 1. there is no task in the task queue to 
run. 2. there are
-     * assigned splits Package private for testing purpose.
-     *
-     * @return whether the fetch task should be run.
-     */
-    boolean shouldRunFetchTask() {
-        return taskQueue.isEmpty() && !assignedSplits.isEmpty();
+        lock.lock();
+        try {
+            return assignedSplits.isEmpty() && taskQueue.isEmpty() && 
runningTask == null;
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**
      * Wake up the fetcher thread. There are only two blocking points in a 
running fetcher. 1.
-     * Taking the next task out of the task queue. 2. Running a task.
+     * Waiting for the next task in an idle fetcher. 2. Running a task.
      *
      * <p>They need to be waken up differently. If the fetcher is blocking 
waiting on the next task
-     * in the task queue, we should just interrupt the fetcher thread. If the 
fetcher is running the
-     * user split reader, we should call SplitReader.wakeUp() instead of 
naively interrupt the
-     * thread.
-     *
-     * <p>The correctness can be think of in the following way. The purpose of 
wake up is to let the
-     * fetcher thread go to the very beginning of the running loop. There are 
three major events in
-     * each run of the loop.
-     *
-     * <ol>
-     *   <li>pick a task (blocking)
-     *   <li>assign the task to runningTask variable.
-     *   <li>run the runningTask. (blocking)
-     * </ol>
-     *
-     * <p>We don't need to worry about things after step 3 because there is no 
blocking point
-     * anymore.
-     *
-     * <p>We always first set the wakeup flag when waking up the fetcher, then 
use the value of
-     * running task to determine where the fetcher thread is.
-     *
-     * <ul>
-     *   <li>If runningThread is null, it is before step 2, so we should 
interrupt fetcher. This
-     *       interruption will not be propagated to the split reader, because 
the wakeUp flag will
-     *       prevent the fetchTask from running.
-     *   <li>If runningThread is not null, it is after step 2. so we should 
wakeUp the split reader
-     *       instead of interrupt the fetcher.
-     * </ul>
+     * in the task queue, we should just notify that a task is available. If 
the fetcher is running
+     * the user split reader, we should call SplitReader.wakeUp() instead.
      *
-     * <p>The above logic only works in the same {@link #runOnce()} 
invocation. So we need to
-     * synchronize to ensure the wake up logic do not touch a different 
invocation.
+     * <p>The correctness can be thought of in the following way. The purpose 
of wake up is to let
+     * the fetcher thread go to the very beginning of the running loop.
      */
     void wakeUp(boolean taskOnly) {
         // Synchronize to make sure the wake up only works for the current 
invocation of runOnce().
-        synchronized (wakeUp) {
-            // Do not wake up repeatedly.
-            wakeUp.set(true);
-            // Now the wakeUp flag is set.
-            SplitFetcherTask currentTask = runningTask;
-            if (isRunningTask(currentTask)) {
-                // The running task may have missed our wakeUp flag and 
running, wake it up.
-                LOG.debug("Waking up running task {}", currentTask);
-                currentTask.wakeUp();
-            } else if (!taskOnly) {
-                // The task has not started running yet, and it will not run 
for this
-                // runOnce() invocation due to the wakeUp flag. But we might 
have to
-                // wake up the fetcher thread in case it is blocking on the 
task queue.
-                // Only wake up when the thread has started and there is no 
running task.
-                LOG.debug("Waking up fetcher thread.");
-                taskQueue.add(WAKEUP_TASK);
-            }
+        lock.lock();
+        try {
+            wakeUpUnsafe(taskOnly);
+        } finally {
+            lock.unlock();
         }
     }
 
-    private void maybeEnqueueTask(SplitFetcherTask task) {
-        // Only enqueue unfinished non-fetch task.
-        if (!closed.get()
-                && isRunningTask(task)
-                && task != fetchTask
-                && !taskQueue.offerFirst(task)) {
-            throw new RuntimeException(
-                    "The task queue is full. This is only theoretically 
possible when really bad thing happens.");
-        }
-        if (task != null) {
-            LOG.debug("Enqueued task {}", task);
+    private void wakeUpUnsafe(boolean taskOnly) {
+        assert lock.isHeldByCurrentThread();

Review Comment:
   Same here: acquiring the lock would be an option, since it is a ReentrantLock.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> 
newSplits) {
 
     private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
         currentMaxDesiredWatermark = event.getMaxWatermark();
+        checkSplitWatermarkAlignment();
         
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
     }
 
-    private void onWatermarkEmitted(long emittedWatermark) {
-        lastEmittedWatermark = emittedWatermark;
+    @Override
+    public void updateCurrentEffectiveWatermark(long watermark) {
+        lastEmittedWatermark = watermark;
         checkWatermarkAlignment();
     }
 
+    @Override
+    public void updateCurrentSplitWatermark(String splitId, long watermark) {
+        splitCurrentWatermarks.put(splitId, watermark);
+        if (currentMaxDesiredWatermark < watermark && 
!currentlyPausedSplits.contains(splitId)) {
+            pauseOrResumeSplits(Collections.singletonList(splitId), 
Collections.emptyList());
+            currentlyPausedSplits.add(splitId);
+        }
+    }
+
+    /**
+     * Finds the splits that are beyond the current max watermark and pauses 
them. At the same time,
+     * splits that have been paused and where the global watermark caught up 
are resumed.
+     *
+     * <p>Note: This takes effect only if there are multiple splits, otherwise 
it does nothing.
+     */
+    private void checkSplitWatermarkAlignment() {
+        if (numSplits <= 1) {
+            return; // If there is only a single split, we do not pause the 
split but the source.
+        }
+        Collection<String> splitsToPause = new ArrayList<>();
+        Collection<String> splitsToResume = new ArrayList<>();
+        splitCurrentWatermarks.forEach(
+                (splitId, splitWatermark) -> {
+                    if (splitWatermark > currentMaxDesiredWatermark) {
+                        splitsToPause.add(splitId);
+                    } else if (currentlyPausedSplits.contains(splitId)) {
+                        splitsToResume.add(splitId);
+                    }
+                });
+        splitsToPause.removeAll(currentlyPausedSplits);
+        if (!splitsToPause.isEmpty() || !splitsToResume.isEmpty()) {
+            pauseOrResumeSplits(splitsToPause, splitsToResume);
+            currentlyPausedSplits.addAll(splitsToPause);
+            splitsToResume.forEach(currentlyPausedSplits::remove);
+        }
+    }
+
+    private void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> 
splitsToResume) {
+        if (!allowUnalignedSourceSplits || 
sourceReaderSupportsPauseOrResumeSplits) {
+            try {
+                sourceReader.pauseOrResumeSplits(splitsToPause, 
splitsToResume);
+            } catch (UnsupportedOperationException e) {
+                sourceReaderSupportsPauseOrResumeSplits = false;
+                if (!allowUnalignedSourceSplits) {
+                    throw new UnsupportedOperationException("", e);

Review Comment:
   In my understanding, readers which do not support pausing splits will fail 
the job, unless the configuration to allow unaligned splits is set. Hence, I 
don't understand the `sourceReaderSupportsPauseOrResumeSplits` flag.
   
   I suggest the following:
   
   ```suggestion
           try {
               sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume);
           } catch (UnsupportedOperationException e) {
               sourceReaderSupportsPauseOrResumeSplits = false;
               if (!allowUnalignedSourceSplits) {
                   throw e;
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> 
newSplits) {
 
     private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
         currentMaxDesiredWatermark = event.getMaxWatermark();
+        checkSplitWatermarkAlignment();
         
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
     }
 
-    private void onWatermarkEmitted(long emittedWatermark) {
-        lastEmittedWatermark = emittedWatermark;
+    @Override
+    public void updateCurrentEffectiveWatermark(long watermark) {
+        lastEmittedWatermark = watermark;
         checkWatermarkAlignment();
     }
 
+    @Override
+    public void updateCurrentSplitWatermark(String splitId, long watermark) {
+        splitCurrentWatermarks.put(splitId, watermark);
+        if (currentMaxDesiredWatermark < watermark && 
!currentlyPausedSplits.contains(splitId)) {
+            pauseOrResumeSplits(Collections.singletonList(splitId), 
Collections.emptyList());
+            currentlyPausedSplits.add(splitId);
+        }
+    }
+
+    /**
+     * Finds the splits that are beyond the current max watermark and pauses 
them. At the same time,
+     * splits that have been paused and where the global watermark caught up 
are resumed.
+     *
+     * <p>Note: This takes effect only if there are multiple splits, otherwise 
it does nothing.
+     */
+    private void checkSplitWatermarkAlignment() {
+        if (numSplits <= 1) {
+            return; // If there is only a single split, we do not pause the 
split but the source.
+        }

Review Comment:
   It would be great if we didn't have to special-case a single split. I understand that the `SourceReaderBase` will stop polling in this case, but it would be beneficial if there were only one code path.
   



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java:
##########
@@ -111,57 +121,106 @@ public void run() {
                 splitReader.close();
             } catch (Exception e) {
                 errorHandler.accept(e);
+            } finally {
+                LOG.info("Split fetcher {} exited.", id);
+                // This executes after possible errorHandler.accept(t). If 
these operations bear
+                // a happens-before relation, then we can checking side effect 
of
+                // errorHandler.accept(t)
+                // to know whether it happened after observing side effect of 
shutdownHook.run().
+                shutdownHook.run();
             }
-            LOG.info("Split fetcher {} exited.", id);
-            // This executes after possible errorHandler.accept(t). If these 
operations bear
-            // a happens-before relation, then we can checking side effect of 
errorHandler.accept(t)
-            // to know whether it happened after observing side effect of 
shutdownHook.run().
-            shutdownHook.run();
         }
     }
 
     /** Package private method to help unit test. */
-    void runOnce() {
+    boolean runOnce() {
+        // first blocking call = get next task. blocks only if there are no 
active splits and queued
+        // tasks.
+        SplitFetcherTask task;
+        lock.lock();
         try {
-            // The fetch task should run if the split assignment is not empty 
or there is a split
-            // change.
-            if (shouldRunFetchTask()) {
-                runningTask = fetchTask;
-            } else {
-                runningTask = taskQueue.take();
+            if (closed) {
+                return false;
             }
-            // Now the running task is not null. If wakeUp() is called after 
this point,
-            // task.wakeUp() will be called. On the other hand, if the 
wakeUp() call was make before
-            // this point, the wakeUp flag must have already been set. The 
code hence checks the
-            // wakeUp
-            // flag first to avoid an unnecessary task run.
-            // Note that the runningTask may still encounter the case that the 
task is waken up
-            // before
-            // the it starts running.
-            LOG.debug("Prepare to run {}", runningTask);
-            if (!wakeUp.get() && runningTask.run()) {
-                LOG.debug("Finished running task {}", runningTask);
-                // the task has finished running. Set it to null so it won't 
be enqueued.
-                runningTask = null;
-                checkAndSetIdle();
+
+            task = getNextTaskUnsafe();
+            if (task == null) {
+                // (spurious) wakeup, so just repeat
+                return true;
             }
+
+            LOG.debug("Prepare to run {}", task);
+            // store task for #wakeUp
+            this.runningTask = task;
+        } finally {
+            lock.unlock();
+        }
+
+        // execute the task outside of lock, so that it can be woken up
+        boolean taskFinished;
+        try {
+            taskFinished = task.run();
         } catch (Exception e) {
             throw new RuntimeException(
                     String.format(
                             "SplitFetcher thread %d received unexpected 
exception while polling the records",
                             id),
                     e);
         }
-        // If the task is not null that means this task needs to be 
re-executed. This only
-        // happens when the task is the fetching task or the task was 
interrupted.
-        maybeEnqueueTask(runningTask);
-        synchronized (wakeUp) {
-            // Set the running task to null. It is necessary for the shutdown 
method to avoid
-            // unnecessarily interrupt the running task.
-            runningTask = null;
-            // Set the wakeUp flag to false.
-            wakeUp.set(false);
-            LOG.debug("Cleaned wakeup flag.");
+
+        // re-acquire lock as all post-processing steps, need it
+        lock.lock();
+        try {
+            this.runningTask = null;
+            processTaskResultUnsafe(task, taskFinished);
+        } finally {
+            lock.unlock();
+        }
+        return true;
+    }
+
+    private void processTaskResultUnsafe(SplitFetcherTask task, boolean 
taskFinished) {
+        assert lock.isHeldByCurrentThread();

Review Comment:
   Same as for the other lock asserts.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> 
newSplits) {
 
     private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
         currentMaxDesiredWatermark = event.getMaxWatermark();
+        checkSplitWatermarkAlignment();
         
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
     }
 
-    private void onWatermarkEmitted(long emittedWatermark) {
-        lastEmittedWatermark = emittedWatermark;
+    @Override
+    public void updateCurrentEffectiveWatermark(long watermark) {
+        lastEmittedWatermark = watermark;
         checkWatermarkAlignment();
     }
 
+    @Override
+    public void updateCurrentSplitWatermark(String splitId, long watermark) {
+        splitCurrentWatermarks.put(splitId, watermark);
+        if (currentMaxDesiredWatermark < watermark && 
!currentlyPausedSplits.contains(splitId)) {
+            pauseOrResumeSplits(Collections.singletonList(splitId), 
Collections.emptyList());
+            currentlyPausedSplits.add(splitId);
+        }
+    }
+
+    /**
+     * Finds the splits that are beyond the current max watermark and pauses 
them. At the same time,
+     * splits that have been paused and where the global watermark caught up 
are resumed.
+     *
+     * <p>Note: This takes effect only if there are multiple splits, otherwise 
it does nothing.
+     */
+    private void checkSplitWatermarkAlignment() {
+        if (numSplits <= 1) {
+            return; // If there is only a single split, we do not pause the 
split but the source.
+        }
+        Collection<String> splitsToPause = new ArrayList<>();
+        Collection<String> splitsToResume = new ArrayList<>();
+        splitCurrentWatermarks.forEach(
+                (splitId, splitWatermark) -> {
+                    if (splitWatermark > currentMaxDesiredWatermark) {
+                        splitsToPause.add(splitId);
+                    } else if (currentlyPausedSplits.contains(splitId)) {
+                        splitsToResume.add(splitId);
+                    }
+                });
+        splitsToPause.removeAll(currentlyPausedSplits);
+        if (!splitsToPause.isEmpty() || !splitsToResume.isEmpty()) {
+            pauseOrResumeSplits(splitsToPause, splitsToResume);
+            currentlyPausedSplits.addAll(splitsToPause);
+            splitsToResume.forEach(currentlyPausedSplits::remove);
+        }

Review Comment:
   What if all splits are paused? Do we signal this back to the user, as we do 
when there is only a single split?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java:
##########
@@ -210,138 +313,73 @@ Map<String, SplitT> assignedSplits() {
     /**
      * Package private for unit test.
      *
-     * @return true if task queue is not empty, false otherwise.
+     * @return true if task queue is empty, false otherwise.
      */
     boolean isIdle() {
-        return isIdle;
-    }
-
-    /**
-     * Check whether the fetch task should run. The fetch task should only run 
when all the
-     * following conditions are met. 1. there is no task in the task queue to 
run. 2. there are
-     * assigned splits Package private for testing purpose.
-     *
-     * @return whether the fetch task should be run.
-     */
-    boolean shouldRunFetchTask() {
-        return taskQueue.isEmpty() && !assignedSplits.isEmpty();
+        lock.lock();
+        try {
+            return assignedSplits.isEmpty() && taskQueue.isEmpty() && 
runningTask == null;
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**
      * Wake up the fetcher thread. There are only two blocking points in a 
running fetcher. 1.
-     * Taking the next task out of the task queue. 2. Running a task.
+     * Waiting for the next task in an idle fetcher. 2. Running a task.
      *
      * <p>They need to be waken up differently. If the fetcher is blocking 
waiting on the next task
-     * in the task queue, we should just interrupt the fetcher thread. If the 
fetcher is running the
-     * user split reader, we should call SplitReader.wakeUp() instead of 
naively interrupt the
-     * thread.
-     *
-     * <p>The correctness can be think of in the following way. The purpose of 
wake up is to let the
-     * fetcher thread go to the very beginning of the running loop. There are 
three major events in
-     * each run of the loop.
-     *
-     * <ol>
-     *   <li>pick a task (blocking)
-     *   <li>assign the task to runningTask variable.
-     *   <li>run the runningTask. (blocking)
-     * </ol>
-     *
-     * <p>We don't need to worry about things after step 3 because there is no 
blocking point
-     * anymore.
-     *
-     * <p>We always first set the wakeup flag when waking up the fetcher, then 
use the value of
-     * running task to determine where the fetcher thread is.
-     *
-     * <ul>
-     *   <li>If runningThread is null, it is before step 2, so we should 
interrupt fetcher. This
-     *       interruption will not be propagated to the split reader, because 
the wakeUp flag will
-     *       prevent the fetchTask from running.
-     *   <li>If runningThread is not null, it is after step 2. so we should 
wakeUp the split reader
-     *       instead of interrupt the fetcher.

Review Comment:
   Do we want to explain how the new logic works?



##########
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java:
##########
@@ -282,4 +282,21 @@ public enum VertexDescriptionMode {
                     .withDescription(
                             "Whether name of vertex includes topological index 
or not. "
                                     + "When it is true, the name will have a 
prefix of index of the vertex, like '[vertex-0]Source: source'. It is false by 
default");
+
+    @PublicEvolving
+    public static final ConfigOption<Boolean> ALLOW_UNALIGNED_SOURCE_SPLITS =
+            key("pipeline.watermark-alignment.allow-unaligned-source-splits")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
   Just a thought: do we really need another configuration option? I think we 
could just keep the default behavior from prior to adding this feature and not do split 
alignment. We could print a WARN when this happens.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> 
newSplits) {
 
     private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
         currentMaxDesiredWatermark = event.getMaxWatermark();
+        checkSplitWatermarkAlignment();
         
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
     }
 
-    private void onWatermarkEmitted(long emittedWatermark) {
-        lastEmittedWatermark = emittedWatermark;
+    @Override
+    public void updateCurrentEffectiveWatermark(long watermark) {
+        lastEmittedWatermark = watermark;
         checkWatermarkAlignment();
     }
 
+    @Override
+    public void updateCurrentSplitWatermark(String splitId, long watermark) {
+        splitCurrentWatermarks.put(splitId, watermark);
+        if (currentMaxDesiredWatermark < watermark && 
!currentlyPausedSplits.contains(splitId)) {
+            pauseOrResumeSplits(Collections.singletonList(splitId), 
Collections.emptyList());
+            currentlyPausedSplits.add(splitId);
+        }

Review Comment:
   We should consider whether we need the eager pausing logic here. There is 
the periodic `checkSplitWatermarkAlignment` method which achieves the same. The 
default with one split was to pause at the alignment interval only, so maybe we 
should keep that behavior.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/PauseOrResumeSplitsTask.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.fetcher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Changes the paused splits of a n{@link SplitReader}. The task is used by 
default in {@link

Review Comment:
   "a n" typo?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -175,6 +188,9 @@ private enum OperatingMode {
 
     private @Nullable LatencyMarkerEmitter<OUT> latencyMarkerEmitter;
 
+    private final boolean allowUnalignedSourceSplits;
+    private boolean sourceReaderSupportsPauseOrResumeSplits = true;

Review Comment:
   IMHO this flag is not needed.
   ```suggestion
   ```



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java:
##########
@@ -171,17 +230,54 @@ void runOnce() {
      * @param splitsToAdd the splits to add.
      */
     public void addSplits(List<SplitT> splitsToAdd) {
-        enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, 
assignedSplits));
-        wakeUp(true);
+        lock.lock();
+        try {
+            enqueueTaskUnsafe(new AddSplitsTask<>(splitReader, splitsToAdd, 
assignedSplits));
+            wakeUpUnsafe(true);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Called when some splits of this source instance progressed too much 
beyond the global
+     * watermark of all subtasks. If the split reader implements {@link 
SplitReader}, it will relay
+     * the information asynchronously through the split fetcher thread.

Review Comment:
   This JavaDoc leaks information about potential callers but does not explain 
what this method actually does.



##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java:
##########
@@ -68,7 +69,7 @@ public KafkaSourceFetcherManager(
             Supplier<SplitReader<ConsumerRecord<byte[], byte[]>, 
KafkaPartitionSplit>>
                     splitReaderSupplier,
             Consumer<Collection<String>> splitFinishedHook) {
-        super(elementsQueue, splitReaderSupplier, splitFinishedHook);
+        super(elementsQueue, splitReaderSupplier, new Configuration(), 
splitFinishedHook);

Review Comment:
   Why do we need to pass in the correct configuration?



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java:
##########
@@ -179,6 +180,21 @@ public void 
handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
         this.pulsarConsumer = consumer;
     }
 
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<PulsarPartitionSplit> splitsToPause,
+            Collection<PulsarPartitionSplit> splitsToResume) {
+        if (splitsToPause.size() > 1 || splitsToResume.size() > 1) {
+            throw new IllegalStateException("This pulsar split reader only 
support one split.");

Review Comment:
   ```suggestion
               throw new IllegalStateException("This pulsar split reader only 
supports one split.");
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT> 
newSplits) {
 
     private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
         currentMaxDesiredWatermark = event.getMaxWatermark();
+        checkSplitWatermarkAlignment();
         
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
     }
 
-    private void onWatermarkEmitted(long emittedWatermark) {
-        lastEmittedWatermark = emittedWatermark;
+    @Override
+    public void updateCurrentEffectiveWatermark(long watermark) {
+        lastEmittedWatermark = watermark;
         checkWatermarkAlignment();
     }
 
+    @Override
+    public void updateCurrentSplitWatermark(String splitId, long watermark) {
+        splitCurrentWatermarks.put(splitId, watermark);
+        if (currentMaxDesiredWatermark < watermark && 
!currentlyPausedSplits.contains(splitId)) {
+            pauseOrResumeSplits(Collections.singletonList(splitId), 
Collections.emptyList());
+            currentlyPausedSplits.add(splitId);

Review Comment:
   Is this operation safe? We are adding new splits here while at the same time 
this set is cleared in `checkSplitWatermarkAlignment` (see 
https://github.com/apache/flink/pull/20485/files#diff-eba14821fb3e96f6f20e3116ceab893c2f18cff605b177dad485aadc43ac4f56R619
 line 619).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to