This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 89c9f3d3012 fix minor bug when empty response (#13257)
89c9f3d3012 is described below

commit 89c9f3d3012db7b97449277ab6aae41d89ccff79
Author: Tim Brown <[email protected]>
AuthorDate: Sat May 3 17:52:04 2025 -0500

    fix minor bug when empty response (#13257)
---
 .../sources/helpers/gcs/PubsubQueueClient.java     | 10 ++-
 .../sources/helpers/gcs/TestPubsubQueueClient.java | 79 ++++++++++++++++++++++
 2 files changed, 87 insertions(+), 2 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubQueueClient.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubQueueClient.java
index 7f93d32b606..d7ff7319e71 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubQueueClient.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubQueueClient.java
@@ -27,6 +27,7 @@ import com.google.monitoring.v3.ListTimeSeriesRequest;
 import com.google.monitoring.v3.Point;
 import com.google.monitoring.v3.ProjectName;
 import com.google.monitoring.v3.TimeInterval;
+import com.google.monitoring.v3.TimeSeries;
 import com.google.protobuf.util.Timestamps;
 import com.google.pubsub.v1.AcknowledgeRequest;
 import com.google.pubsub.v1.PullRequest;
@@ -34,6 +35,7 @@ import com.google.pubsub.v1.PullResponse;
 
 import java.io.IOException;
 import java.time.Instant;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -45,7 +47,7 @@ public class PubsubQueueClient {
     return GrpcSubscriberStub.create(subscriberStubSettings);
   }
 
-  public PullResponse makePullRequest(SubscriberStub subscriber, String subscriptionName, int batchSize) throws IOException {
+  public PullResponse makePullRequest(SubscriberStub subscriber, String subscriptionName, int batchSize) {
     PullRequest pullRequest = PullRequest.newBuilder()
         .setMaxMessages(batchSize)
         .setSubscription(subscriptionName)
@@ -73,7 +75,11 @@ public class PubsubQueueClient {
                   .build())
               .build());
       // use the latest value from the window
-      List<Point> pointList = response.getPage().getValues().iterator().next().getPointsList();
+      Iterator<TimeSeries> values = response.getPage().getValues().iterator();
+      if (!values.hasNext()) {
+        return 0;
+      }
+      List<Point> pointList = values.next().getPointsList();
       return pointList.stream().findFirst().map(point -> point.getValue().getInt64Value()).orElse(Long.MAX_VALUE);
     }
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/gcs/TestPubsubQueueClient.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/gcs/TestPubsubQueueClient.java
new file mode 100644
index 00000000000..86fe4500237
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/gcs/TestPubsubQueueClient.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.cloud.ServiceOptions;
+import com.google.cloud.monitoring.v3.MetricServiceClient;
+import com.google.monitoring.v3.ListTimeSeriesRequest;
+import com.google.monitoring.v3.Point;
+import com.google.monitoring.v3.TimeSeries;
+import com.google.monitoring.v3.TypedValue;
+import com.google.protobuf.util.Timestamps;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestPubsubQueueClient {
+  private static Stream<Arguments> getNumUnAckedMessages() {
+    TimeSeries timeSeries = TimeSeries.newBuilder()
+        .addPoints(Point.newBuilder().setValue(TypedValue.newBuilder().setInt64Value(100L).build()).build())
+        .build();
+    return Stream.of(
+        Arguments.of(Collections.singletonList(timeSeries), 100L),
+        Arguments.of(Collections.emptyList(), 0L));
+  }
+
+  @ParameterizedTest
+  @MethodSource
+  void getNumUnAckedMessages(List<TimeSeries> responseValues, long expected) throws Exception {
+    String subscriptionId = "subscriptionId";
+    PubsubQueueClient pubsubQueueClient = new PubsubQueueClient();
+    try (MockedStatic<MetricServiceClient> mockedStaticServiceClient = Mockito.mockStatic(MetricServiceClient.class);
+         MockedStatic<ServiceOptions> mockedStaticServiceOptions = Mockito.mockStatic(ServiceOptions.class)) {
+      MetricServiceClient metricServiceClient = Mockito.mock(MetricServiceClient.class);
+      mockedStaticServiceClient.when(MetricServiceClient::create).thenReturn(metricServiceClient);
+      mockedStaticServiceOptions.when(ServiceOptions::getDefaultProjectId).thenReturn("projectId");
+
+      MetricServiceClient.ListTimeSeriesPagedResponse response = mock(MetricServiceClient.ListTimeSeriesPagedResponse.class, RETURNS_DEEP_STUBS);
+      ArgumentCaptor<ListTimeSeriesRequest> requestCaptor = ArgumentCaptor.forClass(ListTimeSeriesRequest.class);
+      when(metricServiceClient.listTimeSeries(requestCaptor.capture())).thenReturn(response);
+
+      when(response.getPage().getValues()).thenReturn(responseValues);
+      long actualNumUnAckedMessages = pubsubQueueClient.getNumUnAckedMessages(subscriptionId);
+      assertEquals(expected, actualNumUnAckedMessages);
+
+      ListTimeSeriesRequest request = requestCaptor.getValue();
+      long durationMs = Timestamps.toMillis(request.getInterval().getEndTime()) - Timestamps.toMillis(request.getInterval().getStartTime());
+      assertEquals(120_000, durationMs);
+      assertEquals("metric.type=\"pubsub.googleapis.com/subscription/num_undelivered_messages\" AND resource.label.subscription_id=\"subscriptionId\"", request.getFilter());
+    }
+  }
+}

Reply via email to