This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 111cbf70a05 Fix broker write failure handling in ServerChannels (#17861)
111cbf70a05 is described below
commit 111cbf70a05d8cdeea9029714fc13d3375b2ed3d
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Wed Mar 11 16:20:24 2026 -0700
Fix broker write failure handling in ServerChannels (#17861)
ServerChannels.sendRequestWithoutLocking() never checked whether
writeAndFlush() succeeded, so write failures were silent: markRequestSent()
was called even on failed writes (false positive), the channel was never
closed (zombie channels), no metric was emitted, and queries waited for the
full timeout instead of failing fast.
Add an f.isSuccess() check in the writeAndFlush() listener:
- On success: existing behavior (latency timer + markRequestSent())
- On failure: log an error, increment the
  NETTY_CONNECTION_SEND_REQUEST_FAILURES metric, call markServerDown() so
  the query fails fast, and close the channel
Uses markServerDown() rather than markQueryFailed() because it is race-safe
(a no-op if the server has already responded); closing the channel
additionally triggers channelInactive() for all in-flight queries.
f.cause() is wrapped in a RuntimeException because markServerDown() expects
an Exception, whereas cause() returns a Throwable that may be an Error
(e.g. OutOfMemoryError).
---
.../apache/pinot/common/metrics/BrokerMeter.java | 2 +
.../pinot/core/transport/ServerChannels.java | 27 ++++-
.../pinot/core/transport/ServerChannelsTest.java | 135 +++++++++++++++++----
3 files changed, 134 insertions(+), 30 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index e1d4778d12b..3970452d1e4 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -207,6 +207,8 @@ public class BrokerMeter implements AbstractMetrics.Meter {
"NETTY_CONNECTION_BYTES_SENT", "nettyConnection", true);
public static final BrokerMeter NETTY_CONNECTION_BYTES_RECEIVED = create(
"NETTY_CONNECTION_BYTES_RECEIVED", "nettyConnection", true);
+ public static final BrokerMeter NETTY_CONNECTION_SEND_REQUEST_FAILURES =
create(
+ "NETTY_CONNECTION_SEND_REQUEST_FAILURES", "nettyConnection", true);
public static final BrokerMeter PROACTIVE_CLUSTER_CHANGE_CHECK = create(
"PROACTIVE_CLUSTER_CHANGE_CHECK", "proactiveClusterChangeCheck", true);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index fae61d0567f..7f0b091c32c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.transport;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorMetric;
@@ -149,6 +150,11 @@ public class ServerChannels {
_eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
+ @VisibleForTesting
+ ServerChannel getOrCreateServerChannel(ServerRoutingInstance instance) {
+ return _serverToChannelMap.computeIfAbsent(instance, ServerChannel::new);
+ }
+
@ThreadSafe
class ServerChannel {
final ServerRoutingInstance _serverRoutingInstance;
@@ -204,6 +210,11 @@ public class ServerChannels {
}
}
+ @VisibleForTesting
+ void setChannel(Channel channel) {
+ _channel = channel;
+ }
+
void setSilentShutdown() {
if (_channel != null) {
DirectOOMHandler directOOMHandler =
_channel.pipeline().get(DirectOOMHandler.class);
@@ -242,10 +253,18 @@ public class ServerChannels {
ServerRoutingInstance serverRoutingInstance, byte[] requestBytes) {
long startTimeMs = System.currentTimeMillis();
_channel.writeAndFlush(Unpooled.wrappedBuffer(requestBytes)).addListener(f -> {
- int requestSentLatencyMs = (int) (System.currentTimeMillis() -
startTimeMs);
- _brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY,
- requestSentLatencyMs, TimeUnit.MILLISECONDS);
- asyncQueryResponse.markRequestSent(serverRoutingInstance,
requestSentLatencyMs);
+ if (f.isSuccess()) {
+ int requestSentLatencyMs = (int) (System.currentTimeMillis() -
startTimeMs);
+ _brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.NETTY_CONNECTION_SEND_REQUEST_LATENCY,
+ requestSentLatencyMs, TimeUnit.MILLISECONDS);
+ asyncQueryResponse.markRequestSent(serverRoutingInstance,
requestSentLatencyMs);
+ } else {
+ LOGGER.error("Write failure to server: {} for table: {}",
serverRoutingInstance, rawTableName, f.cause());
+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_SEND_REQUEST_FAILURES,
1);
+ asyncQueryResponse.markServerDown(serverRoutingInstance,
+ new RuntimeException("Failed to send request to server: " +
serverRoutingInstance, f.cause()));
+ _channel.close();
+ }
});
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_REQUESTS_SENT,
1);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.NETTY_CONNECTION_BYTES_SENT,
requestBytes.length);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
index 23e45c1273f..8d5aabbfc15 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
@@ -19,22 +19,32 @@
package org.apache.pinot.core.transport;
import com.sun.net.httpserver.HttpServer;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import org.apache.pinot.common.config.NettyConfig;
+import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.spi.accounting.ThreadAccountantUtils;
import org.apache.pinot.spi.config.table.TableType;
-import org.testng.annotations.AfterClass;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.mockito.ArgumentCaptor;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class ServerChannelsTest {
- private static HttpServer _dummyServer;
@DataProvider
public Object[][] parameters() {
@@ -44,42 +54,115 @@ public class ServerChannelsTest {
}
@BeforeClass
- public void setUp()
- throws Exception {
- _dummyServer = HttpServer.create();
- _dummyServer.bind(new InetSocketAddress("localhost", 0), 0);
- _dummyServer.start();
+ public void setUp() {
+ PinotMetricUtils.init(new PinotConfiguration());
+ PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
+ BrokerMetrics.register(new BrokerMetrics(registry));
}
- @AfterClass
- public void tearDown()
+ @Test(dataProvider = "parameters")
+ public void testConnect(boolean nativeTransportEnabled)
throws Exception {
- if (_dummyServer != null) {
- _dummyServer.stop(0);
+ HttpServer dummyServer = HttpServer.create();
+ dummyServer.bind(new InetSocketAddress("localhost", 0), 0);
+ dummyServer.start();
+ try {
+ NettyConfig nettyConfig = new NettyConfig();
+ nettyConfig.setNativeTransportsEnabled(nativeTransportEnabled);
+ QueryRouter queryRouter = mock(QueryRouter.class);
+
+ ServerRoutingInstance serverRoutingInstance =
+ new ServerRoutingInstance("localhost",
dummyServer.getAddress().getPort(), TableType.REALTIME);
+ ServerChannels serverChannels =
+ new ServerChannels(queryRouter, nettyConfig, null,
ThreadAccountantUtils.getNoOpAccountant());
+ serverChannels.connect(serverRoutingInstance);
+
+ final long requestId = System.currentTimeMillis();
+
+ AsyncQueryResponse asyncQueryResponse = mock(AsyncQueryResponse.class);
+ BrokerRequest brokerRequest = new BrokerRequest();
+ InstanceRequest instanceRequest = new InstanceRequest();
+ instanceRequest.setRequestId(requestId);
+ instanceRequest.setQuery(brokerRequest);
+ serverChannels.sendRequest("dummy_table_name", asyncQueryResponse,
serverRoutingInstance, instanceRequest, 1000);
+ serverChannels.shutDown();
+ } finally {
+ dummyServer.stop(0);
}
}
- @Test(dataProvider = "parameters")
- public void testConnect(boolean nativeTransportEnabled)
- throws Exception {
- NettyConfig nettyConfig = new NettyConfig();
- nettyConfig.setNativeTransportsEnabled(nativeTransportEnabled);
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testWriteFailureClosesChannelAndFailsQuery() {
QueryRouter queryRouter = mock(QueryRouter.class);
+ ServerChannels serverChannels =
+ new ServerChannels(queryRouter, null, null,
ThreadAccountantUtils.getNoOpAccountant());
+
+ ServerRoutingInstance routingInstance = new
ServerRoutingInstance("localhost", 12345, TableType.OFFLINE);
+ ServerChannels.ServerChannel serverChannel =
serverChannels.getOrCreateServerChannel(routingInstance);
+
+ Channel mockChannel = mock(Channel.class);
+ ChannelFuture mockFuture = mock(ChannelFuture.class);
+ when(mockChannel.writeAndFlush(any())).thenReturn(mockFuture);
+ serverChannel.setChannel(mockChannel);
+
+ ArgumentCaptor<GenericFutureListener> listenerCaptor =
ArgumentCaptor.forClass(GenericFutureListener.class);
+
when(mockFuture.addListener(listenerCaptor.capture())).thenReturn(mockFuture);
+
+ AsyncQueryResponse asyncQueryResponse = mock(AsyncQueryResponse.class);
+ serverChannel.sendRequestWithoutLocking("test_table", asyncQueryResponse,
routingInstance, new byte[]{1, 2, 3});
- ServerRoutingInstance serverRoutingInstance =
- new ServerRoutingInstance("localhost",
_dummyServer.getAddress().getPort(), TableType.REALTIME);
+ // Simulate write failure
+ when(mockFuture.isSuccess()).thenReturn(false);
+ when(mockFuture.cause()).thenReturn(new OutOfMemoryError("Direct buffer
memory"));
+
+ try {
+ listenerCaptor.getValue().operationComplete(mockFuture);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ verify(mockChannel).close();
+
verify(asyncQueryResponse).markServerDown(any(ServerRoutingInstance.class),
any(Exception.class));
+ verify(asyncQueryResponse,
never()).markRequestSent(any(ServerRoutingInstance.class), any(Integer.class));
+
+ serverChannels.shutDown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testWriteSuccessMarksRequestSent() {
+ QueryRouter queryRouter = mock(QueryRouter.class);
ServerChannels serverChannels =
- new ServerChannels(queryRouter, nettyConfig, null,
ThreadAccountantUtils.getNoOpAccountant());
- serverChannels.connect(serverRoutingInstance);
+ new ServerChannels(queryRouter, null, null,
ThreadAccountantUtils.getNoOpAccountant());
- final long requestId = System.currentTimeMillis();
+ ServerRoutingInstance routingInstance = new
ServerRoutingInstance("localhost", 12345, TableType.OFFLINE);
+ ServerChannels.ServerChannel serverChannel =
serverChannels.getOrCreateServerChannel(routingInstance);
+
+ Channel mockChannel = mock(Channel.class);
+ ChannelFuture mockFuture = mock(ChannelFuture.class);
+ when(mockChannel.writeAndFlush(any())).thenReturn(mockFuture);
+ serverChannel.setChannel(mockChannel);
+
+ ArgumentCaptor<GenericFutureListener> listenerCaptor =
ArgumentCaptor.forClass(GenericFutureListener.class);
+
when(mockFuture.addListener(listenerCaptor.capture())).thenReturn(mockFuture);
AsyncQueryResponse asyncQueryResponse = mock(AsyncQueryResponse.class);
- BrokerRequest brokerRequest = new BrokerRequest();
- InstanceRequest instanceRequest = new InstanceRequest();
- instanceRequest.setRequestId(requestId);
- instanceRequest.setQuery(brokerRequest);
- serverChannels.sendRequest("dummy_table_name", asyncQueryResponse,
serverRoutingInstance, instanceRequest, 1000);
+ serverChannel.sendRequestWithoutLocking("test_table", asyncQueryResponse,
routingInstance, new byte[]{1, 2, 3});
+
+ // Simulate write success
+ when(mockFuture.isSuccess()).thenReturn(true);
+
+ try {
+ listenerCaptor.getValue().operationComplete(mockFuture);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+
verify(asyncQueryResponse).markRequestSent(any(ServerRoutingInstance.class),
any(Integer.class));
+ verify(asyncQueryResponse,
never()).markServerDown(any(ServerRoutingInstance.class), any(Exception.class));
+ verify(mockChannel, never()).close();
+
serverChannels.shutDown();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]