pnowojski commented on code in PR #20485:
URL: https://github.com/apache/flink/pull/20485#discussion_r958152581
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT>
newSplits) {
private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
currentMaxDesiredWatermark = event.getMaxWatermark();
+ checkSplitWatermarkAlignment();
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
}
- private void onWatermarkEmitted(long emittedWatermark) {
- lastEmittedWatermark = emittedWatermark;
+ @Override
+ public void updateCurrentEffectiveWatermark(long watermark) {
+ lastEmittedWatermark = watermark;
checkWatermarkAlignment();
}
+ @Override
+ public void updateCurrentSplitWatermark(String splitId, long watermark) {
+ splitCurrentWatermarks.put(splitId, watermark);
+ if (currentMaxDesiredWatermark < watermark &&
!currentlyPausedSplits.contains(splitId)) {
+ pauseOrResumeSplits(Collections.singletonList(splitId),
Collections.emptyList());
+ currentlyPausedSplits.add(splitId);
+ }
+ }
+
+ /**
+ * Finds the splits that are beyond the current max watermark and pauses
them. At the same time,
+ * splits that have been paused and where the global watermark caught up
are resumed.
+ *
+ * <p>Note: This takes effect only if there are multiple splits, otherwise
it does nothing.
+ */
+ private void checkSplitWatermarkAlignment() {
+ if (numSplits <= 1) {
+ return; // If there is only a single split, we do not pause the
split but the source.
+ }
Review Comment:
Would you be willing to simplify this? If so, I would suggest doing it as a
separate commit (either in the same PR or a new PR).
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT>
newSplits) {
private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
currentMaxDesiredWatermark = event.getMaxWatermark();
+ checkSplitWatermarkAlignment();
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
}
- private void onWatermarkEmitted(long emittedWatermark) {
- lastEmittedWatermark = emittedWatermark;
+ @Override
+ public void updateCurrentEffectiveWatermark(long watermark) {
+ lastEmittedWatermark = watermark;
checkWatermarkAlignment();
}
+ @Override
+ public void updateCurrentSplitWatermark(String splitId, long watermark) {
+ splitCurrentWatermarks.put(splitId, watermark);
+ if (currentMaxDesiredWatermark < watermark &&
!currentlyPausedSplits.contains(splitId)) {
+ pauseOrResumeSplits(Collections.singletonList(splitId),
Collections.emptyList());
+ currentlyPausedSplits.add(splitId);
+ }
+ }
+
+ /**
+ * Finds the splits that are beyond the current max watermark and pauses
them. At the same time,
+ * splits that have been paused and where the global watermark caught up
are resumed.
+ *
+ * <p>Note: This takes effect only if there are multiple splits, otherwise
it does nothing.
+ */
+ private void checkSplitWatermarkAlignment() {
+ if (numSplits <= 1) {
+ return; // If there is only a single split, we do not pause the
split but the source.
+ }
+ Collection<String> splitsToPause = new ArrayList<>();
+ Collection<String> splitsToResume = new ArrayList<>();
+ splitCurrentWatermarks.forEach(
+ (splitId, splitWatermark) -> {
+ if (splitWatermark > currentMaxDesiredWatermark) {
+ splitsToPause.add(splitId);
+ } else if (currentlyPausedSplits.contains(splitId)) {
+ splitsToResume.add(splitId);
+ }
+ });
+ splitsToPause.removeAll(currentlyPausedSplits);
+ if (!splitsToPause.isEmpty() || !splitsToResume.isEmpty()) {
+ pauseOrResumeSplits(splitsToPause, splitsToResume);
+ currentlyPausedSplits.addAll(splitsToPause);
+ splitsToResume.forEach(currentlyPausedSplits::remove);
+ }
+ }
+
+ private void pauseOrResumeSplits(
+ Collection<String> splitsToPause, Collection<String>
splitsToResume) {
+ if (!allowUnalignedSourceSplits ||
sourceReaderSupportsPauseOrResumeSplits) {
+ try {
+ sourceReader.pauseOrResumeSplits(splitsToPause,
splitsToResume);
+ } catch (UnsupportedOperationException e) {
+ sourceReaderSupportsPauseOrResumeSplits = false;
+ if (!allowUnalignedSourceSplits) {
+ throw new UnsupportedOperationException("", e);
Review Comment:
> In my understanding, readers which do not support pausing splits will fail
the job, unless the configuration to allow unaligned splits is set. Hence, I
don't understand the sourceReaderSupportsPauseOrResumeSplits flag.
Please check my comment above.
##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java:
##########
@@ -227,6 +227,20 @@ public void close() throws Exception {
consumer.close();
}
+ @Override
+ public void pauseOrResumeSplits(
+ Collection<KafkaPartitionSplit> splitsToPause,
+ Collection<KafkaPartitionSplit> splitsToResume) {
+ consumer.resume(
+ splitsToResume.stream()
+ .map(KafkaPartitionSplit::getTopicPartition)
+ .collect(Collectors.toList()));
+ consumer.pause(
+ splitsToPause.stream()
+ .map(KafkaPartitionSplit::getTopicPartition)
+ .collect(Collectors.toList()));
Review Comment:
I'm not sure how likely this `resume` is to ever block, and even if it
blocks, I don't think it should cause any problems. Keep in mind that this is
not an exact feature. We do not guarantee that no records will be processed.
The whole watermark alignment requires sending RPCs between TMs and the JM, so
it has built-in delay/lag.
At the same time, I don't see how doing this the other way around would
cause much harm. At worst, the consumer would for a very short period of
time have no splits to process, if everything is paused but not yet resumed.
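To make the ordering argument concrete, here is a minimal sketch that models the paused set in memory and reports the smallest number of active splits seen while one pause/resume request is applied. It does not use the Kafka client; `PauseResumeOrderSketch`, `minActive` and the split names are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Illustration only: an in-memory model, not the Kafka consumer API.
class PauseResumeOrderSketch {

    /**
     * Applies one pause/resume request in the given order and returns the
     * lowest number of active (assigned but not paused) splits observed.
     * Assumes every paused split is also an assigned split.
     */
    static int minActive(
            Set<String> assigned,
            Set<String> initiallyPaused,
            Collection<String> toPause,
            Collection<String> toResume,
            boolean resumeFirst) {
        Set<String> paused = new HashSet<>(initiallyPaused);
        int min = assigned.size() - paused.size();

        // First step of the request.
        if (resumeFirst) {
            paused.removeAll(toResume);
        } else {
            paused.addAll(toPause);
        }
        min = Math.min(min, assigned.size() - paused.size());

        // Second step of the request.
        if (resumeFirst) {
            paused.addAll(toPause);
        } else {
            paused.removeAll(toResume);
        }
        return Math.min(min, assigned.size() - paused.size());
    }

    public static void main(String[] args) {
        Set<String> assigned = new HashSet<>(Arrays.asList("p0", "p1"));
        Set<String> paused = new HashSet<>(Arrays.asList("p1"));
        Collection<String> toPause = Arrays.asList("p0");
        Collection<String> toResume = Arrays.asList("p1");

        // Resume first: at least one split stays active throughout.
        System.out.println(minActive(assigned, paused, toPause, toResume, true));  // 1
        // Pause first: briefly nothing is active.
        System.out.println(minActive(assigned, paused, toPause, toResume, false)); // 0
    }
}
```

In this model the only difference between the two orders is the short window in which nothing is active, which matches the "at worst" case described above.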
##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java:
##########
@@ -179,6 +180,21 @@ public void
handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
this.pulsarConsumer = consumer;
}
+ @Override
+ public void pauseOrResumeSplits(
+ Collection<PulsarPartitionSplit> splitsToPause,
+ Collection<PulsarPartitionSplit> splitsToResume) {
+ if (splitsToPause.size() > 1 || splitsToResume.size() > 1) {
+ throw new IllegalStateException("This pulsar split reader only
support one split.");
Review Comment:
Good question...
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT>
newSplits) {
private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
currentMaxDesiredWatermark = event.getMaxWatermark();
+ checkSplitWatermarkAlignment();
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
}
- private void onWatermarkEmitted(long emittedWatermark) {
- lastEmittedWatermark = emittedWatermark;
+ @Override
+ public void updateCurrentEffectiveWatermark(long watermark) {
+ lastEmittedWatermark = watermark;
checkWatermarkAlignment();
}
+ @Override
+ public void updateCurrentSplitWatermark(String splitId, long watermark) {
+ splitCurrentWatermarks.put(splitId, watermark);
+ if (currentMaxDesiredWatermark < watermark &&
!currentlyPausedSplits.contains(splitId)) {
+ pauseOrResumeSplits(Collections.singletonList(splitId),
Collections.emptyList());
+ currentlyPausedSplits.add(splitId);
Review Comment:
? The line that you are pointing to
```
splitsToPause.removeAll(currentlyPausedSplits);
```
doesn't modify `currentlyPausedSplits`. It only gathers the information about
which splits have to be paused/resumed.
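A small standalone sketch of that point: `Collection.removeAll` mutates only the collection it is called on, so the argument is left untouched. The class name `RemoveAllSketch`, the helper `newlyPaused` and the split ids are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Illustration only: names and split ids are hypothetical.
class RemoveAllSketch {

    /** Returns the splits that still need pausing; never mutates currentlyPaused. */
    static Collection<String> newlyPaused(
            Collection<String> overLimit, Set<String> currentlyPaused) {
        Collection<String> splitsToPause = new ArrayList<>(overLimit);
        // Removes elements from splitsToPause only; the argument is read, not changed.
        splitsToPause.removeAll(currentlyPaused);
        return splitsToPause;
    }

    public static void main(String[] args) {
        Set<String> currentlyPaused = new HashSet<>(Arrays.asList("split-1"));
        Collection<String> toPause =
                newlyPaused(Arrays.asList("split-1", "split-2"), currentlyPaused);
        System.out.println(toPause);         // [split-2]
        System.out.println(currentlyPaused); // [split-1]
    }
}
```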
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java:
##########
@@ -171,17 +230,54 @@ void runOnce() {
* @param splitsToAdd the splits to add.
*/
public void addSplits(List<SplitT> splitsToAdd) {
- enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd,
assignedSplits));
- wakeUp(true);
+ lock.lock();
+ try {
+ enqueueTaskUnsafe(new AddSplitsTask<>(splitReader, splitsToAdd,
assignedSplits));
+ wakeUpUnsafe(true);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Called when some splits of this source instance progressed too much
beyond the global
+ * watermark of all subtasks. If the split reader implements {@link
SplitReader}, it will relay
+ * the information asynchronously through the split fetcher thread.
+ *
+ * @param splitsToPause the splits to pause
+ * @param splitsToResume the splits to resume
+ */
+ public void pauseOrResumeSplits(
+ Collection<SplitT> splitsToPause, Collection<SplitT>
splitsToResume) {
+ lock.lock();
+ try {
+ enqueueTaskUnsafe(
+ new PauseOrResumeSplitsTask<>(
+ splitReader,
+ splitsToPause,
+ splitsToResume,
+ allowUnalignedSourceSplits));
+ wakeUpUnsafe(true);
+ } finally {
+ lock.unlock();
+ }
}
public void enqueueTask(SplitFetcherTask task) {
- synchronized (lock) {
- taskQueue.offer(task);
- isIdle = false;
+ lock.lock();
+ try {
+ enqueueTaskUnsafe(task);
+ } finally {
+ lock.unlock();
}
}
+ private void enqueueTaskUnsafe(SplitFetcherTask task) {
+ assert lock.isHeldByCurrentThread();
Review Comment:
More or less, it is our convention (at least in the runtime) to use those
`Unsafe`-suffixed methods. As far as I remember, double acquisition of the same
lock is extremely expensive in the JVM (10x slower?).
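For readers unfamiliar with the convention, a minimal sketch of its shape: public methods take the lock once, and private `...Unsafe` helpers only assert that the caller already holds it. This is a simplified stand-in, not the actual `SplitFetcher`; `UnsafeSuffixSketch`, `addTaskAndWakeUp` and the `maxHoldCount` bookkeeping are hypothetical, and the sketch makes no attempt to measure the cost of re-acquisition.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: a simplified stand-in, not Flink's SplitFetcher.
class UnsafeSuffixSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<String> taskQueue = new ArrayDeque<>();
    private boolean isIdle = true;
    private int maxHoldCount = 0; // bookkeeping for the demo only

    /** Public entry point: acquires the lock once, then calls the helpers. */
    public void addTaskAndWakeUp(String task) {
        lock.lock();
        try {
            enqueueTaskUnsafe(task);
            wakeUpUnsafe();
        } finally {
            lock.unlock();
        }
    }

    /** Public entry point for callers that only need to enqueue. */
    public void enqueueTask(String task) {
        lock.lock();
        try {
            enqueueTaskUnsafe(task);
        } finally {
            lock.unlock();
        }
    }

    /** Must be called with the lock held; does not lock again. */
    private void enqueueTaskUnsafe(String task) {
        assert lock.isHeldByCurrentThread();
        taskQueue.offer(task);
        isIdle = false;
        maxHoldCount = Math.max(maxHoldCount, lock.getHoldCount());
    }

    /** Must be called with the lock held; the wake-up itself is omitted here. */
    private void wakeUpUnsafe() {
        assert lock.isHeldByCurrentThread();
    }

    public int queuedTasks() {
        lock.lock();
        try {
            return taskQueue.size();
        } finally {
            lock.unlock();
        }
    }

    public boolean isIdle() {
        lock.lock();
        try {
            return isIdle;
        } finally {
            lock.unlock();
        }
    }

    public int maxHoldCount() {
        lock.lock();
        try {
            return maxHoldCount;
        } finally {
            lock.unlock();
        }
    }
}
```

If `addTaskAndWakeUp` called the public `enqueueTask` instead, the hold count inside the helper would be 2, which is the double acquisition the convention avoids.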
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -559,14 +577,67 @@ private void createOutputForSplits(List<SplitT>
newSplits) {
private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
currentMaxDesiredWatermark = event.getMaxWatermark();
+ checkSplitWatermarkAlignment();
sourceMetricGroup.updateMaxDesiredWatermark(currentMaxDesiredWatermark);
}
- private void onWatermarkEmitted(long emittedWatermark) {
- lastEmittedWatermark = emittedWatermark;
+ @Override
+ public void updateCurrentEffectiveWatermark(long watermark) {
+ lastEmittedWatermark = watermark;
checkWatermarkAlignment();
}
+ @Override
+ public void updateCurrentSplitWatermark(String splitId, long watermark) {
+ splitCurrentWatermarks.put(splitId, watermark);
+ if (currentMaxDesiredWatermark < watermark &&
!currentlyPausedSplits.contains(splitId)) {
+ pauseOrResumeSplits(Collections.singletonList(splitId),
Collections.emptyList());
+ currentlyPausedSplits.add(splitId);
+ }
+ }
+
+ /**
+ * Finds the splits that are beyond the current max watermark and pauses
them. At the same time,
+ * splits that have been paused and where the global watermark caught up
are resumed.
+ *
+ * <p>Note: This takes effect only if there are multiple splits, otherwise
it does nothing.
+ */
+ private void checkSplitWatermarkAlignment() {
+ if (numSplits <= 1) {
+ return; // If there is only a single split, we do not pause the
split but the source.
+ }
+ Collection<String> splitsToPause = new ArrayList<>();
+ Collection<String> splitsToResume = new ArrayList<>();
+ splitCurrentWatermarks.forEach(
+ (splitId, splitWatermark) -> {
+ if (splitWatermark > currentMaxDesiredWatermark) {
+ splitsToPause.add(splitId);
+ } else if (currentlyPausedSplits.contains(splitId)) {
+ splitsToResume.add(splitId);
+ }
+ });
+ splitsToPause.removeAll(currentlyPausedSplits);
+ if (!splitsToPause.isEmpty() || !splitsToResume.isEmpty()) {
+ pauseOrResumeSplits(splitsToPause, splitsToResume);
+ currentlyPausedSplits.addAll(splitsToPause);
+ splitsToResume.forEach(currentlyPausedSplits::remove);
+ }
Review Comment:
What do you mean by "signal to user"?
##########
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java:
##########
@@ -282,4 +282,21 @@ public enum VertexDescriptionMode {
.withDescription(
"Whether name of vertex includes topological index
or not. "
+ "When it is true, the name will have a
prefix of index of the vertex, like '[vertex-0]Source: source'. It is false by
default");
+
+ @PublicEvolving
+ public static final ConfigOption<Boolean> ALLOW_UNALIGNED_SOURCE_SPLITS =
+ key("pipeline.watermark-alignment.allow-unaligned-source-splits")
+ .booleanType()
+ .defaultValue(false)
Review Comment:
Please check the dev mailing list discussion. We want to change the old
behaviour from Flink 1.14/1.15. In a couple of releases (1.18? 1.19?), we will
want to disallow partially working watermark alignment when a source doesn't
support per-split alignment. At the same time, we want to give users time to
adjust/migrate if they are relying on this partially working feature for now.
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java:
##########
@@ -142,6 +143,30 @@ public void accept(Throwable t) {
public abstract void addSplits(List<SplitT> splitsToAdd);
+ public void alignSplitsWatermarks(
Review Comment:
nitty nit: this should be called `pauseOrResumeSplits` in the first commit
in which it appears.