pnowojski commented on code in PR #20485:
URL: https://github.com/apache/flink/pull/20485#discussion_r963343155


##########
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java:
##########
@@ -132,4 +139,28 @@ default void handleSourceEvents(SourceEvent sourceEvent) {}
      */
     @Override
     default void notifyCheckpointComplete(long checkpointId) throws Exception {}
+
+    /**
+     * Pauses or resumes reading of individual source splits.
+     *
+     * <p>Note that no other methods can be called in parallel, so updating subscriptions can be
+     * done atomically. This method is simply providing connectors with more expressive APIs the
+     * opportunity to update all subscriptions at once.
+     *
+     * <p>This is currently used to align the watermarks of splits, if watermark alignment is used
+     * and the source reads from more than one split.
+     *
+     * <p>The default implementation throws an {@link UnsupportedOperationException} where the
+     * default implementation will be removed in future releases. To be compatible with future
+     * releases, it is recommended to implement this method and override the default implementation.
+     *
+     * @param splitsToPause the splits to pause
+     * @param splitsToResume the splits to resume
+     */
+    @PublicEvolving
+    default void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> splitsToResume) {
+        throw new UnsupportedOperationException(
+                "This source reader does not support pause or resume splits.");

Review Comment:
   Shouldn't this be kept in sync with the other similar
   `throw new UnsupportedOperationException` in `SplitReader`?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java:
##########
@@ -68,4 +69,29 @@
      * @throws Exception if closing the split reader failed.
      */
     void close() throws Exception;
+
+    /**
+     * Pauses or resumes reading of individual splits readers.
+     *
+     * <p>Note that no other methods can be called in parallel, so it's fine to non-atomically
+     * update subscriptions. This method is simply providing connectors with more expressive APIs
+     * the opportunity to update all subscriptions at once.
+     *
+     * <p>This is currently used to align the watermarks of splits, if watermark alignment is used
+     * and the source reads from more than one split.
+     *
+     * <p>The default implementation throws an {@link UnsupportedOperationException} where the
+     * default implementation will be removed in future releases. To be compatible with future
+     * releases, it is recommended to implement this method and override the default implementation.
+     *
+     * @param splitsToPause the splits to pause
+     * @param splitsToResume the splits to resume
+     */
+    default void pauseOrResumeSplits(
+            Collection<SplitT> splitsToPause, Collection<SplitT> splitsToResume) {
+        throw new UnsupportedOperationException(
+                "This split reader does not support pause or resume. (To use watermark alignment "
+                        + "and allow unaligned source splits set configuration parameter "
+                        + "'pipeline.watermark-alignment.allow-unaligned-source-splits' to true.)");

Review Comment:
   > This split reader does not support pause or resume. It is highly discouraged to use
   > watermark alignment and allow unaligned source splits, as this leads to unpredictable
   > effectiveness of the watermark alignment. It is recommended to implement pausing splits
   > for this source. Temporarily, you can allow unaligned source splits by setting the
   > configuration parameter 'pipeline.watermark-alignment.allow-unaligned-source-splits' to true.
   > Beware that this configuration parameter will be dropped in a future Flink release.
   
   ?
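
To make "implement pausing splits" concrete, here is a minimal, self-contained sketch of the bookkeeping a reader might do. It is only an illustration under stated assumptions: `PausableReaderSketch`, `addSplit`, and `fetch` are hypothetical names, the class does not implement Flink's `SplitReader` interface, and splits are identified by plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical stand-in for a split reader; NOT the Flink SplitReader interface. */
class PausableReaderSketch {
    // Pending records per split, kept in the order the splits were added.
    private final Map<String, Queue<String>> recordsBySplit = new LinkedHashMap<>();
    // IDs of splits that must not be read until they are resumed.
    private final Set<String> pausedSplits = new HashSet<>();

    void addSplit(String splitId, List<String> records) {
        recordsBySplit.put(splitId, new ArrayDeque<>(records));
    }

    // Both collections are applied in one call. This sketch assumes, like the
    // Javadoc above, that no other method runs in parallel.
    void pauseOrResumeSplits(
            Collection<String> splitsToPause, Collection<String> splitsToResume) {
        pausedSplits.addAll(splitsToPause);
        pausedSplits.removeAll(splitsToResume);
    }

    // Returns at most one record from each split that is not paused.
    List<String> fetch() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Queue<String>> entry : recordsBySplit.entrySet()) {
            if (pausedSplits.contains(entry.getKey())) {
                continue; // skip paused splits entirely
            }
            String record = entry.getValue().poll();
            if (record != null) {
                out.add(record);
            }
        }
        return out;
    }
}
```

A paused split keeps its pending records and continues where it left off once it is resumed, which is the behaviour watermark alignment relies on.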



##########
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java:
##########
@@ -282,4 +282,21 @@ public enum VertexDescriptionMode {
                     .withDescription(
                             "Whether name of vertex includes topological index or not. "
                                     + "When it is true, the name will have a prefix of index of the vertex, like '[vertex-0]Source: source'. It is false by default");
+
+    @PublicEvolving
+    public static final ConfigOption<Boolean> ALLOW_UNALIGNED_SOURCE_SPLITS =
+            key("pipeline.watermark-alignment.allow-unaligned-source-splits")
+                    .booleanType()
+                    .defaultValue(false)

Review Comment:
   I fully agree. I've left a suggestion in this regard.


