maytasm commented on code in PR #19235:
URL: https://github.com/apache/druid/pull/19235#discussion_r3172282826


##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -153,8 +167,32 @@ public Iterable<T> call()
                     )
                 );
 
+            final int totalSegments = futures.size();
             ListenableFuture<List<Iterable<T>>> future = Futures.allAsList(futures);
             queryWatcher.registerQueryFuture(query, future);
+            
+            if (completedSegments != null && totalSegments >= samplingWindow && context.hasTimeout()) {
+              for (ListenableFuture<?> f : futures) {
+                f.addListener(
+                    () -> {
+                      if (extrapolationCancelled.get()) {
+                        return;
+                      }
+                      int completed = completedSegments.get();
+                      if (completed >= samplingWindow) {
+                        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanos);
+                        long extrapolatedMs = elapsedMs * totalSegments / completed;

Review Comment:
   Actually, this calculation is a problem too. The wall-clock extrapolation 
conflates queue wait time with processing time. Under thread pool contention, 
segments sit in the queue and elapsed wall-clock time grows while completed 
stays low, so the ratio elapsedMs / completed explodes.
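
   To make the failure mode concrete, here is a minimal sketch with 
hypothetical numbers (the 5,000 ms queue wait and 300 ms segment time are 
assumptions, not measurements from this PR). It applies the same arithmetic as 
the diff to an idle pool and to a contended one.

```java
// Hypothetical numbers, not taken from the PR.
final class ContentionExample {
  // Same arithmetic as the diff: project total wall-clock time from the completion rate so far.
  static long extrapolate(long elapsedMs, int totalSegments, int completed) {
    return elapsedMs * totalSegments / completed;
  }

  public static void main(String[] args) {
    int totalSegments = 1000;
    int completed = 100;        // one full wave on an assumed 100-thread pool
    long processingMs = 300;    // time each segment actually runs
    long queueWaitMs = 5_000;   // time spent waiting for a free processing thread

    // Idle pool: the first wave completes after one processing interval.
    System.out.println(extrapolate(processingMs, totalSegments, completed));
    // Contended pool: identical work, but queue wait is folded into elapsedMs.
    System.out.println(extrapolate(queueWaitMs + processingMs, totalSegments, completed));
  }
}
```

   The per-segment work is the same in both calls; only the queue wait 
differs, yet the projection grows by the factor 5,300 / 300.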



##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -153,8 +167,32 @@ public Iterable<T> call()
                     )
                 );
 
+            final int totalSegments = futures.size();
             ListenableFuture<List<Iterable<T>>> future = Futures.allAsList(futures);
             queryWatcher.registerQueryFuture(query, future);
+            
+            if (completedSegments != null && totalSegments >= samplingWindow && context.hasTimeout()) {
+              for (ListenableFuture<?> f : futures) {
+                f.addListener(
+                    () -> {
+                      if (extrapolationCancelled.get()) {
+                        return;
+                      }
+                      int completed = completedSegments.get();
+                      if (completed >= samplingWindow) {
+                        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanos);
+                        long extrapolatedMs = elapsedMs * totalSegments / completed;
+                        long remainingMs = context.getTimeout() - elapsedMs;
+                        if (extrapolatedMs > context.getTimeout() && remainingMs < extrapolatedMs - elapsedMs) {
+                          extrapolationCancelled.set(true);
+                          GuavaUtils.cancelAll(true, future, futures);
+                        }
+                      }
+                    },
+                    Execs.directExecutor()

Review Comment:
   The cancel call interrupts the task runner threads (processing pool), not 
the threads waiting on get(). The Jetty thread receives a CancellationException 
through the normal Future.get() contract, the same mechanism that already fires 
for regular query cancellation.
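
   As an illustration of that contract, here is a minimal sketch using only 
the JDK's java.util.concurrent.Future rather than Guava's ListenableFuture or 
the Druid classes. The class and method names are made up for the example.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustration only: plain JDK futures stand in for the query's segment futures.
final class CancelContractDemo {
  // Returns true if get() on a cancelled future throws CancellationException.
  static boolean getThrowsCancellation() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    try {
      Future<String> future = pool.submit(() -> {
        started.countDown();
        Thread.sleep(60_000); // stands in for long-running segment work
        return "done";
      });
      started.await();
      future.cancel(true); // interrupts the worker thread, not the caller
      try {
        future.get();
        return false;
      } catch (CancellationException expected) {
        return true; // the waiting thread learns of the cancel through get()
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(getThrowsCancellation());
  }
}
```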



##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -153,8 +167,32 @@ public Iterable<T> call()
                     )
                 );
 
+            final int totalSegments = futures.size();
             ListenableFuture<List<Iterable<T>>> future = Futures.allAsList(futures);
             queryWatcher.registerQueryFuture(query, future);
+            
+            if (completedSegments != null && totalSegments >= samplingWindow && context.hasTimeout()) {
+              for (ListenableFuture<?> f : futures) {
+                f.addListener(
+                    () -> {
+                      if (extrapolationCancelled.get()) {
+                        return;
+                      }
+                      int completed = completedSegments.get();
+                      if (completed >= samplingWindow) {
+                        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanos);
+                        long extrapolatedMs = elapsedMs * totalSegments / completed;

Review Comment:
   We could track actual per-segment processing time (inside the callable, 
before and after the input.run() call) rather than wall-clock time from query 
start. If we do this, then as @jtuglu1 mentioned above, we should also account 
for threadPoolSize (i.e. the parallelism). So maybe something like 
`estimated_remaining = avg_segment_time * ceil(remainingSegments / 
threadPoolSize)`
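
   A rough sketch of what this suggestion could look like, under stated 
assumptions: the names RemainingTimeEstimate, timed and estimateRemainingMs 
are hypothetical, and a Supplier stands in for the callable that wraps 
input.run(). This is not code from the PR.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical helper names; a sketch of the suggested estimate only.
final class RemainingTimeEstimate {
  // Measures only the time spent running the work, so queue wait is excluded.
  static <T> T timed(Supplier<T> segmentWork, AtomicLong totalProcessingNanos) {
    long start = System.nanoTime();
    try {
      return segmentWork.get();
    } finally {
      totalProcessingNanos.addAndGet(System.nanoTime() - start);
    }
  }

  // avg_segment_time * ceil(remainingSegments / threadPoolSize)
  static long estimateRemainingMs(long totalProcessingMs, int completed, int totalSegments, int threadPoolSize) {
    if (completed <= 0 || threadPoolSize <= 0) {
      throw new IllegalArgumentException("completed and threadPoolSize must be positive");
    }
    long avgSegmentMs = totalProcessingMs / completed;
    long remainingSegments = totalSegments - completed;
    // Integer ceiling division: how many more "waves" the pool has to run.
    long waves = (remainingSegments + threadPoolSize - 1) / threadPoolSize;
    return avgSegmentMs * waves;
  }

  public static void main(String[] args) {
    AtomicLong nanos = new AtomicLong();
    String result = timed(() -> "segment-result", nanos);
    System.out.println(result + " measured in " + nanos.get() + " ns");
    System.out.println(estimateRemainingMs(30_000, 100, 1000, 100));
  }
}
```

   The estimate assumes the whole pool is available to this query, which may 
not hold under contention.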



##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -153,8 +167,32 @@ public Iterable<T> call()
                     )
                 );
 
+            final int totalSegments = futures.size();
             ListenableFuture<List<Iterable<T>>> future = Futures.allAsList(futures);
             queryWatcher.registerQueryFuture(query, future);
+            
+            if (completedSegments != null && totalSegments >= samplingWindow && context.hasTimeout()) {
+              for (ListenableFuture<?> f : futures) {
+                f.addListener(
+                    () -> {
+                      if (extrapolationCancelled.get()) {
+                        return;
+                      }
+                      int completed = completedSegments.get();
+                      if (completed >= samplingWindow) {
+                        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanos);
+                        long extrapolatedMs = elapsedMs * totalSegments / completed;
+                        long remainingMs = context.getTimeout() - elapsedMs;
+                        if (extrapolatedMs > context.getTimeout() && remainingMs < extrapolatedMs - elapsedMs) {

Review Comment:
   Aren't these two conditions equivalent? Why do we need both?
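
   Substituting remainingMs = timeout - elapsedMs into the second condition 
gives timeout - elapsedMs < extrapolatedMs - elapsedMs, which reduces to 
timeout < extrapolatedMs, i.e. the first condition. A small brute-force check 
of that algebra over non-negative values (class and method names are made up 
for the example, and overflow is ignored):

```java
// Illustration only: compares the two predicates from the diff on a small grid.
final class ConditionEquivalence {
  static boolean first(long extrapolatedMs, long timeoutMs) {
    return extrapolatedMs > timeoutMs;
  }

  static boolean second(long extrapolatedMs, long timeoutMs, long elapsedMs) {
    long remainingMs = timeoutMs - elapsedMs;
    return remainingMs < extrapolatedMs - elapsedMs;
  }

  // Returns true if both predicates agree for every combination in [0, limitMs].
  static boolean agreeOnGrid(long limitMs) {
    for (long timeout = 0; timeout <= limitMs; timeout++) {
      for (long elapsed = 0; elapsed <= limitMs; elapsed++) {
        for (long extrapolated = 0; extrapolated <= limitMs; extrapolated++) {
          if (first(extrapolated, timeout) != second(extrapolated, timeout, elapsed)) {
            return false;
          }
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(agreeOnGrid(50));
  }
}
```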



##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -153,8 +167,32 @@ public Iterable<T> call()
                     )
                 );
 
+            final int totalSegments = futures.size();
             ListenableFuture<List<Iterable<T>>> future = Futures.allAsList(futures);
             queryWatcher.registerQueryFuture(query, future);
+            
+            if (completedSegments != null && totalSegments >= samplingWindow && context.hasTimeout()) {
+              for (ListenableFuture<?> f : futures) {
+                f.addListener(
+                    () -> {
+                      if (extrapolationCancelled.get()) {
+                        return;
+                      }
+                      int completed = completedSegments.get();
+                      if (completed >= samplingWindow) {
+                        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanos);
+                        long extrapolatedMs = elapsedMs * totalSegments / completed;
+                        long remainingMs = context.getTimeout() - elapsedMs;
+                        if (extrapolatedMs > context.getTimeout() && remainingMs < extrapolatedMs - elapsedMs) {
+                          extrapolationCancelled.set(true);
+                          GuavaUtils.cancelAll(true, future, futures);
+                        }
+                      }
+                    },
+                    Execs.directExecutor()

Review Comment:
   The listeners use Execs.directExecutor(), which means they run on the 
processing pool thread that completes the future, not on the Jetty/main thread. 
The main thread is parked inside future.get() waiting for the allAsList future 
to resolve.
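
   A JDK-only analogy of the threading behaviour described here, assuming 
that a non-async CompletableFuture callback registered before completion 
behaves like a direct-executor listener. It uses CompletableFuture rather than 
Guava, and the thread name "processing-0" is made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Analogy only: CompletableFuture stands in for a ListenableFuture with a direct-executor listener.
final class ListenerThreadDemo {
  // Returns the name of the thread that ran the completion callback.
  static String listenerThreadName() {
    CompletableFuture<String> segment = new CompletableFuture<>();
    AtomicReference<String> ranOn = new AtomicReference<>();

    // Registered before completion, without an executor: runs on the completing thread.
    CompletableFuture<Void> listener =
        segment.thenRun(() -> ranOn.set(Thread.currentThread().getName()));

    // The worker plays the role of a processing pool thread finishing a segment.
    Thread worker = new Thread(() -> segment.complete("result"), "processing-0");
    worker.start();

    // The calling thread only waits, like the thread parked in future.get().
    listener.join();
    return ranOn.get();
  }

  public static void main(String[] args) {
    System.out.println(listenerThreadName());
  }
}
```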



##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -153,8 +167,32 @@ public Iterable<T> call()
                     )
                 );
 
+            final int totalSegments = futures.size();
             ListenableFuture<List<Iterable<T>>> future = Futures.allAsList(futures);
             queryWatcher.registerQueryFuture(query, future);
+            
+            if (completedSegments != null && totalSegments >= samplingWindow && context.hasTimeout()) {
+              for (ListenableFuture<?> f : futures) {
+                f.addListener(
+                    () -> {
+                      if (extrapolationCancelled.get()) {
+                        return;
+                      }
+                      int completed = completedSegments.get();
+                      if (completed >= samplingWindow) {
+                        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanos);
+                        long extrapolatedMs = elapsedMs * totalSegments / completed;

Review Comment:
   elapsedMs * totalSegments can overflow (although this is unlikely). It 
could be rewritten as 
`long extrapolatedMs = elapsedMs / completed * totalSegments;`
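
   One trade-off to keep in mind: dividing first truncates elapsedMs / 
completed before scaling, so the result can be lower than the multiply-first 
form. The sketch below compares the two and shows one possible overflow-aware 
alternative based on Math.multiplyExact. The class and method names, and the 
choice to saturate at Long.MAX_VALUE, are assumptions for illustration.

```java
// Illustration only: three ways to compute the same extrapolation.
final class ExtrapolationArithmetic {
  // Form used in the diff: precise, but the product can overflow a long.
  static long multiplyFirst(long elapsedMs, int totalSegments, int completed) {
    return elapsedMs * totalSegments / completed;
  }

  // Suggested rewrite: avoids the large product, but truncates before scaling.
  static long divideFirst(long elapsedMs, int totalSegments, int completed) {
    return elapsedMs / completed * totalSegments;
  }

  // Alternative: keep the precise form and saturate if the product overflows.
  static long multiplyFirstSaturating(long elapsedMs, int totalSegments, int completed) {
    try {
      return Math.multiplyExact(elapsedMs, (long) totalSegments) / completed;
    } catch (ArithmeticException overflow) {
      return Long.MAX_VALUE;
    }
  }

  public static void main(String[] args) {
    System.out.println(multiplyFirst(299, 1000, 100));            // 2990
    System.out.println(divideFirst(299, 1000, 100));              // 2000
    System.out.println(multiplyFirstSaturating(299, 1000, 100));  // 2990
  }
}
```

   With elapsedMs=299 and completed=100, the divide-first form rounds the 
per-completion time down from 2.99 ms to 2 ms, which is where the 990 ms 
difference comes from.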



##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -153,8 +167,32 @@ public Iterable<T> call()
                     )
                 );
 
+            final int totalSegments = futures.size();
             ListenableFuture<List<Iterable<T>>> future = Futures.allAsList(futures);
             queryWatcher.registerQueryFuture(query, future);
+            
+            if (completedSegments != null && totalSegments >= samplingWindow && context.hasTimeout()) {
+              for (ListenableFuture<?> f : futures) {
+                f.addListener(
+                    () -> {
+                      if (extrapolationCancelled.get()) {
+                        return;
+                      }
+                      int completed = completedSegments.get();
+                      if (completed >= samplingWindow) {
+                        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanos);
+                        long extrapolatedMs = elapsedMs * totalSegments / completed;

Review Comment:
   I think the formula does account for parallelism. The parallelism is already 
baked into elapsedMs / completed, since that ratio is the average wall-clock 
time per completion, not the average per-segment processing time.
   Example: 100 threads, 1000 segments, each segment taking 300ms:
     - First 100 segments run in parallel and complete at ~300ms wall-clock
     - elapsedMs=300, completed=100, totalSegments=1000
     - Formula: 300 * 1000 / 100 = 3,000ms
     - Actual: ceil(1000/100) * 300ms = 3,000ms



##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -85,6 +88,13 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext respo
     final QueryContext context = query.context();
     final boolean usePerSegmentTimeout = context.usePerSegmentTimeout();
     final long perSegmentTimeout = context.getPerSegmentTimeout();
+    final int samplingWindow = context.getPerSegmentSamplingWindow();
+    final long queryStartNanos = System.nanoTime();

Review Comment:
   +1. Move the queryStartNanos capture inside make(), just before the futures 
are submitted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

