This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2886e67434 adjust the llc partition consuming metric reporting logic 
(#12627)
2886e67434 is described below

commit 2886e67434d036f8c920a75f1d76d646ce2d2cdc
Author: Haitao Zhang <[email protected]>
AuthorDate: Mon Mar 11 17:12:54 2024 -0700

    adjust the llc partition consuming metric reporting logic (#12627)
---
 .../pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java  | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 49cddb5574..9f9a457683 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -421,6 +421,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
 
     _segmentLogger.info("Starting consumption loop start offset {}, 
finalOffset {}", _currentOffset, _finalOffset);
     while (!_shouldStop && !endCriteriaReached()) {
+      _serverMetrics.setValueOfTableGauge(_clientId, 
ServerGauge.LLC_PARTITION_CONSUMING, 1);
       // Consume for the next readTime ms, or we get to final offset, 
whichever happens earlier,
       // Update _currentOffset upon return from this method
       MessageBatch messageBatch;
@@ -472,7 +473,6 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
           _serverMetrics.setValueOfTableGauge(_clientId, 
ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
               ((LongMsgOffset) _currentOffset).getOffset());
         }
-        _serverMetrics.setValueOfTableGauge(_clientId, 
ServerGauge.LLC_PARTITION_CONSUMING, 1);
         lastUpdatedOffset = 
_streamPartitionMsgOffsetFactory.create(_currentOffset);
       } else if (endCriteriaReached) {
         // At this point current offset has not moved because 
processStreamEvents() has exited before processing a
@@ -497,9 +497,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
         long timeSinceStreamLastCreatedOrConsumedMs = 
_idleTimer.getTimeSinceStreamLastCreatedOrConsumedMs();
 
         if (idleTimeoutMillis >= 0 && (timeSinceStreamLastCreatedOrConsumedMs 
> idleTimeoutMillis)) {
-          // Update the partition-consuming metric only if we have been idling 
beyond idle timeout.
           // Create a new stream consumer wrapper, in case we are stuck on 
something.
-          _serverMetrics.setValueOfTableGauge(_clientId, 
ServerGauge.LLC_PARTITION_CONSUMING, 1);
           recreateStreamConsumer(
               String.format("Total idle time: %d ms exceeded idle timeout: %d 
ms",
                   timeSinceStreamLastCreatedOrConsumedMs, idleTimeoutMillis));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to