hlteoh37 commented on code in PR #190:
URL: https://github.com/apache/flink-connector-aws/pull/190#discussion_r1979552202


##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java:
##########
@@ -320,10 +336,103 @@ record ->
         for (int i = 0; i < 10; i++) {
             RecordsWithSplitIds<Record> records = splitReader.fetch();
             fetchedRecords.addAll(readAllRecords(records));
+            Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis());
         }
         
        assertThat(fetchedRecords).containsExactly(recordsFromSplit3.toArray(new Record[0]));
     }
 
+    @Test
+    void testPollingDelayForEmptyRecords() throws Exception {
+        // Given assigned split with no records
+        testStreamProxy.addShards(TEST_SHARD_ID);
+        splitReader.handleSplitsChanges(
+                new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
+
+        // First poll - should return empty records
+        RecordsWithSplitIds<Record> firstPoll = splitReader.fetch();
+        assertThat(firstPoll.nextRecordFromSplit()).isNull();
+        assertThat(firstPoll.nextSplit()).isNull();
+        assertThat(firstPoll.finishedSplits()).isEmpty();
+
+        // Immediate second poll - should return empty due to polling delay

Review Comment:
   Can we make sure this test is not flaky by making the split delay something like 1 min?
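   For illustration, a minimal sketch of that setup (the direct construction and the fixture names other than the constructor itself are assumptions about the test, not the PR's actual code):
   ```java
   // Pin the empty-poll delay far above the test's runtime so the immediate
   // second fetch() is deterministically throttled rather than timing-dependent.
   splitReader =
           new PollingDynamoDbStreamsShardSplitReader(
                   testStreamProxy,
                   NON_EMPTY_POLLING_DELAY_MILLIS,
                   Duration.ofMinutes(1), // empty-poll delay: 1 min removes the race
                   shardMetricGroupMap);
   ```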



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java:
##########
@@ -57,48 +60,89 @@ public class PollingDynamoDbStreamsShardSplitReader
             new DynamoDbStreamRecordsWithSplitIds(Collections.emptyIterator(), null, false);
 
     private final StreamProxy dynamodbStreams;
+    private final Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls;
+    private final Duration getRecordsIdlePollingTimeBetweenEmptyPolls;
 
-    private final Deque<DynamoDbStreamsShardSplitState> assignedSplits = new ArrayDeque<>();
+    private final Deque<DynamoDbStreamsShardSplitWithContext> assignedSplits;
     private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
-    private final Set<String> pausedSplitIds = new HashSet<>();
+    private final Set<String> pausedSplitIds;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PollingDynamoDbStreamsShardSplitReader.class);
 
     public PollingDynamoDbStreamsShardSplitReader(
             StreamProxy dynamodbStreamsProxy,
+            Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls,
+            Duration getRecordsIdlePollingTimeBetweenEmptyPolls,
             Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
         this.dynamodbStreams = dynamodbStreamsProxy;
+        this.getRecordsIdlePollingTimeBetweenNonEmptyPolls =
+                getRecordsIdlePollingTimeBetweenNonEmptyPolls;
+        this.getRecordsIdlePollingTimeBetweenEmptyPolls =
+                getRecordsIdlePollingTimeBetweenEmptyPolls;
         this.shardMetricGroupMap = shardMetricGroupMap;
+        this.assignedSplits = new ArrayDeque<>();
+        this.pausedSplitIds = new HashSet<>();
+    }
+
+    private long getNextEligibleTime(DynamoDbStreamsShardSplitWithContext splitContext) {
+        long requiredDelay =
+                splitContext.wasLastPollEmpty
+                        ? getRecordsIdlePollingTimeBetweenEmptyPolls.toMillis()
+                        : getRecordsIdlePollingTimeBetweenNonEmptyPolls.toMillis();
+
+        return splitContext.lastPollTimeMillis + requiredDelay;
     }
 
     @Override
     public RecordsWithSplitIds<Record> fetch() throws IOException {
-        DynamoDbStreamsShardSplitState splitState = assignedSplits.poll();
-        if (splitState == null) {
+        if (assignedSplits.isEmpty()) {
+            return INCOMPLETE_SHARD_EMPTY_RECORDS;
+        }
+        DynamoDbStreamsShardSplitWithContext splitContext = assignedSplits.poll();
+
+        if (pausedSplitIds.contains(splitContext.splitState.getSplitId())) {
+            assignedSplits.add(splitContext);
             return INCOMPLETE_SHARD_EMPTY_RECORDS;
         }
 
-        if (pausedSplitIds.contains(splitState.getSplitId())) {
-            assignedSplits.add(splitState);
+        // Check if split is paused or not ready due to empty poll delay

Review Comment:
   This comment doesn't seem appropriate (since pause is checked above)
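   For instance, the comment could move down to describe only the delay check:
   ```java
   // Check if the split is not yet eligible due to the idle poll delay
   long currentTime = System.currentTimeMillis();
   long nextEligibleTime = getNextEligibleTime(splitContext);
   ```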



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java:
##########
@@ -320,10 +336,103 @@ record ->
         for (int i = 0; i < 10; i++) {
             RecordsWithSplitIds<Record> records = splitReader.fetch();
             fetchedRecords.addAll(readAllRecords(records));
+            Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis());
         }
         
        assertThat(fetchedRecords).containsExactly(recordsFromSplit3.toArray(new Record[0]));
     }
 
+    @Test
+    void testPollingDelayForEmptyRecords() throws Exception {
+        // Given assigned split with no records
+        testStreamProxy.addShards(TEST_SHARD_ID);
+        splitReader.handleSplitsChanges(
+                new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
+
+        // First poll - should return empty records
+        RecordsWithSplitIds<Record> firstPoll = splitReader.fetch();
+        assertThat(firstPoll.nextRecordFromSplit()).isNull();
+        assertThat(firstPoll.nextSplit()).isNull();
+        assertThat(firstPoll.finishedSplits()).isEmpty();
+
+        // Immediate second poll - should return empty due to polling delay
+        RecordsWithSplitIds<Record> secondPoll = splitReader.fetch();
+        assertThat(secondPoll.nextRecordFromSplit()).isNull();
+        assertThat(secondPoll.nextSplit()).isNull();
+        assertThat(secondPoll.finishedSplits()).isEmpty();
+    }
+
+    @Test
+    void testLessPollingDelayForNonEmptyRecords() throws Exception {
+        // Given assigned split with records
+        testStreamProxy.addShards(TEST_SHARD_ID);
+        Record record1 = getTestRecord("data-1");
+        Record record2 = getTestRecord("data-2");
+
+        testStreamProxy.addRecords(
+                TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(record1));
+
+        splitReader.handleSplitsChanges(
+                new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
+
+        Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis());
+        // First poll - should return record1
+        RecordsWithSplitIds<Record> firstPoll = splitReader.fetch();
+        assertThat(readAllRecords(firstPoll)).containsExactly(record1);
+
+        // Add second record
+        testStreamProxy.addRecords(
+                TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(record2));
+        Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis());

Review Comment:
   Can we use Awaitility instead of Thread.sleep?
   https://github.com/awaitility/awaitility
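   For example, a sketch of the wait for record2 (the 5 s timeout and 50 ms poll interval are arbitrary choices, not project settings):
   ```java
   import static org.awaitility.Awaitility.await;

   // Poll fetch() until record2 actually arrives instead of assuming one fixed
   // sleep is long enough; fails with the last assertion error after the timeout.
   List<Record> fetched = new ArrayList<>();
   await().atMost(Duration.ofSeconds(5))
           .pollInterval(Duration.ofMillis(50))
           .untilAsserted(
                   () -> {
                       fetched.addAll(readAllRecords(splitReader.fetch()));
                       assertThat(fetched).contains(record2);
                   });
   ```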



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java:
##########
@@ -57,48 +60,89 @@ public class PollingDynamoDbStreamsShardSplitReader
             new DynamoDbStreamRecordsWithSplitIds(Collections.emptyIterator(), null, false);
 
     private final StreamProxy dynamodbStreams;
+    private final Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls;
+    private final Duration getRecordsIdlePollingTimeBetweenEmptyPolls;
 
-    private final Deque<DynamoDbStreamsShardSplitState> assignedSplits = new ArrayDeque<>();
+    private final Deque<DynamoDbStreamsShardSplitWithContext> assignedSplits;
     private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
-    private final Set<String> pausedSplitIds = new HashSet<>();
+    private final Set<String> pausedSplitIds;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PollingDynamoDbStreamsShardSplitReader.class);
 
     public PollingDynamoDbStreamsShardSplitReader(
             StreamProxy dynamodbStreamsProxy,
+            Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls,
+            Duration getRecordsIdlePollingTimeBetweenEmptyPolls,
             Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
         this.dynamodbStreams = dynamodbStreamsProxy;
+        this.getRecordsIdlePollingTimeBetweenNonEmptyPolls =
+                getRecordsIdlePollingTimeBetweenNonEmptyPolls;
+        this.getRecordsIdlePollingTimeBetweenEmptyPolls =
+                getRecordsIdlePollingTimeBetweenEmptyPolls;
         this.shardMetricGroupMap = shardMetricGroupMap;
+        this.assignedSplits = new ArrayDeque<>();
+        this.pausedSplitIds = new HashSet<>();
+    }
+
+    private long getNextEligibleTime(DynamoDbStreamsShardSplitWithContext splitContext) {
+        long requiredDelay =
+                splitContext.wasLastPollEmpty
+                        ? getRecordsIdlePollingTimeBetweenEmptyPolls.toMillis()
+                        : getRecordsIdlePollingTimeBetweenNonEmptyPolls.toMillis();
+
+        return splitContext.lastPollTimeMillis + requiredDelay;
     }
 
     @Override
     public RecordsWithSplitIds<Record> fetch() throws IOException {
-        DynamoDbStreamsShardSplitState splitState = assignedSplits.poll();
-        if (splitState == null) {
+        if (assignedSplits.isEmpty()) {
+            return INCOMPLETE_SHARD_EMPTY_RECORDS;
+        }
+        DynamoDbStreamsShardSplitWithContext splitContext = assignedSplits.poll();
+
+        if (pausedSplitIds.contains(splitContext.splitState.getSplitId())) {
+            assignedSplits.add(splitContext);
             return INCOMPLETE_SHARD_EMPTY_RECORDS;
         }
 
-        if (pausedSplitIds.contains(splitState.getSplitId())) {
-            assignedSplits.add(splitState);
+        // Check if split is paused or not ready due to empty poll delay
+        long currentTime = System.currentTimeMillis();
+        long nextEligibleTime = getNextEligibleTime(splitContext);
+
+        LOG.debug(
+                "Polling split: {}, currentTime: {}, eligibleTime: {}, wasEmptyPoll: {}",
+                splitContext.splitState.getSplitId(),
+                currentTime,
+                nextEligibleTime,
+                splitContext.wasLastPollEmpty);
+        if (nextEligibleTime > currentTime) {
+            assignedSplits.add(splitContext);
+            sleep(1);

Review Comment:
   Does this actually help with CPU 100%?
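   A 1 ms sleep still lets the fetcher thread re-enter fetch() roughly a thousand times per second per idle split. A sketch of one alternative, assuming the method returns INCOMPLETE_SHARD_EMPTY_RECORDS after re-queueing (MAX_IDLE_SLEEP_MILLIS is hypothetical; sleep(...) is the helper the PR already calls):
   ```java
   if (nextEligibleTime > currentTime) {
       assignedSplits.add(splitContext);
       // Sleep out the actual remaining delay (capped to stay responsive to
       // wakeups) instead of a fixed 1 ms.
       sleep(Math.min(nextEligibleTime - currentTime, MAX_IDLE_SLEEP_MILLIS));
       return INCOMPLETE_SHARD_EMPTY_RECORDS;
   }
   ```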



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -76,6 +76,21 @@ public enum InitialPosition {
     public static final String BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT =
             "Apache Flink %s (%s) DynamoDb Streams Connector";
 
+    public static final ConfigOption<Duration>
+            DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_EMPTY_POLLS =
+                    ConfigOptions.key("flink.dynamodbstreams.getrecords.empty.mindelay")
+                            .durationType()
+                            .defaultValue(Duration.ofMillis(1000))
+                            .withDescription(
+                                    "The idle time between empty polls for DynamoDB Streams GetRecords API");
+    public static final ConfigOption<Duration>
+            DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_NON_EMPTY_POLLS =
+                    ConfigOptions.key("flink.dynamodbstreams.getrecords.nonempty.mindelay")
+                            .durationType()
+                            .defaultValue(Duration.ofMillis(250))

Review Comment:
   Why not make this default to 0?
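   Whichever default is chosen, users can override it per job; a minimal sketch using Flink's Configuration API (constant statically imported from DynamodbStreamsSourceConfigConstants):
   ```java
   import java.time.Duration;
   import org.apache.flink.configuration.Configuration;

   // Zero the delay to allow back-to-back polls after non-empty reads.
   Configuration sourceConfig = new Configuration();
   sourceConfig.set(
           DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_NON_EMPTY_POLLS,
           Duration.ZERO);
   ```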



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
