junrao commented on code in PR #19489:
URL: https://github.com/apache/kafka/pull/19489#discussion_r2624340885


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2526,17 +2526,20 @@ public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedExcep
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size(), "Expect one in-flight batch in accumulator");
 
+        // let delivery timeout expire without responding
+        time.sleep(deliveryTimeoutMs);
+        sender.runOnce();
+        // inflight batch should still be there
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+        assertFalse(request.isDone());
+        
         Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
         responseMap.put(new TopicIdPartition(TOPIC_ID, tp0), new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
         client.respond(new ProduceResponse(responseMap));
 
-        time.sleep(deliveryTimeoutMs);
         sender.runOnce();  // receive first response
-        assertEquals(0, sender.inFlightBatches(tp0).size(), "Expect zero in-flight batch in accumulator");
-        assertInstanceOf(
-            TimeoutException.class,
-            assertThrows(ExecutionException.class, request::get).getCause(),
-            "The expired batch should throw a TimeoutException");
+        assertEquals(0, sender.inFlightBatches(tp0).size());

Review Comment:
   This is an existing problem. Should we remove `first` in the line above, since there is only one response?



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2642,11 +2645,11 @@ public void testExpiredBatchDoesNotRetry() throws Exception {
     }
 
     @Test
-    public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
+    public void testBatchDoesNotSplitOnMessageTooLargeError() throws Exception {

Review Comment:
   This name is inaccurate, since the batch does split in this case. How about `testExpiredBatchStillSplitOnMessageTooLargeError`?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1593,7 +1596,7 @@ private boolean hasHigherPartitionLeaderEpoch(RecordBatch batch, AppendOrigin or
      * @param info The general information of the message set
      * @return A trimmed message set. This may be the same as what was passed in, or it may not.
      */
-    private MemoryRecords trimInvalidBytes(MemoryRecords records, LogAppendInfo info) {
+    private MemoryRecords trimInvalidBytes(MemoryRecords records, LogAppendInfo info, boolean isLeader) {

Review Comment:
   Could we add the new param to the javadoc?
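   One possible shape for the updated javadoc (the `isLeader` wording and the `@param records` line are my assumptions, not taken from the PR):
   
   ```java
   /**
    * @param records  The records to trim
    * @param info     The general information of the message set
    * @param isLeader Whether this broker is currently the leader for the partition
    *                 (assumed meaning; adjust to match how callers compute it)
    * @return A trimmed message set. This may be the same as what was passed in, or it may not.
    */
   private MemoryRecords trimInvalidBytes(MemoryRecords records, LogAppendInfo info, boolean isLeader) {
   ```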



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1605,6 +1608,10 @@ private MemoryRecords trimInvalidBytes(MemoryRecords records, LogAppendInfo info
             // trim invalid bytes
             ByteBuffer validByteBuffer = records.buffer().duplicate();
             validByteBuffer.limit(validBytes);
+            if (isLeader) {
+                logger.warn("Trimming invalid bytes from message set for partition {}. Original size: {} bytes, valid bytes: {}, trimmed bytes: {}. ",

Review Comment:
   We have the following log ident, so there is no need to include the partition explicitly.
   
   `this.logIdent = "[UnifiedLog partition=" + topicPartition() + ", dir=" + parentDir() + "] ";`



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2655,17 +2658,15 @@ public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exceptio
         client.respond(produceResponse(tp0, -1, Errors.MESSAGE_TOO_LARGE, -1));
 
         time.sleep(deliverTimeoutMs);
-        // expire the batch and process the response
         sender.runOnce();
-        assertTrue(request1.isDone());
-        assertTrue(request2.isDone());
-        assertEquals(0, client.inFlightRequestCount());
-        assertEquals(0, sender.inFlightBatches(tp0).size());
+        sender.runOnce();

Review Comment:
   It would be useful to document the behavior after each `runOnce()`: after the first one, we expect the original batch to be split; after the second one, we expect the first split batch to be sent.
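   For instance, the two calls could carry comments along these lines (the wording is a suggestion, mirroring the expected behavior):
   
   ```java
   time.sleep(deliverTimeoutMs);
   // first runOnce(): process the MESSAGE_TOO_LARGE response and split the original batch
   sender.runOnce();
   // second runOnce(): send the first of the split batches
   sender.runOnce();
   ```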



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1100,7 +1102,8 @@ private LogAppendInfo append(MemoryRecords records,
                                  VerificationGuard verificationGuard,
                                  boolean ignoreRecordSize,
                                  byte toMagic,
-                                 short transactionVersion) {
+                                 short transactionVersion,
+                                 boolean isLeader) {

Review Comment:
   We don't need to pass in `isLeader`, since it can be computed as `origin != REPLICATION`.
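   A minimal, self-contained sketch of that computation; `AppendOrigin` here is a stand-in for Kafka's enum of the same name, and the variants other than `REPLICATION` are illustrative:
   
   ```java
   public class IsLeaderSketch {
       // Stand-in for Kafka's AppendOrigin; only REPLICATION matters for this check.
       enum AppendOrigin { REPLICATION, COORDINATOR, CLIENT }
   
       // Only followers append via replication, so any other origin implies the leader.
       static boolean isLeader(AppendOrigin origin) {
           return origin != AppendOrigin.REPLICATION;
       }
   
       public static void main(String[] args) {
           System.out.println(isLeader(AppendOrigin.CLIENT));      // prints "true"
           System.out.println(isLeader(AppendOrigin.REPLICATION)); // prints "false"
       }
   }
   ```
   
   This would keep the `append(...)` signature unchanged, since `origin` is already available to it.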



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2686,14 +2687,12 @@ public void testResetNextBatchExpiry() throws Exception {
         inOrder.verify(client, atLeastOnce()).newClientRequest(anyString(), any(), anyLong(), anyBoolean(), anyInt(), any());
         inOrder.verify(client, atLeastOnce()).send(any(), anyLong());
         inOrder.verify(client).poll(eq(0L), anyLong());
-        inOrder.verify(client).poll(eq(accumulator.getDeliveryTimeoutMs()), anyLong());

Review Comment:
   It seems that the next two `poll()` calls should use maxLong as the poll time, and we should verify them. However, if I add the verification with maxLong, the test fails. Do you know why, @gongxuanzhang and @kirktrue?



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2642,11 +2645,11 @@ public void testExpiredBatchDoesNotRetry() throws Exception {
     }
 
     @Test
-    public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
+    public void testBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
         long deliverTimeoutMs = 1500L;
         // create a producer batch with more than one record so it is eligible for splitting
-        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
-        Future<RecordMetadata> request2 = appendToAccumulator(tp0);
+        appendToAccumulator(tp0);

Review Comment:
   It would be useful to track those futures, complete the responses after the split, and verify that neither future throws.
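   Sketched against the surrounding test (the assertion style is a suggestion; `appendToAccumulator` and `tp0` come from the existing test):
   
   ```java
   Future<RecordMetadata> request1 = appendToAccumulator(tp0);
   Future<RecordMetadata> request2 = appendToAccumulator(tp0);
   // ... run the sender until the split batches are sent and their responses completed ...
   // neither get() should throw; a successful send yields non-null metadata
   assertNotNull(request1.get());
   assertNotNull(request2.get());
   ```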



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.