Jackie-Jiang commented on code in PR #12883:
URL: https://github.com/apache/pinot/pull/12883#discussion_r1573017665
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java:
##########
@@ -359,6 +359,21 @@ private void registerServiceStatusHandler() {
new
ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build()));
}
+ private Set<String> getConsumingSegments(String realtimeTableName) {
Review Comment:
(minor) Annotate the return type as `@Nullable`
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -19,80 +19,107 @@
package org.apache.pinot.server.starter.helix;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
-import org.apache.pinot.common.utils.LLCSegmentName;
+import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class IngestionBasedConsumptionStatusChecker {
protected final Logger _logger = LoggerFactory.getLogger(getClass());
- // constructor parameters
- protected final InstanceDataManager _instanceDataManager;
- protected final Set<String> _consumingSegments;
-
- // helper variable
- private final Set<String> _caughtUpSegments = new HashSet<>();
+ private final InstanceDataManager _instanceDataManager;
+ private final Map<String, Set<String>> _consumingSegmentsByTable;
+ private final Map<String, Set<String>> _caughtUpSegmentsByTable = new
HashMap<>();
+ private final Function<String, Set<String>> _consumingSegmentsSupplier;
+ /**
+ * Both consumingSegmentsByTable and consumingSegmentsSupplier are provided
as it can be costly to get
+ * consumingSegmentsByTable via the supplier, so only use it when any
missing segment is detected.
+ */
public IngestionBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
- Set<String> consumingSegments) {
+ Map<String, Set<String>> consumingSegmentsByTable, Function<String,
Set<String>> consumingSegmentsSupplier) {
_instanceDataManager = instanceDataManager;
- _consumingSegments = consumingSegments;
+ _consumingSegmentsByTable = consumingSegmentsByTable;
+ _consumingSegmentsSupplier = consumingSegmentsSupplier;
}
- public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
- for (String segName : _consumingSegments) {
- if (_caughtUpSegments.contains(segName)) {
- continue;
- }
- TableDataManager tableDataManager = getTableDataManager(segName);
+ // This might be called by multiple threads, thus synchronized to be correct.
+ public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria()
{
+ Set<String> tablesWithMissingSegment = new HashSet<>();
+ for (Map.Entry<String, Set<String>> tableSegments :
_consumingSegmentsByTable.entrySet()) {
+ String tableNameWithType = tableSegments.getKey();
+ TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(tableNameWithType);
if (tableDataManager == null) {
- _logger.info("TableDataManager is not yet setup for segment {}. Will
check consumption status later", segName);
+ _logger.info("No tableDataManager for table: {}. Will check
consumption status later", tableNameWithType);
+ tablesWithMissingSegment.add(tableNameWithType);
continue;
}
- SegmentDataManager segmentDataManager = null;
- try {
- segmentDataManager = tableDataManager.acquireSegment(segName);
- if (segmentDataManager == null) {
- _logger.info("SegmentDataManager is not yet setup for segment {}.
Will check consumption status later",
- segName);
+ Set<String> consumingSegments = tableSegments.getValue();
+ Set<String> caughtUpSegments =
_caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new
HashSet<>());
+ for (String segName : consumingSegments) {
+ if (caughtUpSegments.contains(segName)) {
continue;
}
- if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
- // There's a possibility that a consuming segment has converted to a
committed segment. If that's the case,
- // segment data manager will not be of type
RealtimeSegmentDataManager.
- _logger.info("Segment {} is already committed and is considered
caught up.", segName);
- _caughtUpSegments.add(segName);
- continue;
- }
-
- RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
- if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
- _caughtUpSegments.add(segName);
+ SegmentDataManager segmentDataManager = null;
+ try {
+ segmentDataManager = tableDataManager.acquireSegment(segName);
+ if (segmentDataManager == null) {
+ _logger.info("No SegmentDataManager for segment: {}. Will check
consumption status later", segName);
+ tablesWithMissingSegment.add(tableNameWithType);
+ continue;
+ }
+ if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
+ // There's a possibility that a consuming segment has converted to
a committed segment. If that's the case,
+ // segment data manager will not be of type
RealtimeSegmentDataManager.
+ _logger.info("Segment: {} is already committed and is considered
caught up.", segName);
+ caughtUpSegments.add(segName);
+ continue;
+ }
+ RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
+ if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
+ caughtUpSegments.add(segName);
+ }
+ } finally {
+ if (segmentDataManager != null) {
+ tableDataManager.releaseSegment(segmentDataManager);
+ }
Review Comment:
(minor) Move the assignment outside the `try` block and remove the null check in `finally`
``` suggestion
SegmentDataManager segmentDataManager =
tableDataManager.acquireSegment(segName);
if (segmentDataManager == null) {
_logger.info("No SegmentDataManager for segment: {}. Will check
consumption status later", segName);
tablesWithMissingSegment.add(tableNameWithType);
continue;
}
try {
if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
// There's a possibility that a consuming segment has converted
to a committed segment. If that's the case,
// segment data manager will not be of type
RealtimeSegmentDataManager.
_logger.info("Segment: {} is already committed and is considered
caught up.", segName);
caughtUpSegments.add(segName);
continue;
}
RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
caughtUpSegments.add(segName);
}
} finally {
tableDataManager.releaseSegment(segmentDataManager);
```
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -19,80 +19,107 @@
package org.apache.pinot.server.starter.helix;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
-import org.apache.pinot.common.utils.LLCSegmentName;
+import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class IngestionBasedConsumptionStatusChecker {
protected final Logger _logger = LoggerFactory.getLogger(getClass());
- // constructor parameters
- protected final InstanceDataManager _instanceDataManager;
- protected final Set<String> _consumingSegments;
-
- // helper variable
- private final Set<String> _caughtUpSegments = new HashSet<>();
+ private final InstanceDataManager _instanceDataManager;
+ private final Map<String, Set<String>> _consumingSegmentsByTable;
+ private final Map<String, Set<String>> _caughtUpSegmentsByTable = new
HashMap<>();
+ private final Function<String, Set<String>> _consumingSegmentsSupplier;
+ /**
+ * Both consumingSegmentsByTable and consumingSegmentsSupplier are provided
as it can be costly to get
+ * consumingSegmentsByTable via the supplier, so only use it when any
missing segment is detected.
+ */
public IngestionBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
- Set<String> consumingSegments) {
+ Map<String, Set<String>> consumingSegmentsByTable, Function<String,
Set<String>> consumingSegmentsSupplier) {
_instanceDataManager = instanceDataManager;
- _consumingSegments = consumingSegments;
+ _consumingSegmentsByTable = consumingSegmentsByTable;
+ _consumingSegmentsSupplier = consumingSegmentsSupplier;
}
- public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
- for (String segName : _consumingSegments) {
- if (_caughtUpSegments.contains(segName)) {
- continue;
- }
- TableDataManager tableDataManager = getTableDataManager(segName);
+ // This might be called by multiple threads, thus synchronized to be correct.
+ public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria()
{
+ Set<String> tablesWithMissingSegment = new HashSet<>();
+ for (Map.Entry<String, Set<String>> tableSegments :
_consumingSegmentsByTable.entrySet()) {
+ String tableNameWithType = tableSegments.getKey();
+ TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(tableNameWithType);
if (tableDataManager == null) {
- _logger.info("TableDataManager is not yet setup for segment {}. Will
check consumption status later", segName);
+ _logger.info("No tableDataManager for table: {}. Will check
consumption status later", tableNameWithType);
+ tablesWithMissingSegment.add(tableNameWithType);
continue;
}
- SegmentDataManager segmentDataManager = null;
- try {
- segmentDataManager = tableDataManager.acquireSegment(segName);
- if (segmentDataManager == null) {
- _logger.info("SegmentDataManager is not yet setup for segment {}.
Will check consumption status later",
- segName);
+ Set<String> consumingSegments = tableSegments.getValue();
+ Set<String> caughtUpSegments =
_caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new
HashSet<>());
+ for (String segName : consumingSegments) {
+ if (caughtUpSegments.contains(segName)) {
continue;
}
- if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
- // There's a possibility that a consuming segment has converted to a
committed segment. If that's the case,
- // segment data manager will not be of type
RealtimeSegmentDataManager.
- _logger.info("Segment {} is already committed and is considered
caught up.", segName);
- _caughtUpSegments.add(segName);
- continue;
- }
-
- RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
- if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
- _caughtUpSegments.add(segName);
+ SegmentDataManager segmentDataManager = null;
+ try {
+ segmentDataManager = tableDataManager.acquireSegment(segName);
+ if (segmentDataManager == null) {
+ _logger.info("No SegmentDataManager for segment: {}. Will check
consumption status later", segName);
+ tablesWithMissingSegment.add(tableNameWithType);
+ continue;
+ }
+ if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
+ // There's a possibility that a consuming segment has converted to
a committed segment. If that's the case,
+ // segment data manager will not be of type
RealtimeSegmentDataManager.
+ _logger.info("Segment: {} is already committed and is considered
caught up.", segName);
+ caughtUpSegments.add(segName);
+ continue;
+ }
+ RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
+ if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
+ caughtUpSegments.add(segName);
+ }
+ } finally {
+ if (segmentDataManager != null) {
+ tableDataManager.releaseSegment(segmentDataManager);
+ }
}
- } finally {
- if (segmentDataManager != null) {
- tableDataManager.releaseSegment(segmentDataManager);
+ }
Review Comment:
After the for loop, we can remove the table from the map if all of its
segments are caught up
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -19,80 +19,107 @@
package org.apache.pinot.server.starter.helix;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
-import org.apache.pinot.common.utils.LLCSegmentName;
+import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class IngestionBasedConsumptionStatusChecker {
protected final Logger _logger = LoggerFactory.getLogger(getClass());
- // constructor parameters
- protected final InstanceDataManager _instanceDataManager;
- protected final Set<String> _consumingSegments;
-
- // helper variable
- private final Set<String> _caughtUpSegments = new HashSet<>();
+ private final InstanceDataManager _instanceDataManager;
+ private final Map<String, Set<String>> _consumingSegmentsByTable;
+ private final Map<String, Set<String>> _caughtUpSegmentsByTable = new
HashMap<>();
+ private final Function<String, Set<String>> _consumingSegmentsSupplier;
+ /**
+ * Both consumingSegmentsByTable and consumingSegmentsSupplier are provided
as it can be costly to get
+ * consumingSegmentsByTable via the supplier, so only use it when any
missing segment is detected.
+ */
public IngestionBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
- Set<String> consumingSegments) {
+ Map<String, Set<String>> consumingSegmentsByTable, Function<String,
Set<String>> consumingSegmentsSupplier) {
_instanceDataManager = instanceDataManager;
- _consumingSegments = consumingSegments;
+ _consumingSegmentsByTable = consumingSegmentsByTable;
+ _consumingSegmentsSupplier = consumingSegmentsSupplier;
}
- public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
- for (String segName : _consumingSegments) {
- if (_caughtUpSegments.contains(segName)) {
- continue;
- }
- TableDataManager tableDataManager = getTableDataManager(segName);
+ // This might be called by multiple threads, thus synchronized to be correct.
+ public synchronized int getNumConsumingSegmentsNotReachedIngestionCriteria()
{
+ Set<String> tablesWithMissingSegment = new HashSet<>();
+ for (Map.Entry<String, Set<String>> tableSegments :
_consumingSegmentsByTable.entrySet()) {
+ String tableNameWithType = tableSegments.getKey();
+ TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(tableNameWithType);
if (tableDataManager == null) {
- _logger.info("TableDataManager is not yet setup for segment {}. Will
check consumption status later", segName);
+ _logger.info("No tableDataManager for table: {}. Will check
consumption status later", tableNameWithType);
+ tablesWithMissingSegment.add(tableNameWithType);
continue;
}
- SegmentDataManager segmentDataManager = null;
- try {
- segmentDataManager = tableDataManager.acquireSegment(segName);
- if (segmentDataManager == null) {
- _logger.info("SegmentDataManager is not yet setup for segment {}.
Will check consumption status later",
- segName);
+ Set<String> consumingSegments = tableSegments.getValue();
+ Set<String> caughtUpSegments =
_caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new
HashSet<>());
+ for (String segName : consumingSegments) {
+ if (caughtUpSegments.contains(segName)) {
continue;
}
- if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
- // There's a possibility that a consuming segment has converted to a
committed segment. If that's the case,
- // segment data manager will not be of type
RealtimeSegmentDataManager.
- _logger.info("Segment {} is already committed and is considered
caught up.", segName);
- _caughtUpSegments.add(segName);
- continue;
- }
-
- RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
- if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
- _caughtUpSegments.add(segName);
+ SegmentDataManager segmentDataManager = null;
+ try {
+ segmentDataManager = tableDataManager.acquireSegment(segName);
+ if (segmentDataManager == null) {
+ _logger.info("No SegmentDataManager for segment: {}. Will check
consumption status later", segName);
+ tablesWithMissingSegment.add(tableNameWithType);
+ continue;
+ }
+ if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
+ // There's a possibility that a consuming segment has converted to
a committed segment. If that's the case,
+ // segment data manager will not be of type
RealtimeSegmentDataManager.
+ _logger.info("Segment: {} is already committed and is considered
caught up.", segName);
Review Comment:
In this scenario, a new consuming segment should already have been created.
Consider re-fetching the consuming segments for the table
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]