gguptp commented on code in PR #190: URL: https://github.com/apache/flink-connector-aws/pull/190#discussion_r1980009840
########## flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java: ########## @@ -320,10 +336,103 @@ record -> for (int i = 0; i < 10; i++) { RecordsWithSplitIds<Record> records = splitReader.fetch(); fetchedRecords.addAll(readAllRecords(records)); + Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis()); } assertThat(fetchedRecords).containsExactly(recordsFromSplit3.toArray(new Record[0])); } + @Test + void testPollingDelayForEmptyRecords() throws Exception { + // Given assigned split with no records + testStreamProxy.addShards(TEST_SHARD_ID); + splitReader.handleSplitsChanges( + new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); + + // First poll - should return empty records + RecordsWithSplitIds<Record> firstPoll = splitReader.fetch(); + assertThat(firstPoll.nextRecordFromSplit()).isNull(); + assertThat(firstPoll.nextSplit()).isNull(); + assertThat(firstPoll.finishedSplits()).isEmpty(); + + // Immediate second poll - should return empty due to polling delay + RecordsWithSplitIds<Record> secondPoll = splitReader.fetch(); + assertThat(secondPoll.nextRecordFromSplit()).isNull(); + assertThat(secondPoll.nextSplit()).isNull(); + assertThat(secondPoll.finishedSplits()).isEmpty(); + } + + @Test + void testLessPollingDelayForNonEmptyRecords() throws Exception { + // Given assigned split with records + testStreamProxy.addShards(TEST_SHARD_ID); + Record record1 = getTestRecord("data-1"); + Record record2 = getTestRecord("data-2"); + + testStreamProxy.addRecords( + TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(record1)); + + splitReader.handleSplitsChanges( + new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); + + Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis()); + // First poll - should return record1 + RecordsWithSplitIds<Record> firstPoll = splitReader.fetch(); + assertThat(readAllRecords(firstPoll)).containsExactly(record1); + + // Add second record + testStreamProxy.addRecords( + TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(record2)); + Thread.sleep(NON_EMPTY_POLLING_DELAY_MILLIS.toMillis()); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org