Copilot commented on code in PR #16625:
URL: https://github.com/apache/pinot/pull/16625#discussion_r2283553572


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -42,39 +45,75 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+/// TODO: Use CID to manage the queries

Review Comment:
   [nitpick] This TODO comment is unrelated to the changes in this PR. Consider 
removing it or addressing it in a separate PR to maintain focus on the race 
condition fix.
   ```suggestion
   
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -1942,7 +1942,17 @@ public static class PlanVersions {
     /// Max time to keep the op stats in the cache.
     public static final String KEY_OF_OP_STATS_CACHE_EXPIRE_MS = "pinot.server.query.op.stats.cache.ms";
     public static final int DEFAULT_OF_OP_STATS_CACHE_EXPIRE_MS = 60 * 1000;
+
+    /// Max number of cancelled queries to keep in the cache.
+    public static final String KEY_OF_CANCELLED_QUERY_CACHE_SIZE = "pinot.server.query.cancelled.cache.size";
+    public static final int DEFAULT_OF_CANCELLED_QUERY_CACHE_SIZE = 1000;
+
+    /// Max time to keep the cancelled queries in the cache.
+    public static final String KEY_OF_CANCELLED_QUERY_CACHE_EXPIRE_MS = "pinot.server.query.cancelled.cache.ms";
+    public static final int DEFAULT_OF_CANCELLED_QUERY_CACHE_EXPIRE_MS = 60 * 1000;
+
     /// Timeout of the cancel request, in milliseconds.
+    /// TODO: This is used by the broker. Consider renaming it.

Review Comment:
   [nitpick] This TODO comment is unrelated to the changes in this PR and 
appears to be leftover from previous work. Consider removing it or addressing 
it in a separate PR to keep this change focused.
   ```suggestion
   
   ```



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -113,11 +152,20 @@ public void runJob() {
         }
       }
     });
-    _opChainCache.put(operatorChain.getId(), operatorChain.getRoot());
-    _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
+    _opChainCache.put(opChainId, operatorChain.getRoot());
+    _submittedOpChainMap.put(opChainId, scheduledFuture);
   }
 
   public Map<Integer, MultiStageQueryStats.StageStats.Closed> cancel(long requestId) {
+    // Acquire write lock for the query to ensure that the query is not cancelled while scheduling the operator chain.
+    Lock writeLock = getQueryLock(requestId).writeLock();
+    writeLock.lock();
+    try {
+      _cancelledQueryCache.put(requestId, requestId);

Review Comment:
   Using the requestId as both key and value in the cache is redundant and 
wastes memory. Consider using a simpler approach like 
`_cancelledQueryCache.put(requestId, Boolean.TRUE)` or a Set-like structure to 
only store the key.
   ```suggestion
         _cancelledQueryCache.put(requestId, Boolean.TRUE);
   ```
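
For the "Set-like structure" option mentioned above, a minimal JDK-only sketch might look like the following. `CancelledQuerySketch` and `newBoundedSet` are hypothetical names, not part of Pinot or of this PR. The sketch covers only the size bound (compare `DEFAULT_OF_CANCELLED_QUERY_CACHE_SIZE`), not the time-based expiry, which the PR's actual cache would still have to provide.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Pinot.
final class CancelledQuerySketch {
  private CancelledQuerySketch() {
  }

  // Returns a thread-safe Set<Long> that keeps at most maxSize request IDs,
  // evicting the oldest inserted ID first. No time-based expiry is implemented.
  static Set<Long> newBoundedSet(int maxSize) {
    // Insertion-ordered map that drops its eldest entry once maxSize is exceeded.
    Map<Long, Boolean> backing = new LinkedHashMap<Long, Boolean>() {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
        return size() > maxSize;
      }
    };
    // newSetFromMap stores the shared Boolean.TRUE as the value for every key,
    // so only the request ID itself is boxed per entry.
    return Collections.synchronizedSet(Collections.newSetFromMap(backing));
  }
}
```

With such a set, the call site would read `cancelled.add(requestId)` and the check `cancelled.contains(requestId)`, which states the intent more directly than a map whose values are never read.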



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

