This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 58daa997dc9 Fix server outbound write failure creating zombie channels (#17845)
58daa997dc9 is described below

commit 58daa997dc9b1e6ed05b9843644a5dc6ff8a249a
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Tue Mar 10 00:34:50 2026 -0700

    Fix server outbound write failure creating zombie channels (#17845)
---
 .../apache/pinot/common/metrics/ServerMeter.java   |  1 +
 .../core/transport/InstanceRequestHandler.java     | 39 ++++++++-------
 .../core/transport/InstanceRequestHandlerTest.java | 56 ++++++++++++++++++++++
 3 files changed, 80 insertions(+), 16 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 03d77408b74..800cd0c0054 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -123,6 +123,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),
   NETTY_CONNECTION_RESPONSES_SENT("nettyConnection", true),
   NETTY_CONNECTION_BYTES_SENT("nettyConnection", true),
+  NETTY_CONNECTION_SEND_RESPONSE_FAILURES("nettyConnection", true),
 
   // GRPC related metrics
   GRPC_QUERIES("grpcQueries", true),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index f0e3d728809..98fa80349b7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -302,23 +302,30 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
     long sendResponseStartTimeMs = System.currentTimeMillis();
     int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - 
queryArrivalTimeMs);
     
ctx.writeAndFlush(Unpooled.wrappedBuffer(serializedDataTable)).addListener(f -> 
{
-      long sendResponseEndTimeMs = System.currentTimeMillis();
-      int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - 
sendResponseStartTimeMs);
-      
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT,
 1);
-      
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, 
serializedDataTable.length);
-      _serverMetrics.addTimedTableValue(tableNameWithType, 
ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY,
-          sendResponseLatencyMs, TimeUnit.MILLISECONDS);
+      if (f.isSuccess()) {
+        long sendResponseEndTimeMs = System.currentTimeMillis();
+        int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - 
sendResponseStartTimeMs);
+        
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT,
 1);
+        
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, 
serializedDataTable.length);
+        _serverMetrics.addTimedTableValue(tableNameWithType, 
ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY,
+            sendResponseLatencyMs, TimeUnit.MILLISECONDS);
 
-      int totalQueryTimeMs = (int) (sendResponseEndTimeMs - 
queryArrivalTimeMs);
-      if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
-        LOGGER.info(
-            "Slow query ({}): request handler processing time: {}, send 
response latency: {}, total time to handle "
-                + "request: {}", requestId, queryProcessingTimeMs, 
sendResponseLatencyMs, totalQueryTimeMs);
-      }
-      if (serializedDataTable.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) {
-        LOGGER.warn("Large query ({}): response size in bytes: {}, table name 
{}", requestId,
-            serializedDataTable.length, tableNameWithType);
-        ServerMetrics.get().addMeteredTableValue(tableNameWithType, 
ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1);
+        int totalQueryTimeMs = (int) (sendResponseEndTimeMs - 
queryArrivalTimeMs);
+        if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
+          LOGGER.info(
+              "Slow query ({}): request handler processing time: {}, send 
response latency: {}, total time to handle "
+                  + "request: {}", requestId, queryProcessingTimeMs, 
sendResponseLatencyMs, totalQueryTimeMs);
+        }
+        if (serializedDataTable.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) {
+          LOGGER.warn("Large query ({}): response size in bytes: {}, table 
name {}", requestId,
+              serializedDataTable.length, tableNameWithType);
+          _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1);
+        }
+      } else {
+        Throwable cause = f.cause();
+        LOGGER.error("Failed to send response for request: {} table: {}", 
requestId, tableNameWithType, cause);
+        
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_SEND_RESPONSE_FAILURES,
 1);
+        ctx.close();
       }
     });
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java
index c2eb2136dea..a9298e3ba95 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java
@@ -22,10 +22,13 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
@@ -34,12 +37,17 @@ import org.apache.pinot.core.query.scheduler.resources.UnboundedResourceManager;
 import org.apache.pinot.server.access.AccessControl;
 import org.apache.pinot.spi.accounting.ThreadAccountantUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.query.QueryExecutionContext;
 import org.apache.pinot.util.TestUtils;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -48,6 +56,13 @@ import static org.testng.Assert.assertTrue;
 
 public class InstanceRequestHandlerTest {
 
+  @BeforeClass
+  public void setUp() {
+    PinotMetricUtils.init(new PinotConfiguration());
+    PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
+    ServerMetrics.register(new ServerMetrics(registry));
+  }
+
   @Test
   public void testCancelQuery()
       throws InterruptedException {
@@ -88,6 +103,47 @@ public class InstanceRequestHandlerTest {
     assertFalse(handler.cancelQuery("unknown"));
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testWriteFailureClosesChannel()
+      throws Exception {
+    PinotConfiguration config = new PinotConfiguration();
+    CountDownLatch queryFinishLatch = new CountDownLatch(1);
+    QueryScheduler queryScheduler = createQueryScheduler(config, 
queryFinishLatch);
+    InstanceRequestHandler handler =
+        new InstanceRequestHandler("server01", config, queryScheduler, 
mock(AccessControl.class),
+            ThreadAccountantUtils.getNoOpAccountant());
+
+    ServerQueryRequest query = mock(ServerQueryRequest.class);
+    when(query.getQueryId()).thenReturn("test-query");
+    when(query.getTableNameWithType()).thenReturn("testTable_OFFLINE");
+    when(query.getRequestId()).thenReturn(1L);
+    QueryExecutionContext executionContext = mock(QueryExecutionContext.class);
+    when(executionContext.getCid()).thenReturn("test-query");
+    when(query.toExecutionContext(any())).thenReturn(executionContext);
+
+    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+    ChannelFuture writeFuture = mock(ChannelFuture.class);
+    when(ctx.writeAndFlush(any())).thenReturn(writeFuture);
+
+    ArgumentCaptor<GenericFutureListener> listenerCaptor = 
ArgumentCaptor.forClass(GenericFutureListener.class);
+    
when(writeFuture.addListener(listenerCaptor.capture())).thenReturn(writeFuture);
+
+    handler.submitQuery(query, ctx, System.currentTimeMillis());
+    queryFinishLatch.countDown();
+
+    TestUtils.waitForCondition((aVoid) -> 
!listenerCaptor.getAllValues().isEmpty(), 10_000L,
+        "Timed out waiting for write listener to be registered");
+
+    Future<Void> failedFuture = mock(Future.class);
+    when(failedFuture.isSuccess()).thenReturn(false);
+    when(failedFuture.cause()).thenReturn(new OutOfMemoryError("Direct buffer 
memory"));
+
+    listenerCaptor.getValue().operationComplete(failedFuture);
+
+    verify(ctx).close();
+  }
+
   private QueryScheduler createQueryScheduler(PinotConfiguration config, 
CountDownLatch queryFinishLatch) {
     ResourceManager resourceManager = new UnboundedResourceManager(config);
     return new QueryScheduler(config, "serverId", mock(QueryExecutor.class), 
ThreadAccountantUtils.getNoOpAccountant(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to