leonardBang commented on code in PR #27134:
URL: https://github.com/apache/flink/pull/27134#discussion_r2471992520


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java:
##########
@@ -81,6 +82,27 @@ public SingleThreadMultiplexSourceReaderBase(
                 context);
     }
 
+    /**
+     * The primary constructor for the source reader.

Review Comment:
   There should be only one `primary constructor`, but we currently have two. I 
think you also need to update the Javadoc of this constructor.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java:
##########
@@ -435,4 +493,72 @@ private boolean isEndOfStreamReached(T record) {
             return isStreamEnd;
         }
     }
+
+    /**
+     * A wrapper around {@link SourceOutput} that counts the number of records 
emitted.
+     *
+     * <p>This wrapper is used when rate limiting is enabled to track how many 
records have been
+     * emitted since the last rate limit check, allowing the reader to 
properly apply backpressure
+     * when the rate limit is exceeded.
+     *
+     * @param <T> The type of records being emitted
+     */
+    private static final class RecordCountingSourceOutputWrapper<T> implements 
SourceOutput<T> {
+        /** The underlying source output to delegate to. */
+        final SourceOutput<T> sourceOutput;
+
+        /** The number of records emitted since the last reset. */
+        int recordCount;

Review Comment:
   `last reset` is not accurate here; `window` is a better term. How about
   ```
   /** Count of records handled during the current rate-limiting window. */
   
   int currentWindowRecordCount;
   ``` 
   ?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java:
##########
@@ -108,4 +130,26 @@ public SingleThreadMultiplexSourceReaderBase(
             SourceReaderContext context) {
         super(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, 
context);
     }
+
+    /**
+     * This constructor behaves like {@link 
#SingleThreadMultiplexSourceReaderBase(Supplier,
+     * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a 
specific {@link

Review Comment:
   We can end this Javadoc with `but accepts a specific {@link RateLimiterStrategy}.`



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java:
##########
@@ -156,14 +177,29 @@ public InputStatus pollNext(ReaderOutput<T> output) 
throws Exception {
 
         // we need to loop here, because we may have to go across splits
         while (true) {
+            // Check if the previous record count reached the limit of 
rateLimiter.
+            if (rateLimitPermissionFuture != null && 
!rateLimitPermissionFuture.isDone()) {

Review Comment:
   In the current implementation, this check runs for **each record**, even 
when `rateLimitPermissionFuture` is null. I believe this will introduce a 
performance regression. We can avoid the regression because the nullability of 
`rateLimiterStrategy` is already known in the constructor.
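
One way to read this suggestion, as a minimal sketch rather than the actual Flink code: decide once in the constructor whether rate limiting is enabled, keep that in a final flag, and let the per-record path test the flag first. `GatedReader`, `rateLimitingEnabled`, `setPermissionFuture`, and `mayEmit` are hypothetical names, and treating a null strategy as "no rate limiting" is an assumption.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the reader; not the real SourceReaderBase. */
final class GatedReader {
    /** Decided once at construction time and never changed afterwards. */
    private final boolean rateLimitingEnabled;

    /** Pending permission from the rate limiter, or null if none is outstanding. */
    private CompletableFuture<Void> rateLimitPermissionFuture;

    GatedReader(Object rateLimiterStrategy) {
        // Assumption: a null strategy means rate limiting is disabled.
        this.rateLimitingEnabled = rateLimiterStrategy != null;
    }

    /** Records the permission that the next emission has to wait for. */
    void setPermissionFuture(CompletableFuture<Void> future) {
        this.rateLimitPermissionFuture = future;
    }

    /** Returns true if a record may be emitted now, false if the reader must wait. */
    boolean mayEmit() {
        if (!rateLimitingEnabled) {
            // Readers without a strategy skip the future inspection entirely.
            return true;
        }
        return rateLimitPermissionFuture == null || rateLimitPermissionFuture.isDone();
    }
}
```

Whether a final flag is measurably cheaper than the null check would need a benchmark; an alternative with the same intent is to select a separate non-rate-limited code path in the constructor.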



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java:
##########
@@ -435,4 +493,72 @@ private boolean isEndOfStreamReached(T record) {
             return isStreamEnd;
         }
     }
+
+    /**
+     * A wrapper around {@link SourceOutput} that counts the number of records 
emitted.
+     *
+     * <p>This wrapper is used when rate limiting is enabled to track how many 
records have been
+     * emitted since the last rate limit check, allowing the reader to 
properly apply backpressure
+     * when the rate limit is exceeded.
+     *
+     * @param <T> The type of records being emitted
+     */
+    private static final class RecordCountingSourceOutputWrapper<T> implements 
SourceOutput<T> {

Review Comment:
   Can we rename `RecordCountingSourceOutputWrapper` to 
`RateLimitingSourceOutputWrapper`? That name seems more intuitive.
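
For illustration, a rough sketch of how the wrapper could look under the proposed name, with a per-window counter. `SimpleOutput` is a hypothetical one-method stand-in for Flink's `SourceOutput` (the real interface has more methods), and `getAndResetWindowCount` is an assumed helper, not something taken from the PR.

```java
/** Hypothetical minimal stand-in for SourceOutput; the real interface has more methods. */
interface SimpleOutput<T> {
    void collect(T record);
}

/** Sketch of the wrapper under the proposed name. */
final class RateLimitingSourceOutputWrapper<T> implements SimpleOutput<T> {
    /** The underlying output to delegate to. */
    private final SimpleOutput<T> delegate;

    /** Count of records handled during the current rate-limiting window. */
    private int currentWindowRecordCount;

    RateLimitingSourceOutputWrapper(SimpleOutput<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void collect(T record) {
        // Forward the record first, then count it against the current window.
        delegate.collect(record);
        currentWindowRecordCount++;
    }

    /** Returns the count for the window that just ended and starts a new window. */
    int getAndResetWindowCount() {
        int count = currentWindowRecordCount;
        currentWindowRecordCount = 0;
        return count;
    }
}
```

The reader would call `getAndResetWindowCount()` whenever it asks the rate limiter for the next permission, so the counter always describes exactly one window.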



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
