This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d2d1cb8b28 [HUDI-4615] Return checkpoint as null for empty data from events queue.  (#6387)
d2d1cb8b28 is described below

commit d2d1cb8b289f43e4f467d197013f1273cd350034
Author: Vinish Reddy <[email protected]>
AuthorDate: Wed Sep 7 09:24:38 2022 +0530

    [HUDI-4615] Return checkpoint as null for empty data from events queue.  (#6387)
    
    
    Co-authored-by: sivabalan <[email protected]>
---
 .../sources/helpers/S3EventsMetaSelector.java      |  4 +++-
 .../sources/helpers/TestS3EventsMetaSelector.java  | 24 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
index 68ac7aba5c..13de715540 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
@@ -153,7 +153,9 @@ public class S3EventsMetaSelector extends CloudObjectsSelector {
       for (Map<String, Object> eventRecord : eventRecords) {
         filteredEventRecords.add(new ObjectMapper().writeValueAsString(eventRecord).replace("%3D", "="));
       }
-      return new ImmutablePair<>(filteredEventRecords, String.valueOf(newCheckpointTime));
+      // Return the old checkpoint if no messages to consume from queue.
+      String newCheckpoint = newCheckpointTime == 0 ? lastCheckpointStr.orElse(null) : String.valueOf(newCheckpointTime);
+      return new ImmutablePair<>(filteredEventRecords, newCheckpoint);
     } catch (JSONException | IOException e) {
       throw new HoodieException("Unable to read from SQS: ", e);
     }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
index 2208543c08..f38e89b217 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestS3EventsMetaSelector.java
@@ -27,11 +27,14 @@ import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
 
 import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
 import com.amazonaws.services.sqs.model.Message;
 import org.apache.hadoop.fs.Path;
 import org.json.JSONObject;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
@@ -43,8 +46,12 @@ import java.util.List;
 
 import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION;
 import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES;
 import static org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelector.REGION_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
 
 public class TestS3EventsMetaSelector extends HoodieClientTestHarness {
 
@@ -102,4 +109,21 @@ public class TestS3EventsMetaSelector extends HoodieClientTestHarness {
             .getString("key"));
     assertEquals("1627376736755", eventFromQueue.getRight());
   }
+
+  @Test
+  public void testEventsFromQueueNoMessages() {
+    S3EventsMetaSelector selector = new S3EventsMetaSelector(props);
+    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
+        .thenReturn(
+            new GetQueueAttributesResult()
+                .addAttributesEntry(SQS_ATTR_APPROX_MESSAGES, "0"));
+
+    List<Message> processed = new ArrayList<>();
+    Pair<List<String>, String> eventFromQueue =
+        selector.getNextEventsFromQueue(sqs, Option.empty(), processed);
+
+    assertEquals(0, eventFromQueue.getLeft().size());
+    assertEquals(0, processed.size());
+    assertNull(eventFromQueue.getRight());
+  }
 }

Reply via email to