This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 111cbf70a05 Fix broker write failure handling in ServerChannels 
(#17861)
111cbf70a05 is described below

commit 111cbf70a05d8cdeea9029714fc13d3375b2ed3d
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Wed Mar 11 16:20:24 2026 -0700

    Fix broker write failure handling in ServerChannels (#17861)
    
    ServerChannels.sendRequestWithoutLocking() never checked whether writeAndFlush()
    succeeded, causing silent failures: markRequestSent() was called on failed
    writes (a false positive), the channel was never closed (leaving zombie
    channels), no metrics were emitted, and queries waited for the full timeout
    instead of failing fast.
    
    Add an f.isSuccess() check to the writeAndFlush() listener:
    - On success: keep the existing behavior (record the send-latency timer and
      call markRequestSent())
    - On failure: log an error, increment the NETTY_CONNECTION_SEND_REQUEST_FAILURES
      metric, call markServerDown() so the query fails fast, and close the channel
    
    Uses markServerDown() rather than markQueryFailed() because it is race-safe
    (a no-op if the server has already responded), and closing the channel also
    triggers channelInactive() for all in-flight queries. Wraps f.cause() in a
    RuntimeException because markServerDown() expects an Exception, while
    f.cause() returns a Throwable that may be an Error (e.g. OutOfMemoryError).
---
 .../apache/pinot/common/metrics/BrokerMeter.java   |   2 +
 .../pinot/core/transport/ServerChannels.java       |  27 ++++-
 .../pinot/core/transport/ServerChannelsTest.java   | 135 +++++++++++++++++----
 3 files changed, 134 insertions(+), 30 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index e1d4778d12b..3970452d1e4 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -207,6 +207,8 @@ public class BrokerMeter implements AbstractMetrics.Meter {
       "NETTY_CONNECTION_BYTES_SENT", "nettyConnection", true);
   public static final BrokerMeter NETTY_CONNECTION_BYTES_RECEIVED = create(
       "NETTY_CONNECTION_BYTES_RECEIVED", "nettyConnection", true);
+  public static final BrokerMeter NETTY_CONNECTION_SEND_REQUEST_FAILURES = 
create(
+      "NETTY_CONNECTION_SEND_REQUEST_FAILURES", "nettyConnection", true);
 
   public static final BrokerMeter PROACTIVE_CLUSTER_CHANGE_CHECK = create(
       "PROACTIVE_CLUSTER_CHANGE_CHECK", "proactiveClusterChangeCheck", true);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index fae61d0567f..7f0b091c32c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.transport;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocatorMetric;
@@ -149,6 +150,11 @@ public class ServerChannels {
     _eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
   }
 
+  @VisibleForTesting
+  ServerChannel getOrCreateServerChannel(ServerRoutingInstance instance) {
+    return _serverToChannelMap.computeIfAbsent(instance, ServerChannel::new);
+  }
+
   @ThreadSafe
   class ServerChannel {
     final ServerRoutingInstance _serverRoutingInstance;
@@ -204,6 +210,11 @@ public class ServerChannels {
       }
     }
 
+    @VisibleForTesting
+    void setChannel(Channel channel) {
+      _channel = channel;
+    }
+
     void setSilentShutdown() {
       if (_channel != null) {
         DirectOOMHandler directOOMHandler = 
_channel.pipeline().get(DirectOOMHandler.class);
@@ -242,10 +253,18 @@ public class ServerChannels {
         ServerRoutingInstance serverRoutingInstance, byte[] requestBytes) {
       long startTimeMs = System.currentTimeMillis();
       
_channel.writeAndFlush(Unpooled.wrappedBuffer(requestBytes)).addListener(f -> {
-        int requestSentLatencyMs = (int) (System.currentTimeMillis() - 
startTimeMs);
-        _brokerMetrics.addTimedTableValue(rawTableName, 
BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY,
-            requestSentLatencyMs, TimeUnit.MILLISECONDS);
-        asyncQueryResponse.markRequestSent(serverRoutingInstance, 
requestSentLatencyMs);
+        if (f.isSuccess()) {
+          int requestSentLatencyMs = (int) (System.currentTimeMillis() - 
startTimeMs);
+          _brokerMetrics.addTimedTableValue(rawTableName, 
BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY,
+              requestSentLatencyMs, TimeUnit.MILLISECONDS);
+          asyncQueryResponse.markRequestSent(serverRoutingInstance, 
requestSentLatencyMs);
+        } else {
+          LOGGER.error("Write failure to server: {} for table: {}", 
serverRoutingInstance, rawTableName, f.cause());
+          
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_SEND_REQUEST_FAILURES,
 1);
+          asyncQueryResponse.markServerDown(serverRoutingInstance,
+              new RuntimeException("Failed to send request to server: " + 
serverRoutingInstance, f.cause()));
+          _channel.close();
+        }
       });
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_REQUESTS_SENT,
 1);
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_BYTES_SENT, 
requestBytes.length);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
index 23e45c1273f..8d5aabbfc15 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
@@ -19,22 +19,32 @@
 package org.apache.pinot.core.transport;
 
 import com.sun.net.httpserver.HttpServer;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.net.InetSocketAddress;
 import org.apache.pinot.common.config.NettyConfig;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.spi.accounting.ThreadAccountantUtils;
 import org.apache.pinot.spi.config.table.TableType;
-import org.testng.annotations.AfterClass;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.mockito.ArgumentCaptor;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 public class ServerChannelsTest {
-  private static HttpServer _dummyServer;
 
   @DataProvider
   public Object[][] parameters() {
@@ -44,42 +54,115 @@ public class ServerChannelsTest {
   }
 
   @BeforeClass
-  public void setUp()
-      throws Exception {
-    _dummyServer = HttpServer.create();
-    _dummyServer.bind(new InetSocketAddress("localhost", 0), 0);
-    _dummyServer.start();
+  public void setUp() {
+    PinotMetricUtils.init(new PinotConfiguration());
+    PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
+    BrokerMetrics.register(new BrokerMetrics(registry));
   }
 
-  @AfterClass
-  public void tearDown()
+  @Test(dataProvider = "parameters")
+  public void testConnect(boolean nativeTransportEnabled)
       throws Exception {
-    if (_dummyServer != null) {
-      _dummyServer.stop(0);
+    HttpServer dummyServer = HttpServer.create();
+    dummyServer.bind(new InetSocketAddress("localhost", 0), 0);
+    dummyServer.start();
+    try {
+      NettyConfig nettyConfig = new NettyConfig();
+      nettyConfig.setNativeTransportsEnabled(nativeTransportEnabled);
+      QueryRouter queryRouter = mock(QueryRouter.class);
+
+      ServerRoutingInstance serverRoutingInstance =
+          new ServerRoutingInstance("localhost", 
dummyServer.getAddress().getPort(), TableType.REALTIME);
+      ServerChannels serverChannels =
+          new ServerChannels(queryRouter, nettyConfig, null, 
ThreadAccountantUtils.getNoOpAccountant());
+      serverChannels.connect(serverRoutingInstance);
+
+      final long requestId = System.currentTimeMillis();
+
+      AsyncQueryResponse asyncQueryResponse = mock(AsyncQueryResponse.class);
+      BrokerRequest brokerRequest = new BrokerRequest();
+      InstanceRequest instanceRequest = new InstanceRequest();
+      instanceRequest.setRequestId(requestId);
+      instanceRequest.setQuery(brokerRequest);
+      serverChannels.sendRequest("dummy_table_name", asyncQueryResponse, 
serverRoutingInstance, instanceRequest, 1000);
+      serverChannels.shutDown();
+    } finally {
+      dummyServer.stop(0);
     }
   }
 
-  @Test(dataProvider = "parameters")
-  public void testConnect(boolean nativeTransportEnabled)
-      throws Exception {
-    NettyConfig nettyConfig = new NettyConfig();
-    nettyConfig.setNativeTransportsEnabled(nativeTransportEnabled);
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testWriteFailureClosesChannelAndFailsQuery() {
     QueryRouter queryRouter = mock(QueryRouter.class);
+    ServerChannels serverChannels =
+        new ServerChannels(queryRouter, null, null, 
ThreadAccountantUtils.getNoOpAccountant());
+
+    ServerRoutingInstance routingInstance = new 
ServerRoutingInstance("localhost", 12345, TableType.OFFLINE);
+    ServerChannels.ServerChannel serverChannel = 
serverChannels.getOrCreateServerChannel(routingInstance);
+
+    Channel mockChannel = mock(Channel.class);
+    ChannelFuture mockFuture = mock(ChannelFuture.class);
+    when(mockChannel.writeAndFlush(any())).thenReturn(mockFuture);
+    serverChannel.setChannel(mockChannel);
+
+    ArgumentCaptor<GenericFutureListener> listenerCaptor = 
ArgumentCaptor.forClass(GenericFutureListener.class);
+    
when(mockFuture.addListener(listenerCaptor.capture())).thenReturn(mockFuture);
+
+    AsyncQueryResponse asyncQueryResponse = mock(AsyncQueryResponse.class);
+    serverChannel.sendRequestWithoutLocking("test_table", asyncQueryResponse, 
routingInstance, new byte[]{1, 2, 3});
 
-    ServerRoutingInstance serverRoutingInstance =
-        new ServerRoutingInstance("localhost", 
_dummyServer.getAddress().getPort(), TableType.REALTIME);
+    // Simulate write failure
+    when(mockFuture.isSuccess()).thenReturn(false);
+    when(mockFuture.cause()).thenReturn(new OutOfMemoryError("Direct buffer 
memory"));
+
+    try {
+      listenerCaptor.getValue().operationComplete(mockFuture);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    verify(mockChannel).close();
+    
verify(asyncQueryResponse).markServerDown(any(ServerRoutingInstance.class), 
any(Exception.class));
+    verify(asyncQueryResponse, 
never()).markRequestSent(any(ServerRoutingInstance.class), any(Integer.class));
+
+    serverChannels.shutDown();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testWriteSuccessMarksRequestSent() {
+    QueryRouter queryRouter = mock(QueryRouter.class);
     ServerChannels serverChannels =
-        new ServerChannels(queryRouter, nettyConfig, null, 
ThreadAccountantUtils.getNoOpAccountant());
-    serverChannels.connect(serverRoutingInstance);
+        new ServerChannels(queryRouter, null, null, 
ThreadAccountantUtils.getNoOpAccountant());
 
-    final long requestId = System.currentTimeMillis();
+    ServerRoutingInstance routingInstance = new 
ServerRoutingInstance("localhost", 12345, TableType.OFFLINE);
+    ServerChannels.ServerChannel serverChannel = 
serverChannels.getOrCreateServerChannel(routingInstance);
+
+    Channel mockChannel = mock(Channel.class);
+    ChannelFuture mockFuture = mock(ChannelFuture.class);
+    when(mockChannel.writeAndFlush(any())).thenReturn(mockFuture);
+    serverChannel.setChannel(mockChannel);
+
+    ArgumentCaptor<GenericFutureListener> listenerCaptor = 
ArgumentCaptor.forClass(GenericFutureListener.class);
+    
when(mockFuture.addListener(listenerCaptor.capture())).thenReturn(mockFuture);
 
     AsyncQueryResponse asyncQueryResponse = mock(AsyncQueryResponse.class);
-    BrokerRequest brokerRequest = new BrokerRequest();
-    InstanceRequest instanceRequest = new InstanceRequest();
-    instanceRequest.setRequestId(requestId);
-    instanceRequest.setQuery(brokerRequest);
-    serverChannels.sendRequest("dummy_table_name", asyncQueryResponse, 
serverRoutingInstance, instanceRequest, 1000);
+    serverChannel.sendRequestWithoutLocking("test_table", asyncQueryResponse, 
routingInstance, new byte[]{1, 2, 3});
+
+    // Simulate write success
+    when(mockFuture.isSuccess()).thenReturn(true);
+
+    try {
+      listenerCaptor.getValue().operationComplete(mockFuture);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    
verify(asyncQueryResponse).markRequestSent(any(ServerRoutingInstance.class), 
any(Integer.class));
+    verify(asyncQueryResponse, 
never()).markServerDown(any(ServerRoutingInstance.class), any(Exception.class));
+    verify(mockChannel, never()).close();
+
     serverChannels.shutDown();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to