This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9f5fe2b7be Fix the potential deadlock for partial-upsert segment
loading check (#10198)
9f5fe2b7be is described below
commit 9f5fe2b7be241f778890483de05a1b45fed4d2d5
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sun Jan 29 23:38:20 2023 -0800
Fix the potential deadlock for partial-upsert segment loading check (#10198)
---
.../realtime/LLRealtimeSegmentDataManager.java | 19 ++++-
.../manager/realtime/RealtimeTableDataManager.java | 64 ++++++++++-----
.../realtime/LLRealtimeSegmentDataManagerTest.java | 2 +-
.../local/utils/tablestate/TableStateUtils.java | 90 +++++++++++-----------
4 files changed, 104 insertions(+), 71 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index cffebf90db..b3607e6a06 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.Utils;
@@ -232,6 +233,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
private final AtomicBoolean _acquiredConsumerSemaphore;
private final String _metricKeyName;
private final ServerMetrics _serverMetrics;
+ private final BooleanSupplier _isReadyToConsumeData;
private final MutableSegmentImpl _realtimeSegment;
private volatile StreamPartitionMsgOffset _currentOffset;
private volatile State _state;
@@ -395,6 +397,17 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
protected boolean consumeLoop()
throws Exception {
+ // At this point, we know that we can potentially move the offset, so the
old saved segment file is not valid
+ // anymore. Remove the file if it exists.
+ removeSegmentFile();
+
+ // Hold off consuming until the table reports ready; give up waiting on stop or when end criteria is reached.
+ if (!_isReadyToConsumeData.getAsBoolean()) {
+ do {
+ //noinspection BusyWait
+ Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+ } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
+ }
+
_numRowsErrored = 0;
final long idlePipeSleepTimeMillis = 100;
final long idleTimeoutMillis =
_partitionLevelStreamConfig.getIdleTimeoutMillis();
@@ -403,9 +416,6 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
StreamPartitionMsgOffset lastUpdatedOffset =
_streamPartitionMsgOffsetFactory
.create(_currentOffset); // so that we always update the metric when
we enter this method.
- // At this point, we know that we can potentially move the offset, so the
old saved segment file is not valid
- // anymore. Remove the file if it exists.
- removeSegmentFile();
_segmentLogger.info("Starting consumption loop start offset {},
finalOffset {}", _currentOffset, _finalOffset);
while (!_shouldStop && !endCriteriaReached()) {
@@ -1263,7 +1273,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
RealtimeTableDataManager realtimeTableDataManager, String
resourceDataDir, IndexLoadingConfig indexLoadingConfig,
Schema schema, LLCSegmentName llcSegmentName, Semaphore
partitionGroupConsumerSemaphore,
ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager
partitionUpsertMetadataManager,
- @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager) {
+ @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager,
BooleanSupplier isReadyToConsumeData) {
_segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
_segmentZKMetadata = segmentZKMetadata;
_tableConfig = tableConfig;
@@ -1273,6 +1283,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_indexLoadingConfig = indexLoadingConfig;
_schema = schema;
_serverMetrics = serverMetrics;
+ _isReadyToConsumeData = isReadyToConsumeData;
_segmentVersion = indexLoadingConfig.getSegmentVersion();
_instanceId = _realtimeTableDataManager.getServerInstance();
_leaseExtender =
SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 48b849834f..9315770943 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -31,7 +31,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
@@ -114,13 +114,17 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// likely that we get fresh data each time instead of multiple copies of
roughly same data.
private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
- private final AtomicBoolean _allSegmentsLoaded = new AtomicBoolean();
+ public static final long READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(5);
+
+ // TODO: Change it to BooleanSupplier
+ private final Supplier<Boolean> _isServerReadyToServeQueries;
- private TableDedupMetadataManager _tableDedupMetadataManager;
- private TableUpsertMetadataManager _tableUpsertMetadataManager;
// Object to track ingestion delay for all partitions
private IngestionDelayTracker _ingestionDelayTracker;
- private final Supplier<Boolean> _isServerReadyToServeQueries;
+
+ private TableDedupMetadataManager _tableDedupMetadataManager;
+ private TableUpsertMetadataManager _tableUpsertMetadataManager;
+ private BooleanSupplier _isTableReadyToConsumeData;
public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
this(segmentBuildSemaphore, () -> true);
@@ -135,8 +139,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
protected void doInit() {
_leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId,
_serverMetrics, _tableNameWithType);
// Tracks ingestion delay of all partitions being served for this table
- _ingestionDelayTracker = new IngestionDelayTracker(_serverMetrics,
_tableNameWithType, this,
- _isServerReadyToServeQueries);
+ _ingestionDelayTracker =
+ new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this,
_isServerReadyToServeQueries);
File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
try {
_statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
@@ -203,6 +207,36 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
Preconditions.checkState(schema != null, "Failed to find schema for
table: %s", _tableNameWithType);
_tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(tableConfig, schema, this,
_serverMetrics);
}
+
+ // For dedup and partial-upsert, need to wait for all segments loaded
before starting consuming data
+ if (isDedupEnabled() || isPartialUpsertEnabled()) {
+ _isTableReadyToConsumeData = new BooleanSupplier() {
+ volatile boolean _allSegmentsLoaded; // once true, getAsBoolean() returns before this is ever reassigned
+ long _lastCheckTimeMs; // only read and written inside synchronized (this)
+
+ @Override
+ public boolean getAsBoolean() {
+ if (_allSegmentsLoaded) { // fast path: volatile read, no lock taken
+ return true;
+ } else {
+ synchronized (this) {
+ if (_allSegmentsLoaded) { // re-check: another caller may have set it while we waited for the lock
+ return true;
+ }
+ long currentTimeMs = System.currentTimeMillis();
+ if (currentTimeMs - _lastCheckTimeMs <=
READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS) { // throttle: report not-ready without calling TableStateUtils
+ return false;
+ }
+ _lastCheckTimeMs = currentTimeMs;
+ _allSegmentsLoaded =
TableStateUtils.isAllSegmentsLoaded(_helixManager, _tableNameWithType);
+ return _allSegmentsLoaded;
+ }
+ }
+ }
+ };
+ } else {
+ _isTableReadyToConsumeData = () -> true;
+ }
}
@Override
@@ -265,7 +299,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
/**
* Returns all partitionGroupIds for the partitions hosted by this server
for current table.
- * @apiNote this involves Zookeeper read and should not be used frequently
due to efficiency concerns.
+ * @apiNote this involves Zookeeper read and should not be used frequently
due to efficiency concerns.
*/
public Set<Integer> getHostedPartitionsGroupIds() {
Set<Integer> partitionsHostedByThisServer = new HashSet<>();
@@ -401,22 +435,10 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
PartitionDedupMetadataManager partitionDedupMetadataManager =
_tableDedupMetadataManager != null ?
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
: null;
- // For dedup and partial-upsert, wait for all segments loaded before
creating the consuming segment
- if (isDedupEnabled() || isPartialUpsertEnabled()) {
- if (!_allSegmentsLoaded.get()) {
- synchronized (_allSegmentsLoaded) {
- if (!_allSegmentsLoaded.get()) {
- TableStateUtils.waitForAllSegmentsLoaded(_helixManager,
_tableNameWithType);
- _allSegmentsLoaded.set(true);
- }
- }
- }
- }
-
segmentDataManager =
new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig,
this, _indexDir.getAbsolutePath(),
indexLoadingConfig, schema, llcSegmentName, semaphore,
_serverMetrics, partitionUpsertMetadataManager,
- partitionDedupMetadataManager);
+ partitionDedupMetadataManager, _isTableReadyToConsumeData);
} else {
InstanceZKMetadata instanceZKMetadata =
ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, _instanceId);
segmentDataManager = new HLRealtimeSegmentDataManager(segmentZKMetadata,
tableConfig, instanceZKMetadata, this,
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index b6c290cde6..fd75a9a0f0 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -1001,7 +1001,7 @@ public class LLRealtimeSegmentDataManagerTest {
throws Exception {
super(segmentZKMetadata, tableConfig, realtimeTableDataManager,
resourceDataDir,
new IndexLoadingConfig(makeInstanceDataManagerConfig(),
tableConfig), schema, llcSegmentName,
- semaphoreMap.get(llcSegmentName.getPartitionGroupId()),
serverMetrics, null, null);
+ semaphoreMap.get(llcSegmentName.getPartitionGroupId()),
serverMetrics, null, null, () -> true);
_state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state");
_state.setAccessible(true);
_shouldStop =
LLRealtimeSegmentDataManager.class.getDeclaredField("_shouldStop");
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index c11886f613..6de4536de1 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -27,13 +27,14 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
-import org.apache.pinot.spi.utils.CommonConstants;
+import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TableStateUtils {
private static final Logger LOGGER =
LoggerFactory.getLogger(TableStateUtils.class);
+ private static final int MAX_NUM_SEGMENTS_TO_LOG = 10;
private TableStateUtils() {
}
@@ -83,58 +84,57 @@ public class TableStateUtils {
* @return true if all segments for the given table are successfully loaded.
False otherwise
*/
public static boolean isAllSegmentsLoaded(HelixManager helixManager, String
tableNameWithType) {
+ List<String> onlineSegments =
+ getSegmentsInGivenStateForThisInstance(helixManager,
tableNameWithType, SegmentStateModel.ONLINE);
+ if (onlineSegments.isEmpty()) {
+ LOGGER.info("No ONLINE segment found for table: {}", tableNameWithType);
+ return true;
+ }
+
+ // Check if ideal state and current state matches for all segments
assigned to the current instance
HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
String instanceName = helixManager.getInstanceName();
-
- List<String> onlineSegments =
getSegmentsInGivenStateForThisInstance(helixManager, tableNameWithType,
- CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
- if (onlineSegments.size() > 0) {
- LiveInstance liveInstance =
dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
- if (liveInstance == null) {
- LOGGER.warn("Failed to find live instance for instance: {}",
instanceName);
- return false;
- }
- String sessionId = liveInstance.getEphemeralOwner();
- CurrentState currentState =
- dataAccessor.getProperty(keyBuilder.currentState(instanceName,
sessionId, tableNameWithType));
- if (currentState == null) {
- LOGGER.warn("Failed to find current state for instance: {}, sessionId:
{}, table: {}", instanceName, sessionId,
- tableNameWithType);
- return false;
- }
- // Check if ideal state and current state matches for all segments
assigned to the current instance
- Map<String, String> currentStateMap =
currentState.getPartitionStateMap();
-
- for (String segmentName : onlineSegments) {
- String actualState = currentStateMap.get(segmentName);
- if
(!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(actualState))
{
- if
(CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(actualState)) {
- LOGGER.error("Find ERROR segment: {}, table: {}, expected: {}",
segmentName, tableNameWithType,
- CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
- } else {
- LOGGER.info("Find unloaded segment: {}, table: {}, expected: {},
actual: {}", segmentName,
- tableNameWithType,
CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE, actualState);
- }
+ LiveInstance liveInstance =
dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
+ if (liveInstance == null) {
+ LOGGER.warn("Failed to find live instance for instance: {}",
instanceName);
+ return false;
+ }
+ String sessionId = liveInstance.getEphemeralOwner();
+ CurrentState currentState =
+ dataAccessor.getProperty(keyBuilder.currentState(instanceName,
sessionId, tableNameWithType));
+ if (currentState == null) {
+ LOGGER.warn("Failed to find current state for instance: {}, sessionId:
{}, table: {}", instanceName, sessionId,
+ tableNameWithType);
+ return false;
+ }
+ List<String> unloadedSegments = new ArrayList<>();
+ Map<String, String> currentStateMap = currentState.getPartitionStateMap();
+ for (String segmentName : onlineSegments) {
+ String actualState = currentStateMap.get(segmentName);
+ if (!SegmentStateModel.ONLINE.equals(actualState)) {
+ if (SegmentStateModel.ERROR.equals(actualState)) {
+ LOGGER.error("Found segment: {}, table: {} in ERROR state, expected:
{}", segmentName, tableNameWithType,
+ SegmentStateModel.ONLINE);
return false;
+ } else {
+ unloadedSegments.add(segmentName);
}
}
}
-
- LOGGER.info("All segments loaded for table: {}", tableNameWithType);
- return true;
- }
-
- public static void waitForAllSegmentsLoaded(HelixManager helixManager,
String tableNameWithType) {
- try {
- while (!TableStateUtils.isAllSegmentsLoaded(helixManager,
tableNameWithType)) {
- LOGGER.info("Sleeping 1 second waiting for all segments loaded for
table: {}", tableNameWithType);
- //noinspection BusyWait
- Thread.sleep(1000L);
+ if (unloadedSegments.isEmpty()) {
+ LOGGER.info("All segments loaded for table: {}", tableNameWithType);
+ return true;
+ } else {
+ int numUnloadedSegments = unloadedSegments.size();
+ if (numUnloadedSegments <= MAX_NUM_SEGMENTS_TO_LOG) {
+ LOGGER.info("Found {} unloaded segments: {} for table: {}",
numUnloadedSegments, unloadedSegments,
+ tableNameWithType);
+ } else {
+ LOGGER.info("Found {} unloaded segments: {}... for table: {}",
numUnloadedSegments,
+ unloadedSegments.subList(0, MAX_NUM_SEGMENTS_TO_LOG),
tableNameWithType);
}
- } catch (Exception e) {
- throw new RuntimeException(
- "Caught exception while waiting for all segments loaded for table: "
+ tableNameWithType, e);
+ return false;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]