This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a2557515d92 Use Broker's accountant to sample in the request handler. (#16439)
a2557515d92 is described below

commit a2557515d92a91257ae165504ffdfabeb981da06
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Mon Jul 28 23:10:58 2025 +0530

    Use Broker's accountant to sample in the request handler. (#16439)
---
 .../requesthandler/BaseSingleStageBrokerRequestHandler.java       | 8 ++++----
 pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java   | 7 +++++++
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 78b8d4b4040..8ef923f63bb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -332,7 +332,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
       return doHandleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext,
           httpHeaders, accessControl);
     } finally {
-      Tracing.ThreadAccountantOps.clear();
+      _resourceUsageAccountant.clear();
     }
   }
 
@@ -401,7 +401,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
     _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION,
         (compilationEndTimeNs - compilationStartTimeNs) + sqlNodeAndOptions.getParseTimeNs());
     // Accounts for resource usage of the compilation phase, since compilation for some queries can be expensive.
-    Tracing.ThreadAccountantOps.sampleAndCheckInterruption();
+    Tracing.ThreadAccountantOps.sampleAndCheckInterruption(_resourceUsageAccountant);
 
     // Second-stage table-level access control
     // TODO: Modify AccessControl interface to directly take PinotQuery
@@ -442,7 +442,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
       _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
           System.nanoTime() - compilationEndTimeNs);
       // Accounts for resource usage of the authorization phase.
-      Tracing.ThreadAccountantOps.sampleAndCheckInterruption();
+      Tracing.ThreadAccountantOps.sampleAndCheckInterruption(_resourceUsageAccountant);
 
       if (!authorizationResult.hasAccess()) {
         throwAccessDeniedError(requestId, query, requestContext, tableName, authorizationResult);
@@ -693,7 +693,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
         routingEndTimeNs - routingStartTimeNs);
     // Account the resource used for routing phase, since for single stage queries with multiple segments, routing
     // can be expensive.
-    Tracing.ThreadAccountantOps.sampleAndCheckInterruption();
+    Tracing.ThreadAccountantOps.sampleAndCheckInterruption(_resourceUsageAccountant);
 
     // Set timeout in the requests
     long timeSpentMs = TimeUnit.NANOSECONDS.toMillis(routingEndTimeNs - compilationStartTimeNs);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 831e6d08b6b..0f2ee9deb72 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -391,6 +391,13 @@ public class Tracing {
       sample();
     }
 
+    public static void sampleAndCheckInterruption(ThreadResourceUsageAccountant accountant) {
+      if (Thread.interrupted() || accountant.isAnchorThreadInterrupted() || accountant.isQueryTerminated()) {
+        throw new EarlyTerminationException("Interrupted while merging records");
+      }
+      accountant.sampleUsage();
+    }
+
     @Deprecated
     public static void updateQueryUsageConcurrently(String queryId) {
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to