This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 44dde02db1 Continue Segment Reset post instance errors (#15903)
44dde02db1 is described below
commit 44dde02db1765e73e29e448be7bde77249ab3885
Author: NOOB <[email protected]>
AuthorDate: Thu Jun 12 23:02:29 2025 +0530
Continue Segment Reset post instance errors (#15903)
---
.../helix/core/PinotHelixResourceManager.java | 32 +++++-
.../controller/helix/core/SegmentResetTest.java | 113 +++++++++++++++++++++
2 files changed, 142 insertions(+), 3 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index b097ecd05b..a07caac5f1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2912,15 +2912,22 @@ public class PinotHelixResourceManager {
Set<String> instanceSet = parseInstanceSet(idealState, segmentName,
targetInstance);
Map<String, String> externalViewStateMap =
externalView.getStateMap(segmentName);
+ List<String> failedInstances = new ArrayList<>();
for (String instance : instanceSet) {
if (externalViewStateMap == null ||
SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) {
LOGGER.info("Skipping resetting for segment: {} of table: {} on instance: {}", segmentName, tableNameWithType,
instance);
} else {
LOGGER.info("Resetting segment: {} of table: {} on instance: {}",
segmentName, tableNameWithType, instance);
- resetPartitionAllState(instance, tableNameWithType,
Collections.singleton(segmentName));
+ resetPartitionAllState(instance, tableNameWithType,
Collections.singleton(segmentName), failedInstances);
}
}
+
+ if (!failedInstances.isEmpty()) {
+ throw new RuntimeException(
+ "Reset segment failed for table: " + tableNameWithType + ", segment: " + segmentName + ", instances: "
+ + failedInstances);
+ }
}
/**
@@ -2960,12 +2967,30 @@ public class PinotHelixResourceManager {
}
LOGGER.info("Resetting segments: {} of table: {}",
instanceToResetSegmentsMap, tableNameWithType);
+
+ List<String> failedInstances = new ArrayList<>();
for (Map.Entry<String, Set<String>> entry :
instanceToResetSegmentsMap.entrySet()) {
- resetPartitionAllState(entry.getKey(), tableNameWithType,
entry.getValue());
+ resetPartitionAllState(entry.getKey(), tableNameWithType,
entry.getValue(), failedInstances);
}
LOGGER.info("Reset segments for table {} finished. With the following segments skipped: {}", tableNameWithType,
instanceToSkippedSegmentsMap);
+
+ if (!failedInstances.isEmpty()) {
+ throw new RuntimeException(
+ "Reset segment failed for table: " + tableNameWithType + ", instances: " + failedInstances);
+ }
+ }
+
+ private void resetPartitionAllState(String instance, String
tableNameWithType, Set<String> segmentNames,
+ List<String> failedInstances) {
+ try {
+ resetPartitionAllState(instance, tableNameWithType, segmentNames);
+ } catch (Exception e) {
+ LOGGER.error("Failed to reset segment: {} of table: {} on instance: {}",
segmentNames, tableNameWithType,
+ instance, e);
+ failedInstances.add(instance);
+ }
}
private static Set<String> parseInstanceSet(IdealState idealState, String
segmentName,
@@ -2984,7 +3009,8 @@ public class PinotHelixResourceManager {
* This util is similar to {@link HelixAdmin#resetPartition(String, String, String, List)}.
* However instead of resetting only the ERROR state to its initial state. we reset all state regardless.
*/
- private void resetPartitionAllState(String instanceName, String
resourceName, Set<String> resetPartitionNames) {
+ @VisibleForTesting
+ void resetPartitionAllState(String instanceName, String resourceName,
Set<String> resetPartitionNames) {
LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster {}.",
resetPartitionNames == null ? "NULL" : resetPartitionNames,
resourceName, instanceName, _helixClusterName);
HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/SegmentResetTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/SegmentResetTest.java
new file mode 100644
index 0000000000..6d795c39d4
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/SegmentResetTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+
+
+public class SegmentResetTest {
+
+ private final String _tableName = "myTable_OFFLINE";
+ private final String _segmentName = "segment_1";
+ private final String _instance1 = "instance_1";
+ private final String _instance2 = "instance_2";
+
+ private class MockPinotHelixResourceManager extends
PinotHelixResourceManager {
+
+ private final Set<String> _instanceToFail;
+
+ public MockPinotHelixResourceManager(ControllerConf controllerConf) {
+ super(controllerConf);
+ _instanceToFail = new HashSet<>();
+ }
+
+ @Override
+ public IdealState getTableIdealState(String tableNameWithType) {
+ IdealState idealState = Mockito.mock(IdealState.class);
+ when(idealState.getInstanceSet(_segmentName)).thenReturn(new
HashSet<>(Arrays.asList(_instance1, _instance2)));
+ when(idealState.getPartitionSet()).thenReturn(new
HashSet<>(Collections.singletonList(_segmentName)));
+ return idealState;
+ }
+
+ @Override
+ public ExternalView getTableExternalView(String tableNameWithType) {
+ ExternalView externalView = Mockito.mock(ExternalView.class);
+ Map<String, String> segmentStateMap = new HashMap<>();
+ segmentStateMap.put(_instance1,
CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
+ segmentStateMap.put(_instance2,
CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
+ when(externalView.getStateMap(_segmentName)).thenReturn(segmentStateMap);
+ return externalView;
+ }
+
+ @Override
+ public void resetPartitionAllState(String instanceName, String
resourceName, Set<String> resetPartitionNames) {
+ if (_instanceToFail.contains(instanceName)) {
+ throw new RuntimeException("Test: Fail reset for " + instanceName);
+ }
+ }
+
+ public void addInstanceToFail(String instanceName) {
+ _instanceToFail.add(instanceName);
+ }
+ }
+
+ @Test
+ public void testResetSegmentOneFailureOthersStillInvoked() {
+ ControllerConf cfg = new ControllerConf();
+ cfg.setZkStr("localhost:2181");
+ cfg.setHelixClusterName("cluster01");
+ MockPinotHelixResourceManager pinotHelixManager = new
MockPinotHelixResourceManager(cfg);
+ pinotHelixManager.addInstanceToFail(_instance1);
+
+ RuntimeException runtimeException =
Assert.expectThrows(RuntimeException.class,
+ () -> pinotHelixManager.resetSegment(_tableName, _segmentName, null));
+ Assert.assertEquals(runtimeException.getMessage(),
+ "Reset segment failed for table: myTable_OFFLINE, segment: segment_1, instances: [instance_1]");
+
+ runtimeException =
+ Assert.expectThrows(RuntimeException.class, () ->
pinotHelixManager.resetSegments(_tableName, null, false));
+ Assert.assertEquals(runtimeException.getMessage(),
+ "Reset segment failed for table: myTable_OFFLINE, instances: [instance_1]");
+ }
+
+ @Test
+ public void testResetSegmentNoFailure() {
+ ControllerConf cfg = new ControllerConf();
+ cfg.setZkStr("localhost:2181");
+ cfg.setHelixClusterName("cluster01");
+ MockPinotHelixResourceManager pinotHelixManager = new
MockPinotHelixResourceManager(cfg);
+
+ pinotHelixManager.resetSegment(_tableName, _segmentName, null);
+ pinotHelixManager.resetSegments(_tableName, null, false);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]