sajjad-moradi commented on a change in pull request #7066:
URL: https://github.com/apache/pinot/pull/7066#discussion_r727432780



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
##########
@@ -103,8 +107,15 @@ public synchronized void 
updateFlushThreshold(PartitionLevelStreamConfig streamC
     // less same characteristics at any one point in time).

Review comment:
       Could you please update the first comment line to reflect the changes 
this PR introduces?

##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
##########
@@ -102,8 +106,14 @@ public synchronized void 
updateFlushThreshold(PartitionLevelStreamConfig streamC
     // less same characteristics at any one point in time).
     // However, when we start a new table or change controller mastership, we can have any partition completing first.
     // It is best to learn the ratio as quickly as we can, so we allow any partition to supply the value.
-    // FIXME: The stream may not have partition "0"
-    if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == 0 || _latestSegmentRowsToSizeRatio == 0) {
+
+    // Partition group id 0 might not be available always. We take the smallest available partition id in that case to update the threshold
+    int smallestAvailablePartitionGroupId =
+        partitionGroupMetadataList.stream().min(Comparator.comparingInt(PartitionGroupMetadata::getPartitionGroupId))
+            .map(PartitionGroupMetadata::getPartitionGroupId).orElseGet(() -> 0);

Review comment:
       The `updateFlushThreshold` method is called only from 
`PinotLLCRealtimeSegmentManager.createNewSegmentZKMetadata`, which creates 
ZK metadata for new segments. If no partition is available, there are no new 
segments and `updateFlushThreshold` is not called. So we can throw an 
`IllegalStateException` in the `orElse` part.
   Also, the min calculation can be simplified to:
   ```java
   partitionGroupMetadataList.stream()
     .mapToInt(PartitionGroupMetadata::getPartitionGroupId)
     .min()
     .orElseThrow(() -> new IllegalStateException("..."));
   ```
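
To make the suggestion easier to try out, here is a minimal, self-contained sketch of the same pattern. The nested `PartitionGroupMetadata` class is a hypothetical stand-in that models only the getter used here, not Pinot's real class, and the class name `SmallestPartitionSketch`, the helper `smallestPartitionGroupId`, and the exception message are illustrative assumptions.

```java
import java.util.List;

class SmallestPartitionSketch {

  // Hypothetical stand-in for Pinot's PartitionGroupMetadata; only the getter is modeled.
  static final class PartitionGroupMetadata {
    private final int _partitionGroupId;

    PartitionGroupMetadata(int partitionGroupId) {
      _partitionGroupId = partitionGroupId;
    }

    int getPartitionGroupId() {
      return _partitionGroupId;
    }
  }

  // Returns the smallest partition group id in the list.
  // An empty list is treated as a programming error rather than silently mapped to 0.
  static int smallestPartitionGroupId(List<PartitionGroupMetadata> partitionGroupMetadataList) {
    return partitionGroupMetadataList.stream()
        .mapToInt(PartitionGroupMetadata::getPartitionGroupId)
        .min()
        // Illustrative message; the wording in the PR is up to the author.
        .orElseThrow(() -> new IllegalStateException("No partition group metadata available"));
  }
}
```

Mapping to an `IntStream` first avoids the `Comparator` and the second `map` call, and `OptionalInt.orElseThrow` makes the empty case fail loudly instead of falling back to partition 0, which may not exist.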




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


