hlteoh37 commented on code in PR #190: URL: https://github.com/apache/flink-connector-aws/pull/190#discussion_r1979552202
########## flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java: ########## @@ -320,10 +336,103 @@ record -> for (int i = 0; i < 10; i++) { RecordsWithSplitIds<Record> records = splitReader.fetch(); fetchedRecords.addAll(readAllRecords(records)); + Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis()); } assertThat(fetchedRecords).containsExactly(recordsFromSplit3.toArray(new Record[0])); } + @Test + void testPollingDelayForEmptyRecords() throws Exception { + // Given assigned split with no records + testStreamProxy.addShards(TEST_SHARD_ID); + splitReader.handleSplitsChanges( + new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); + + // First poll - should return empty records + RecordsWithSplitIds<Record> firstPoll = splitReader.fetch(); + assertThat(firstPoll.nextRecordFromSplit()).isNull(); + assertThat(firstPoll.nextSplit()).isNull(); + assertThat(firstPoll.finishedSplits()).isEmpty(); + + // Immediate second poll - should return empty due to polling delay Review Comment: Can we make sure this test is not flaky by setting the empty-poll delay for the split to something large, like 1 minute? 
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java: ########## @@ -57,48 +60,89 @@ public class PollingDynamoDbStreamsShardSplitReader new DynamoDbStreamRecordsWithSplitIds(Collections.emptyIterator(), null, false); private final StreamProxy dynamodbStreams; + private final Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls; + private final Duration getRecordsIdlePollingTimeBetweenEmptyPolls; - private final Deque<DynamoDbStreamsShardSplitState> assignedSplits = new ArrayDeque<>(); + private final Deque<DynamoDbStreamsShardSplitWithContext> assignedSplits; private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap; - private final Set<String> pausedSplitIds = new HashSet<>(); + private final Set<String> pausedSplitIds; + private static final Logger LOG = + LoggerFactory.getLogger(PollingDynamoDbStreamsShardSplitReader.class); public PollingDynamoDbStreamsShardSplitReader( StreamProxy dynamodbStreamsProxy, + Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls, + Duration getRecordsIdlePollingTimeBetweenEmptyPolls, Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) { this.dynamodbStreams = dynamodbStreamsProxy; + this.getRecordsIdlePollingTimeBetweenNonEmptyPolls = + getRecordsIdlePollingTimeBetweenNonEmptyPolls; + this.getRecordsIdlePollingTimeBetweenEmptyPolls = + getRecordsIdlePollingTimeBetweenEmptyPolls; this.shardMetricGroupMap = shardMetricGroupMap; + this.assignedSplits = new ArrayDeque<>(); + this.pausedSplitIds = new HashSet<>(); + } + + private long getNextEligibleTime(DynamoDbStreamsShardSplitWithContext splitContext) { + long requiredDelay = + splitContext.wasLastPollEmpty + ? 
getRecordsIdlePollingTimeBetweenEmptyPolls.toMillis() + : getRecordsIdlePollingTimeBetweenNonEmptyPolls.toMillis(); + + return splitContext.lastPollTimeMillis + requiredDelay; } @Override public RecordsWithSplitIds<Record> fetch() throws IOException { - DynamoDbStreamsShardSplitState splitState = assignedSplits.poll(); - if (splitState == null) { + if (assignedSplits.isEmpty()) { + return INCOMPLETE_SHARD_EMPTY_RECORDS; + } + DynamoDbStreamsShardSplitWithContext splitContext = assignedSplits.poll(); + + if (pausedSplitIds.contains(splitContext.splitState.getSplitId())) { + assignedSplits.add(splitContext); return INCOMPLETE_SHARD_EMPTY_RECORDS; } - if (pausedSplitIds.contains(splitState.getSplitId())) { - assignedSplits.add(splitState); + // Check if split is paused or not ready due to empty poll delay Review Comment: This comment doesn't seem appropriate (since pause is checked above) ########## flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java: ########## @@ -320,10 +336,103 @@ record -> for (int i = 0; i < 10; i++) { RecordsWithSplitIds<Record> records = splitReader.fetch(); fetchedRecords.addAll(readAllRecords(records)); + Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis()); } assertThat(fetchedRecords).containsExactly(recordsFromSplit3.toArray(new Record[0])); } + @Test + void testPollingDelayForEmptyRecords() throws Exception { + // Given assigned split with no records + testStreamProxy.addShards(TEST_SHARD_ID); + splitReader.handleSplitsChanges( + new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); + + // First poll - should return empty records + RecordsWithSplitIds<Record> firstPoll = splitReader.fetch(); + assertThat(firstPoll.nextRecordFromSplit()).isNull(); + assertThat(firstPoll.nextSplit()).isNull(); + assertThat(firstPoll.finishedSplits()).isEmpty(); + + // Immediate second poll - should return empty due to polling 
delay + RecordsWithSplitIds<Record> secondPoll = splitReader.fetch(); + assertThat(secondPoll.nextRecordFromSplit()).isNull(); + assertThat(secondPoll.nextSplit()).isNull(); + assertThat(secondPoll.finishedSplits()).isEmpty(); + } + + @Test + void testLessPollingDelayForNonEmptyRecords() throws Exception { + // Given assigned split with records + testStreamProxy.addShards(TEST_SHARD_ID); + Record record1 = getTestRecord("data-1"); + Record record2 = getTestRecord("data-2"); + + testStreamProxy.addRecords( + TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(record1)); + + splitReader.handleSplitsChanges( + new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); + + Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis()); + // First poll - should return record1 + RecordsWithSplitIds<Record> firstPoll = splitReader.fetch(); + assertThat(readAllRecords(firstPoll)).containsExactly(record1); + + // Add second record + testStreamProxy.addRecords( + TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(record2)); + Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis()); Review Comment: Can we use Awaitility instead of Thread.sleep? 
https://github.com/awaitility/awaitility ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java: ########## @@ -57,48 +60,89 @@ public class PollingDynamoDbStreamsShardSplitReader new DynamoDbStreamRecordsWithSplitIds(Collections.emptyIterator(), null, false); private final StreamProxy dynamodbStreams; + private final Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls; + private final Duration getRecordsIdlePollingTimeBetweenEmptyPolls; - private final Deque<DynamoDbStreamsShardSplitState> assignedSplits = new ArrayDeque<>(); + private final Deque<DynamoDbStreamsShardSplitWithContext> assignedSplits; private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap; - private final Set<String> pausedSplitIds = new HashSet<>(); + private final Set<String> pausedSplitIds; + private static final Logger LOG = + LoggerFactory.getLogger(PollingDynamoDbStreamsShardSplitReader.class); public PollingDynamoDbStreamsShardSplitReader( StreamProxy dynamodbStreamsProxy, + Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls, + Duration getRecordsIdlePollingTimeBetweenEmptyPolls, Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) { this.dynamodbStreams = dynamodbStreamsProxy; + this.getRecordsIdlePollingTimeBetweenNonEmptyPolls = + getRecordsIdlePollingTimeBetweenNonEmptyPolls; + this.getRecordsIdlePollingTimeBetweenEmptyPolls = + getRecordsIdlePollingTimeBetweenEmptyPolls; this.shardMetricGroupMap = shardMetricGroupMap; + this.assignedSplits = new ArrayDeque<>(); + this.pausedSplitIds = new HashSet<>(); + } + + private long getNextEligibleTime(DynamoDbStreamsShardSplitWithContext splitContext) { + long requiredDelay = + splitContext.wasLastPollEmpty + ? 
getRecordsIdlePollingTimeBetweenEmptyPolls.toMillis() + : getRecordsIdlePollingTimeBetweenNonEmptyPolls.toMillis(); + + return splitContext.lastPollTimeMillis + requiredDelay; } @Override public RecordsWithSplitIds<Record> fetch() throws IOException { - DynamoDbStreamsShardSplitState splitState = assignedSplits.poll(); - if (splitState == null) { + if (assignedSplits.isEmpty()) { + return INCOMPLETE_SHARD_EMPTY_RECORDS; + } + DynamoDbStreamsShardSplitWithContext splitContext = assignedSplits.poll(); + + if (pausedSplitIds.contains(splitContext.splitState.getSplitId())) { + assignedSplits.add(splitContext); return INCOMPLETE_SHARD_EMPTY_RECORDS; } - if (pausedSplitIds.contains(splitState.getSplitId())) { - assignedSplits.add(splitState); + // Check if split is paused or not ready due to empty poll delay + long currentTime = System.currentTimeMillis(); + long nextEligibleTime = getNextEligibleTime(splitContext); + + LOG.debug( + "Polling split: {}, currentTime: {}, eligibleTime: {}, wasEmptyPoll: {}", + splitContext.splitState.getSplitId(), + currentTime, + nextEligibleTime, + splitContext.wasLastPollEmpty); + if (nextEligibleTime > currentTime) { + assignedSplits.add(splitContext); + sleep(1); Review Comment: Does this `sleep(1)` actually help avoid 100% CPU usage?
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java: ########## @@ -76,6 +76,21 @@ public enum InitialPosition { public static final String BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT = "Apache Flink %s (%s) DynamoDb Streams Connector"; + public static final ConfigOption<Duration> + DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_EMPTY_POLLS = + ConfigOptions.key("flink.dynamodbstreams.getrecords.empty.mindelay") + .durationType() + .defaultValue(Duration.ofMillis(1000)) + .withDescription( + "The idle time between empty polls for DynamoDB Streams GetRecords API"); + public static final ConfigOption<Duration> + DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_NON_EMPTY_POLLS = + ConfigOptions.key("flink.dynamodbstreams.getrecords.nonempty.mindelay") + .durationType() + .defaultValue(Duration.ofMillis(250)) Review Comment: Why not make this default to 0? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org