FrankChen021 commented on code in PR #19607:
URL: https://github.com/apache/druid/pull/19607#discussion_r3452102442


##########
server/src/main/java/org/apache/druid/client/DirectDruidClient.java:
##########
@@ -310,7 +325,33 @@ public InputStream nextElement()
                       }
                     }
                   }
-              ),
+              )
+              {
+                /**
+                 * Closing this stream means the caller no longer needs the response. The default
+                 * {@link SequenceInputStream#close()} would drain the entire remaining response off the wire first
+                 * we want to avoid. Instead, abandon the response and force-close the connection
+                 */
+                @Override
+                public void close()
+                {
+                  final TrafficCop trafficCop;
+                  synchronized (done) {
+                    if (done.get()) {

Review Comment:
   [P2] Early close still needs to discard completed buffered response
   
   When the server has already finished sending the response, `done` is true,
   so this new `close()` override returns before setting `discard`, clearing
   `queue`, or delegating to `SequenceInputStream.close()`. If a caller stops
   after the first few rows while a fast historical has already filled the rest
   of the queue, all queued `InputStreamHolder`/`ChannelBuffer` chunks are now
   retained until the yielder and parser are garbage-collected. Before this
   change, `SequenceInputStream.close()` drained and removed the queued
   elements once `done` was true. The close path should still clear or discard
   the queued buffers when `done == true`; only the connection abort needs to
   be skipped once the response is complete.
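
A minimal sketch of the suggested close path, to make the ordering concrete. The class `EarlyCloseSketch`, the `byte[]` queue element type, and the `Runnable` abort hook are hypothetical stand-ins, not the actual `DirectDruidClient` code; only the names `done`, `discard`, and `queue` come from the comment above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for the response-handling state.
// Field names follow the review comment; the types are assumptions.
class EarlyCloseSketch
{
  final Queue<byte[]> queue = new ArrayDeque<>();
  final AtomicBoolean done = new AtomicBoolean(false);
  final AtomicBoolean discard = new AtomicBoolean(false);

  // Assumed hook standing in for the connection-abort call.
  private final Runnable abortConnection;

  EarlyCloseSketch(Runnable abortConnection)
  {
    this.abortConnection = abortConnection;
  }

  public void close()
  {
    final boolean responseComplete;
    synchronized (done) {
      responseComplete = done.get();
      // Always stop accepting chunks and release what is already buffered,
      // whether or not the server has finished sending.
      discard.set(true);
      queue.clear();
    }
    // Only an in-flight response needs its connection torn down.
    if (!responseComplete) {
      abortConnection.run();
    }
  }
}
```

With this shape, closing after the response has completed releases the buffered chunks without touching the connection, while closing mid-response does both.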



##########
processing/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java:
##########
@@ -98,5 +98,13 @@ interface TrafficCop
     * @return time that backpressure was applied (channel was closed for reads)
      */
     long resume(long chunkNum);
+
+    /**
+     * Closes the underlying connection, abandoning any remaining response.
+     *
+     * Intended for callers that decide they no longer need the rest of the response (for example, because the
+     * consumer of the resulting stream has been closed early)
+     */
+    void abort();

Review Comment:
   [P2] Keep TrafficCop source and binary compatible
   
   Adding `abort()` as a second abstract method means `TrafficCop` is no longer
   a functional interface, and it forces every out-of-tree mock or custom
   `HttpClient` implementation to add the method. Existing compiled
   implementations risk an `AbstractMethodError` if this hook is invoked. Since
   only the Netty implementation needs special behavior, make `abort()` a
   default no-op and let Netty override it. That preserves existing lambda and
   test implementations while still enabling this PR's connection-abort path.
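
A rough illustration of the default-method approach. `TrafficCopSketch` and `AbortingTrafficCopSketch` are hypothetical names that mirror only the `resume(long)` and `abort()` signatures shown in the diff; they are not the real Druid or Netty classes.

```java
// Hypothetical mirror of the TrafficCop interface from the diff above.
interface TrafficCopSketch
{
  long resume(long chunkNum);

  // Default no-op: existing lambdas and mocks keep compiling and linking.
  default void abort()
  {
  }
}

// Stand-in for the one implementation that needs real abort behavior.
class AbortingTrafficCopSketch implements TrafficCopSketch
{
  boolean connectionClosed = false;

  @Override
  public long resume(long chunkNum)
  {
    return 0L;
  }

  @Override
  public void abort()
  {
    // A real client would close the underlying channel here (assumption).
    connectionClosed = true;
  }
}
```

Because `resume` stays the only abstract method, a lambda such as `chunkNum -> 0L` remains a valid implementation, and calling `abort()` on it does nothing.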



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

