This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new edb1301123 add metric for time retention failing due to end time (#13879)
edb1301123 is described below
commit edb13011230228393350cddca54e4ac4c4b7e08b
Author: Johan Adami <[email protected]>
AuthorDate: Fri Sep 6 18:43:50 2024 -0400
add metric for time retention failing due to end time (#13879)
---
.../pinot/common/metrics/ControllerGauge.java | 5 +++
.../controller/helix/SegmentStatusChecker.java | 26 +++++++++++++++
.../controller/helix/SegmentStatusCheckerTest.java | 37 ++++++++++++++++++++++
3 files changed, 68 insertions(+)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 82c4e666e1..d052a75485 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -35,6 +35,11 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
PERCENT_OF_REPLICAS("percent", false),
SEGMENTS_IN_ERROR_STATE("segments", false),
+ // Segment start and end time is stored in milliseconds.
+ // Invalid start/end time means the broker time pruner will not work correctly.
+ // Invalid end times means time retention will not happen for that segment.
+ SEGMENTS_WITH_INVALID_START_TIME("segments", false),
+ SEGMENTS_WITH_INVALID_END_TIME("segments", false),
// Percentage of segments with at least one online replica in external view as compared to total number of segments in
// ideal state
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index a9a2484752..c9a48022c0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -55,6 +55,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -282,6 +283,8 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
List<String> offlineSegments = new ArrayList<>();
// Segments with fewer replicas online (ONLINE/CONSUMING) in external view
than in ideal state
List<String> partialOnlineSegments = new ArrayList<>();
+ List<String> segmentsInvalidStartTime = new ArrayList<>();
+ List<String> segmentsInvalidEndTime = new ArrayList<>();
for (String segment : segments) {
int numISReplicas = 0;
for (Map.Entry<String, String> entry :
idealState.getInstanceStateMap(segment).entrySet()) {
@@ -318,6 +321,15 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
continue;
}
+ if (segmentZKMetadata.getStatus() != Status.IN_PROGRESS) {
+ if
(!TimeUtils.timeValueInValidRange(segmentZKMetadata.getStartTimeMs())) {
+ segmentsInvalidStartTime.add(segment);
+ }
+ if
(!TimeUtils.timeValueInValidRange(segmentZKMetadata.getEndTimeMs())) {
+ segmentsInvalidEndTime.add(segment);
+ }
+ }
+
int numEVReplicas = 0;
if (externalView != null) {
Map<String, String> stateMap = externalView.getStateMap(segment);
@@ -378,6 +390,16 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
LOGGER.warn("Table {} has {} segments with fewer replicas than the
replication factor: {}", tableNameWithType,
numPartialOnlineSegments, logSegments(partialOnlineSegments));
}
+ int numInvalidStartTime = segmentsInvalidStartTime.size();
+ if (numInvalidStartTime > 0) {
+ LOGGER.warn("Table {} has {} segments with invalid start time: {}",
tableNameWithType, numInvalidStartTime,
+ logSegments(segmentsInvalidStartTime));
+ }
+ int numInvalidEndTime = segmentsInvalidEndTime.size();
+ if (numInvalidEndTime > 0) {
+ LOGGER.warn("Table {} has {} segments with invalid end time: {}",
tableNameWithType, numInvalidEndTime,
+ logSegments(segmentsInvalidEndTime));
+ }
// Synchronization provided by Controller Gauge to make sure that only one
thread updates the gauge
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas);
@@ -391,6 +413,10 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
numPartialOnlineSegments);
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.TABLE_COMPRESSED_SIZE,
tableCompressedSize);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME,
+ numInvalidStartTime);
+ _controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME,
+ numInvalidEndTime);
if (tableType == TableType.REALTIME && tableConfig != null) {
StreamConfig streamConfig =
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index 1edd2176e6..5f2ae7ea32 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -51,6 +51,7 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.Test;
@@ -700,4 +701,40 @@ public class SegmentStatusCheckerTest {
String jsonString = JsonUtils.objectToPrettyString(segmentStatusInfoList);
assertEquals(jsonString, json);
}
+
+ @Test
+ public void testInvalidSegmentStartEndTime() {
+ IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
+ idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
+ idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
+ idealState.setPartitionState("myTable_0", "pinot3", "ONLINE");
+ idealState.setReplicas("3");
+ idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+
+ ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
+ externalView.setState("myTable_0", "pinot1", "ONLINE");
+ externalView.setState("myTable_0", "pinot2", "ONLINE");
+ externalView.setState("myTable_0", "pinot3", "ONLINE");
+
+ ZNRecord znRecord = new ZNRecord("myTable_0");
+ znRecord.setLongField(CommonConstants.Segment.START_TIME,
TimeUtils.VALID_MIN_TIME_MILLIS - 1);
+ znRecord.setLongField(CommonConstants.Segment.END_TIME,
TimeUtils.VALID_MAX_TIME_MILLIS + 1);
+ SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234,
11111L);
+
when(segmentZKMetadata.getStartTimeMs()).thenReturn(TimeUtils.VALID_MIN_TIME_MILLIS
- 1);
+
when(segmentZKMetadata.getEndTimeMs()).thenReturn(TimeUtils.VALID_MAX_TIME_MILLIS
+ 1);
+
+ PinotHelixResourceManager resourceManager =
mock(PinotHelixResourceManager.class);
+
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
+
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
+ when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME),
anyString())).thenReturn(segmentZKMetadata);
+
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+ runSegmentStatusChecker(resourceManager, 0);
+ assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
OFFLINE_TABLE_NAME,
+ ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME), 1);
+ assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics,
OFFLINE_TABLE_NAME,
+ ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME), 1);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]