klsince commented on code in PR #12883:
URL: https://github.com/apache/pinot/pull/12883#discussion_r1573060376
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -19,80 +19,107 @@
package org.apache.pinot.server.starter.helix;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
-import org.apache.pinot.common.utils.LLCSegmentName;
+import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class IngestionBasedConsumptionStatusChecker {
protected final Logger _logger = LoggerFactory.getLogger(getClass());
- // constructor parameters
- protected final InstanceDataManager _instanceDataManager;
- protected final Set<String> _consumingSegments;
-
- // helper variable
- private final Set<String> _caughtUpSegments = new HashSet<>();
+ private final InstanceDataManager _instanceDataManager;
+ private final Map<String, Set<String>> _consumingSegmentsByTable;
+ private final Map<String, Set<String>> _caughtUpSegmentsByTable = new
HashMap<>();
+ private final Function<String, Set<String>> _consumingSegmentsSupplier;
+ /**
+ * Both consumingSegmentsByTable and consumingSegmentsSupplier are provided
as it can be costly to get
+ * consumingSegmentsByTable via the supplier, so only use it when any
missing segment is detected.
+ */
public IngestionBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
- Set<String> consumingSegments) {
+ Map<String, Set<String>> consumingSegmentsByTable, Function<String,
Set<String>> consumingSegmentsSupplier) {
_instanceDataManager = instanceDataManager;
- _consumingSegments = consumingSegments;
+ _consumingSegmentsByTable = consumingSegmentsByTable;
+ _consumingSegmentsSupplier = consumingSegmentsSupplier;
}
- public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
- for (String segName : _consumingSegments) {
- if (_caughtUpSegments.contains(segName)) {
- continue;
- }
- TableDataManager tableDataManager = getTableDataManager(segName);
+ // This might be called by multiple threads, thus synchronized to be correct.
+ public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria()
{
+ Set<String> tablesWithMissingSegment = new HashSet<>();
+ for (Map.Entry<String, Set<String>> tableSegments :
_consumingSegmentsByTable.entrySet()) {
+ String tableNameWithType = tableSegments.getKey();
+ TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(tableNameWithType);
if (tableDataManager == null) {
- _logger.info("TableDataManager is not yet setup for segment {}. Will
check consumption status later", segName);
+ _logger.info("No tableDataManager for table: {}. Will check
consumption status later", tableNameWithType);
+ tablesWithMissingSegment.add(tableNameWithType);
continue;
}
- SegmentDataManager segmentDataManager = null;
- try {
- segmentDataManager = tableDataManager.acquireSegment(segName);
- if (segmentDataManager == null) {
- _logger.info("SegmentDataManager is not yet setup for segment {}.
Will check consumption status later",
- segName);
+ Set<String> consumingSegments = tableSegments.getValue();
+ Set<String> caughtUpSegments =
_caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new
HashSet<>());
+ for (String segName : consumingSegments) {
+ if (caughtUpSegments.contains(segName)) {
continue;
}
- if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
- // There's a possibility that a consuming segment has converted to a
committed segment. If that's the case,
- // segment data manager will not be of type
RealtimeSegmentDataManager.
- _logger.info("Segment {} is already committed and is considered
caught up.", segName);
- _caughtUpSegments.add(segName);
- continue;
- }
-
- RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
- if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
- _caughtUpSegments.add(segName);
+ SegmentDataManager segmentDataManager = null;
+ try {
+ segmentDataManager = tableDataManager.acquireSegment(segName);
+ if (segmentDataManager == null) {
+ _logger.info("No SegmentDataManager for segment: {}. Will check
consumption status later", segName);
+ tablesWithMissingSegment.add(tableNameWithType);
+ continue;
+ }
+ if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
+ // There's a possibility that a consuming segment has converted to
a committed segment. If that's the case,
+ // segment data manager will not be of type
RealtimeSegmentDataManager.
+ _logger.info("Segment: {} is already committed and is considered
caught up.", segName);
+ caughtUpSegments.add(segName);
+ continue;
+ }
+ RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
+ if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
+ caughtUpSegments.add(segName);
+ }
+ } finally {
+ if (segmentDataManager != null) {
+ tableDataManager.releaseSegment(segmentDataManager);
+ }
}
- } finally {
- if (segmentDataManager != null) {
- tableDataManager.releaseSegment(segmentDataManager);
+ }
Review Comment:
Good point, but I'll remove the table while calculating
numLaggingSegments below, so that future checks skip any table whose consuming
segments have all caught up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]