Copilot commented on code in PR #17845:
URL: https://github.com/apache/pinot/pull/17845#discussion_r2908609159


##########
pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java:
##########
@@ -302,23 +302,30 @@ private void sendResponse(ChannelHandlerContext ctx, long requestId, String tabl
     long sendResponseStartTimeMs = System.currentTimeMillis();
     int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
     ctx.writeAndFlush(Unpooled.wrappedBuffer(serializedDataTable)).addListener(f -> {
-      long sendResponseEndTimeMs = System.currentTimeMillis();
-      int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
-      _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
-      _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, serializedDataTable.length);
-      _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY,
-          sendResponseLatencyMs, TimeUnit.MILLISECONDS);
+      if (f.isSuccess()) {
+        long sendResponseEndTimeMs = System.currentTimeMillis();
+        int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
+        _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
+        _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, serializedDataTable.length);
+        _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY,
+            sendResponseLatencyMs, TimeUnit.MILLISECONDS);
 
-      int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
-      if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
-        LOGGER.info(
-            "Slow query ({}): request handler processing time: {}, send response latency: {}, total time to handle "
-                + "request: {}", requestId, queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
-      }
-      if (serializedDataTable.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) {
-        LOGGER.warn("Large query ({}): response size in bytes: {}, table name {}", requestId,
-            serializedDataTable.length, tableNameWithType);
-        ServerMetrics.get().addMeteredTableValue(tableNameWithType, ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1);
+        int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
+        if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
+          LOGGER.info(
+              "Slow query ({}): request handler processing time: {}, send response latency: {}, total time to handle "
+                  + "request: {}", requestId, queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
+        }
+        if (serializedDataTable.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) {
+          LOGGER.warn("Large query ({}): response size in bytes: {}, table name {}", requestId,
+              serializedDataTable.length, tableNameWithType);
+          _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1);
+        }
+      } else {
+        Throwable cause = f.cause();
+        LOGGER.error("Failed to send response for request: {} table: {}", requestId, tableNameWithType, cause);
+        _serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_SEND_RESPONSE_FAILURES, 1);
+        ctx.close();

Review Comment:
   The PR description mentions incrementing 
`NETTY_CONNECTION_SEND_RESPONSE_EXCEPTIONS`, but the implementation adds and 
increments `NETTY_CONNECTION_SEND_RESPONSE_FAILURES`. Please align the 
metric name: either rename the meter for consistency with the existing 
`*_EXCEPTIONS` meters, or update the PR description and any dashboards/alerts 
that expect the name given there.



##########
pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java:
##########
@@ -48,6 +56,13 @@
 
 public class InstanceRequestHandlerTest {
 
+  @BeforeClass
+  public void setUp() {
+    PinotMetricUtils.init(new PinotConfiguration());
+    PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
+    ServerMetrics.register(new ServerMetrics(registry));
+  }

Review Comment:
   `PinotMetricUtils.init(...)` performs global metrics initialization 
(including default registration listeners such as the JMX reporter) and 
`ServerMetrics.register(...)` mutates a global singleton. This test doesn’t 
assert on metrics, and `InstanceRequestHandler` works with the NOOP 
`ServerMetrics` by default, so this setup appears unnecessary and can leak 
state across the test suite. Consider removing this setup, or adding an 
`@AfterClass` cleanup (e.g., `ServerMetrics.deregister()` and 
`PinotMetricUtils.cleanUp()`) to avoid cross-test interference.
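
The leak described above can be illustrated with a minimal, self-contained sketch. The `Metrics` class below is a hypothetical stand-in for a facade backed by a process-wide singleton; it is not Pinot's `ServerMetrics`, and the method names are assumptions chosen only to mirror the register/deregister pairing mentioned in the comment.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SingletonCleanupSketch {
  /** Hypothetical stand-in for a metrics facade backed by a global singleton. */
  static final class Metrics {
    static final Metrics NOOP = new Metrics("noop");
    private static final AtomicReference<Metrics> INSTANCE = new AtomicReference<>(NOOP);
    final String name;

    Metrics(String name) {
      this.name = name;
    }

    static void register(Metrics metrics) {
      INSTANCE.set(metrics);
    }

    static void deregister() {
      INSTANCE.set(NOOP);
    }

    static Metrics get() {
      return INSTANCE.get();
    }
  }

  /** Plays the role of a @BeforeClass method that mutates global state. */
  static void setUp() {
    Metrics.register(new Metrics("test-registry"));
  }

  /** Plays the role of an @AfterClass method that restores the default. */
  static void tearDown() {
    Metrics.deregister();
  }

  /** Returns the name of the instance that a later test class would observe. */
  static String observedByNextTest(boolean withCleanup) {
    setUp();
    if (withCleanup) {
      tearDown();
    }
    String seen = Metrics.get().name;
    Metrics.deregister(); // reset so the sketch can be called repeatedly
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(observedByNextTest(false)); // registered instance leaks
    System.out.println(observedByNextTest(true));  // default is restored
  }
}
```

Without the cleanup step, whichever test class runs next sees the instance registered here rather than the default, which is the cross-test interference the comment warns about.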



##########
pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java:
##########
@@ -88,6 +103,47 @@ public void testCancelQuery()
     assertFalse(handler.cancelQuery("unknown"));
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testWriteFailureClosesChannel()
+      throws Exception {
+    PinotConfiguration config = new PinotConfiguration();
+    CountDownLatch queryFinishLatch = new CountDownLatch(1);
+    QueryScheduler queryScheduler = createQueryScheduler(config, queryFinishLatch);
+    InstanceRequestHandler handler =
+        new InstanceRequestHandler("server01", config, queryScheduler, mock(AccessControl.class),
+            ThreadAccountantUtils.getNoOpAccountant());
+
+    ServerQueryRequest query = mock(ServerQueryRequest.class);
+    when(query.getQueryId()).thenReturn("test-query");
+    when(query.getTableNameWithType()).thenReturn("testTable_OFFLINE");
+    when(query.getRequestId()).thenReturn(1L);
+    QueryExecutionContext executionContext = mock(QueryExecutionContext.class);
+    when(executionContext.getCid()).thenReturn("test-query");
+    when(query.toExecutionContext(any())).thenReturn(executionContext);
+
+    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+    ChannelFuture writeFuture = mock(ChannelFuture.class);
+    when(ctx.writeAndFlush(any())).thenReturn(writeFuture);
+
+    ArgumentCaptor<GenericFutureListener> listenerCaptor = ArgumentCaptor.forClass(GenericFutureListener.class);

Review Comment:
   In `testWriteFailureClosesChannel`, the test uses a raw 
`GenericFutureListener` captor and suppresses unchecked warnings. It would be 
safer and clearer to parameterize the captor with the expected Netty listener 
type (matching `ChannelFuture.addListener(...)`) so the compiler can enforce 
the correct future type and the suppression can be removed.
   ```suggestion
       ArgumentCaptor<GenericFutureListener<Future<Void>>> listenerCaptor =
           ArgumentCaptor.forClass((Class<GenericFutureListener<Future<Void>>>) (Class<?>) GenericFutureListener.class);
   ```
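
The double cast in the suggestion is a general Java idiom for obtaining a parameterized `Class` token, and it can be tried without Mockito or Netty. The sketch below uses a hypothetical `Holder` class as a stand-in for a captor-like object; it is an illustration under that assumption, not the Mockito API. One caveat worth checking against the compiler: the cast from `Class<?>` to a parameterized `Class<...>` is itself unchecked, so a narrowly scoped suppression may still be needed at the cast site.

```java
import java.util.ArrayList;
import java.util.List;

public class TypedClassTokenSketch {
  /** Hypothetical captor-like holder that is created from a Class token. */
  static final class Holder<T> {
    private final Class<T> type;
    private T value;

    private Holder(Class<T> type) {
      this.type = type;
    }

    static <T> Holder<T> forClass(Class<T> type) {
      return new Holder<>(type);
    }

    void capture(Object candidate) {
      // Runtime check against the erased type only (List, not List<String>).
      value = type.cast(candidate);
    }

    T get() {
      return value;
    }
  }

  // A direct cast from List.class to Class<List<String>> does not compile,
  // so the token goes through Class<?> first. That second cast is unchecked,
  // hence the suppression is kept on this one helper.
  @SuppressWarnings("unchecked")
  static Class<List<String>> listOfStringClass() {
    return (Class<List<String>>) (Class<?>) List.class;
  }

  /** Captures a list and reads an element back without any further casts. */
  static String demo() {
    Holder<List<String>> holder = Holder.forClass(listOfStringClass());
    List<String> payload = new ArrayList<>();
    payload.add("captured");
    holder.capture(payload);
    return holder.get().get(0);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Confining the cast to a small helper keeps the rest of the test body free of raw types, which is the benefit the suggestion is after.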



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

