Copilot commented on code in PR #17845:
URL: https://github.com/apache/pinot/pull/17845#discussion_r2908609159
##########
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java:
##########
@@ -302,23 +302,30 @@ private void sendResponse(ChannelHandlerContext ctx, long
requestId, String tabl
long sendResponseStartTimeMs = System.currentTimeMillis();
int queryProcessingTimeMs = (int) (sendResponseStartTimeMs -
queryArrivalTimeMs);
ctx.writeAndFlush(Unpooled.wrappedBuffer(serializedDataTable)).addListener(f ->
{
- long sendResponseEndTimeMs = System.currentTimeMillis();
- int sendResponseLatencyMs = (int) (sendResponseEndTimeMs -
sendResponseStartTimeMs);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT,
1);
-
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT,
serializedDataTable.length);
- _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY,
- sendResponseLatencyMs, TimeUnit.MILLISECONDS);
+ if (f.isSuccess()) {
+ long sendResponseEndTimeMs = System.currentTimeMillis();
+ int sendResponseLatencyMs = (int) (sendResponseEndTimeMs -
sendResponseStartTimeMs);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT,
1);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT,
serializedDataTable.length);
+ _serverMetrics.addTimedTableValue(tableNameWithType,
ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY,
+ sendResponseLatencyMs, TimeUnit.MILLISECONDS);
- int totalQueryTimeMs = (int) (sendResponseEndTimeMs -
queryArrivalTimeMs);
- if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
- LOGGER.info(
- "Slow query ({}): request handler processing time: {}, send
response latency: {}, total time to handle "
- + "request: {}", requestId, queryProcessingTimeMs,
sendResponseLatencyMs, totalQueryTimeMs);
- }
- if (serializedDataTable.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) {
- LOGGER.warn("Large query ({}): response size in bytes: {}, table name
{}", requestId,
- serializedDataTable.length, tableNameWithType);
- ServerMetrics.get().addMeteredTableValue(tableNameWithType,
ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1);
+ int totalQueryTimeMs = (int) (sendResponseEndTimeMs -
queryArrivalTimeMs);
+ if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
+ LOGGER.info(
+ "Slow query ({}): request handler processing time: {}, send
response latency: {}, total time to handle "
+ + "request: {}", requestId, queryProcessingTimeMs,
sendResponseLatencyMs, totalQueryTimeMs);
+ }
+ if (serializedDataTable.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) {
+ LOGGER.warn("Large query ({}): response size in bytes: {}, table
name {}", requestId,
+ serializedDataTable.length, tableNameWithType);
+ _serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1);
+ }
+ } else {
+ Throwable cause = f.cause();
+ LOGGER.error("Failed to send response for request: {} table: {}",
requestId, tableNameWithType, cause);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_SEND_RESPONSE_FAILURES,
1);
+ ctx.close();
Review Comment:
The PR description mentions incrementing
`NETTY_CONNECTION_SEND_RESPONSE_EXCEPTIONS`, but the implementation
adds/increments `NETTY_CONNECTION_SEND_RESPONSE_FAILURES`. Please align the
metric name (either rename the meter for consistency with existing
`*_EXCEPTIONS` meters, or update the PR description and any dashboards/alerts
that expect the old name).
##########
pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java:
##########
@@ -48,6 +56,13 @@
public class InstanceRequestHandlerTest {
+ @BeforeClass
+ public void setUp() {
+ PinotMetricUtils.init(new PinotConfiguration());
+ PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
+ ServerMetrics.register(new ServerMetrics(registry));
+ }
Review Comment:
`PinotMetricUtils.init(...)` performs global metrics initialization
(including default registration listeners such as the JMX reporter) and
`ServerMetrics.register(...)` mutates a global singleton. This test doesn’t
assert on metrics, and `InstanceRequestHandler` works with the NOOP
`ServerMetrics` by default, so this setup appears unnecessary and can leak
state across the test suite. Consider removing this setup, or adding an
`@AfterClass` cleanup (e.g., `ServerMetrics.deregister()` and
`PinotMetricUtils.cleanUp()`) to avoid cross-test interference.
##########
pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java:
##########
@@ -88,6 +103,47 @@ public void testCancelQuery()
assertFalse(handler.cancelQuery("unknown"));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testWriteFailureClosesChannel()
+ throws Exception {
+ PinotConfiguration config = new PinotConfiguration();
+ CountDownLatch queryFinishLatch = new CountDownLatch(1);
+ QueryScheduler queryScheduler = createQueryScheduler(config,
queryFinishLatch);
+ InstanceRequestHandler handler =
+ new InstanceRequestHandler("server01", config, queryScheduler,
mock(AccessControl.class),
+ ThreadAccountantUtils.getNoOpAccountant());
+
+ ServerQueryRequest query = mock(ServerQueryRequest.class);
+ when(query.getQueryId()).thenReturn("test-query");
+ when(query.getTableNameWithType()).thenReturn("testTable_OFFLINE");
+ when(query.getRequestId()).thenReturn(1L);
+ QueryExecutionContext executionContext = mock(QueryExecutionContext.class);
+ when(executionContext.getCid()).thenReturn("test-query");
+ when(query.toExecutionContext(any())).thenReturn(executionContext);
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ ChannelFuture writeFuture = mock(ChannelFuture.class);
+ when(ctx.writeAndFlush(any())).thenReturn(writeFuture);
+
+ ArgumentCaptor<GenericFutureListener> listenerCaptor =
ArgumentCaptor.forClass(GenericFutureListener.class);
Review Comment:
In `testWriteFailureClosesChannel`, the test uses a raw
`GenericFutureListener` captor and suppresses unchecked warnings. It would be
safer and clearer to parameterize the captor with the expected Netty listener
type (matching `ChannelFuture.addListener(...)`) so the compiler can enforce
the correct future type and the suppression can be removed.
```suggestion
ArgumentCaptor<GenericFutureListener<Future<Void>>> listenerCaptor =
ArgumentCaptor.forClass((Class<GenericFutureListener<Future<Void>>>)
(Class<?>) GenericFutureListener.class);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]