This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2de61de8bc Remove `recreateDeletedConsumingSegment` flag from
RealtimeSegmentValidationManager (#14024)
2de61de8bc is described below
commit 2de61de8bcf4c5befce3404543f25025cbbf7cbd
Author: Shounak kulkarni <[email protected]>
AuthorDate: Thu Sep 19 18:44:55 2024 +0530
Remove `recreateDeletedConsumingSegment` flag from
RealtimeSegmentValidationManager (#14024)
* Remove recreateDeletedConsumingSegment flag
In favour of always recreating deleted consuming segments if table is not
paused.
* handle resumption upon storage quota getting freed up
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 14 ++---
.../RealtimeSegmentValidationManager.java | 63 ++++++++++------------
.../PinotLLCRealtimeSegmentManagerTest.java | 5 +-
3 files changed, 35 insertions(+), 47 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index d799000ed3..7a459d7ddb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -912,11 +912,9 @@ public class PinotLLCRealtimeSegmentManager {
* Check whether there are segments in the PROPERTYSTORE with status DONE,
but no new segment in status
* IN_PROGRESS, and the state for the latest segment in the IDEALSTATE is
ONLINE.
* If so, it should create a new CONSUMING segment for the partition.
- * (this operation is done only if @param recreateDeletedConsumingSegment is
set to true,
- * which means it's manually triggered by admin not by automatic periodic
task)
*/
public void ensureAllPartitionsConsuming(TableConfig tableConfig,
StreamConfig streamConfig,
- boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
+ OffsetCriteria offsetCriteria) {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
@@ -938,7 +936,7 @@ public class PinotLLCRealtimeSegmentManager {
getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
streamConfig.setOffsetCriteria(originalOffsetCriteria);
return ensureAllPartitionsConsuming(tableConfig, streamConfig,
idealState, newPartitionGroupMetadataList,
- recreateDeletedConsumingSegment, offsetCriteria);
+ offsetCriteria);
} else {
LOGGER.info("Skipping LLC segments validation for table: {},
isTableEnabled: {}, isTablePaused: {}",
realtimeTableName, isTableEnabled, isTablePaused);
@@ -1158,8 +1156,7 @@ public class PinotLLCRealtimeSegmentManager {
*/
@VisibleForTesting
IdealState ensureAllPartitionsConsuming(TableConfig tableConfig,
StreamConfig streamConfig, IdealState idealState,
- List<PartitionGroupMetadata> newPartitionGroupMetadataList, boolean
recreateDeletedConsumingSegment,
- OffsetCriteria offsetCriteria) {
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList,
OffsetCriteria offsetCriteria) {
String realtimeTableName = tableConfig.getTableName();
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
@@ -1275,7 +1272,7 @@ public class PinotLLCRealtimeSegmentManager {
instancePartitionsMap, startOffset);
} else {
if (newPartitionGroupSet.contains(partitionGroupId)) {
- if (recreateDeletedConsumingSegment &&
latestSegmentZKMetadata.getStatus().isCompleted()
+ if (latestSegmentZKMetadata.getStatus().isCompleted()
&& isAllInstancesInState(instanceStateMap,
SegmentStateModel.ONLINE)) {
// If we get here, that means in IdealState, the latest
segment has all replicas ONLINE.
// Create a new IN_PROGRESS segment in PROPERTYSTORE,
@@ -1737,7 +1734,6 @@ public class PinotLLCRealtimeSegmentManager {
// trigger realtime segment validation job to resume consumption
Map<String, String> taskProperties = new HashMap<>();
-
taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY,
"true");
if (offsetCriteria != null) {
taskProperties.put(RealtimeSegmentValidationManager.OFFSET_CRITERIA,
offsetCriteria);
}
@@ -1749,7 +1745,7 @@ public class PinotLLCRealtimeSegmentManager {
+ "endpoint in a few moments to double check.", new
Timestamp(System.currentTimeMillis()).toString());
}
- private IdealState updatePauseStateInIdealState(String tableNameWithType,
boolean pause,
+ public IdealState updatePauseStateInIdealState(String tableNameWithType,
boolean pause,
PauseState.ReasonCode reasonCode, @Nullable String comment) {
PauseState pauseState = new PauseState(pause, reasonCode, comment,
new Timestamp(System.currentTimeMillis()).toString());
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 856d88c226..b8460a406a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -58,7 +58,6 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
private final int _segmentLevelValidationIntervalInSeconds;
private long _lastSegmentLevelValidationRunTimeMs = 0L;
- public static final String RECREATE_DELETED_CONSUMING_SEGMENT_KEY =
"recreateDeletedConsumingSegment";
public static final String OFFSET_CRITERIA = "offsetCriteria";
public RealtimeSegmentValidationManager(ControllerConf config,
PinotHelixResourceManager pinotHelixResourceManager,
@@ -87,8 +86,6 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
context._runSegmentLevelValidation = true;
_lastSegmentLevelValidationRunTimeMs = currentTimeMs;
}
- context._recreateDeletedConsumingSegment =
-
Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY));
String offsetCriteriaStr =
periodicTaskProperties.getProperty(OFFSET_CRITERIA);
if (offsetCriteriaStr != null) {
context._offsetCriteria = new
OffsetCriteria.OffsetCriteriaBuilder().withOffsetString(offsetCriteriaStr);
@@ -113,44 +110,41 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
runSegmentLevelValidation(tableConfig, streamConfig);
}
- if (shouldEnsureConsuming(tableNameWithType, context)) {
- _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig,
streamConfig,
- context._recreateDeletedConsumingSegment, context._offsetCriteria);
+ if (shouldEnsureConsuming(tableNameWithType)) {
+ _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig,
streamConfig, context._offsetCriteria);
}
}
- private boolean shouldEnsureConsuming(String tableNameWithType, Context
context) {
- // Keeps the table paused/unpaused based pause validations.
- // Skips updating the pause state if table is paused by admin
- PauseState pauseState = computePauseState(tableNameWithType);
- if (!pauseState.isPaused()) {
- boolean unPausedUponStorageWithinQuota =
-
pauseState.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED);
- if (unPausedUponStorageWithinQuota) {
- // recreate consuming segments if table is resumed upon the table
storage getting within quota limit
- context._recreateDeletedConsumingSegment = true;
- }
- }
- return !pauseState.isPaused();
- }
-
- private PauseState computePauseState(String tableNameWithType) {
+ /**
+ *
+ * Updates the table paused state based on pause validations (e.g. storage
quota being exceeded).
+ * Skips updating the pause state if table is paused by admin.
+ * Returns true if table is not paused
+ */
+ private boolean shouldEnsureConsuming(String tableNameWithType) {
PauseStatusDetails pauseStatus =
_llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType);
boolean isTablePaused = pauseStatus.getPauseFlag();
// if table is paused by admin then don't compute
- if (!isTablePaused ||
pauseStatus.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED))
{
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- boolean isQuotaExceeded =
_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig);
- // if quota breach and pause flag is not in sync, update the IS
- if (isQuotaExceeded != isTablePaused) {
- String storageQuota = tableConfig.getQuotaConfig() != null ?
tableConfig.getQuotaConfig().getStorage() : "NA";
- pauseStatus =
_llcRealtimeSegmentManager.pauseConsumption(tableNameWithType,
- PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
- isQuotaExceeded ? "Storage quota of " + storageQuota + "
exceeded." : "Table storage within quota limits");
- }
+ if (isTablePaused &&
pauseStatus.getReasonCode().equals(PauseState.ReasonCode.ADMINISTRATIVE)) {
+ return false;
+ }
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ boolean isQuotaExceeded =
_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig);
+ if (isQuotaExceeded == isTablePaused) {
+ return !isTablePaused;
+ }
+ // if quota breach and pause flag is not in sync, update the IS
+ if (isQuotaExceeded) {
+ String storageQuota = tableConfig.getQuotaConfig() != null ?
tableConfig.getQuotaConfig().getStorage() : "NA";
+ // as quota is breached pause the consumption right away
+ _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType,
PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
+ "Storage quota of " + storageQuota + " exceeded.");
+ } else {
+ // as quota limit is being honored, unset the pause state and allow
consuming segment recreation.
+
_llcRealtimeSegmentManager.updatePauseStateInIdealState(tableNameWithType,
false,
+ PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, "Table storage within
quota limits");
}
- return new PauseState(pauseStatus.getPauseFlag(),
pauseStatus.getReasonCode(), pauseStatus.getComment(),
- pauseStatus.getTimestamp());
+ return !isQuotaExceeded;
}
private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig
streamConfig) {
@@ -204,7 +198,6 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
public static final class Context {
private boolean _runSegmentLevelValidation;
- private boolean _recreateDeletedConsumingSegment;
private OffsetCriteria _offsetCriteria;
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 7127294fee..435303f5e9 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -878,8 +878,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Expected
}
try {
- segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig,
segmentManager._streamConfig, false,
- null);
+ segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig,
segmentManager._streamConfig, null);
fail();
} catch (IllegalStateException e) {
// Expected
@@ -1146,7 +1145,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void ensureAllPartitionsConsuming() {
ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
- getNewPartitionGroupMetadataList(_streamConfig,
Collections.emptyList()), false, null);
+ getNewPartitionGroupMetadataList(_streamConfig,
Collections.emptyList()), null);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]