This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d2d1cb8b28 [HUDI-4615] Return checkpoint as null for empty data from events queue. (#6387)
d2d1cb8b28 is described below
commit d2d1cb8b289f43e4f467d197013f1273cd350034
Author: Vinish Reddy <[email protected]>
AuthorDate: Wed Sep 7 09:24:38 2022 +0530
[HUDI-4615] Return checkpoint as null for empty data from events queue. (#6387)
Co-authored-by: sivabalan <[email protected]>
---
.../sources/helpers/S3EventsMetaSelector.java | 4 +++-
.../sources/helpers/TestS3EventsMetaSelector.java | 24 ++++++++++++++++++++++
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
index 68ac7aba5c..13de715540 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
@@ -153,7 +153,9 @@ public class S3EventsMetaSelector extends CloudObjectsSelector {
for (Map<String, Object> eventRecord : eventRecords) {
filteredEventRecords.add(new
ObjectMapper().writeValueAsString(eventRecord).replace("%3D", "="));
}
- return new ImmutablePair<>(filteredEventRecords,
String.valueOf(newCheckpointTime));
+ // Return the old checkpoint if no messages to consume from queue.
+ String newCheckpoint = newCheckpointTime == 0 ?
lastCheckpointStr.orElse(null) : String.valueOf(newCheckpointTime);
+ return new ImmutablePair<>(filteredEventRecords, newCheckpoint);
} catch (JSONException | IOException e) {
throw new HoodieException("Unable to read from SQS: ", e);
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
index 2208543c08..f38e89b217 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
@@ -27,11 +27,14 @@ import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.Message;
import org.apache.hadoop.fs.Path;
import org.json.JSONObject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
@@ -43,8 +46,12 @@ import java.util.List;
import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION;
import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL;
+import static
org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES;
import static
org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelector.REGION_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
public class TestS3EventsMetaSelector extends HoodieClientTestHarness {
@@ -102,4 +109,21 @@ public class TestS3EventsMetaSelector extends HoodieClientTestHarness {
.getString("key"));
assertEquals("1627376736755", eventFromQueue.getRight());
}
+
+ @Test
+ public void testEventsFromQueueNoMessages() {
+ S3EventsMetaSelector selector = new S3EventsMetaSelector(props);
+ when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
+ .thenReturn(
+ new GetQueueAttributesResult()
+ .addAttributesEntry(SQS_ATTR_APPROX_MESSAGES, "0"));
+
+ List<Message> processed = new ArrayList<>();
+ Pair<List<String>, String> eventFromQueue =
+ selector.getNextEventsFromQueue(sqs, Option.empty(), processed);
+
+ assertEquals(0, eventFromQueue.getLeft().size());
+ assertEquals(0, processed.size());
+ assertNull(eventFromQueue.getRight());
+ }
}