sajjad-moradi commented on a change in pull request #7066:
URL: https://github.com/apache/pinot/pull/7066#discussion_r727432780
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
##########
@@ -103,8 +107,15 @@ public synchronized void
updateFlushThreshold(PartitionLevelStreamConfig streamC
// less same characteristics at any one point in time).
Review comment:
Could you please update the first comment line to reflect the changes
this PR introduces?
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
##########
@@ -102,8 +106,14 @@ public synchronized void
updateFlushThreshold(PartitionLevelStreamConfig streamC
// less same characteristics at any one point in time).
// However, when we start a new table or change controller mastership, we
can have any partition completing first.
// It is best to learn the ratio as quickly as we can, so we allow any
partition to supply the value.
- // FIXME: The stream may not have partition "0"
- if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == 0 ||
_latestSegmentRowsToSizeRatio == 0) {
+
+ // Partition group id 0 might not be available always. We take the
smallest available partition id in that case to update the threshold
+ int smallestAvailablePartitionGroupId =
+
partitionGroupMetadataList.stream().min(Comparator.comparingInt(PartitionGroupMetadata::getPartitionGroupId))
+ .map(PartitionGroupMetadata::getPartitionGroupId).orElseGet(() ->
0);
Review comment:
`updateFlushThreshod` method gets called only in
`PinotLLCRealtimeSegmentManager.createNewSegmentZKMetadata` which is for
creating ZK metadata for new segments. If there's no partition available, it
means there's no new segments and updateFlushThreshold doesn't get called. So,
we can throw illegal state exception for the orElse part.
Also, min calculation can be simplified to:
```java
partitionGroupMetadataList.stream()
.mapToInt(PartitionGroupMetadata::getPartitionGroupId)
.min()
.orElseThrow(() -> new IllegalStateException("..."));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]