squah-confluent commented on code in PR #18144:
URL: https://github.com/apache/kafka/pull/18144#discussion_r1887101245


##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -4213,6 +4246,73 @@ public void 
testScheduleNonAtomicWriteOperationWhenWriteFails() {
         assertEquals(Collections.emptyList(), writer.entries(TP));
     }
 
+    @Test
+    public void testEmptyBatch() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        ThrowingSerializer<String> serializer = new 
ThrowingSerializer<String>(new StringSerializer());
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Write #1, which fails.
+        serializer.throwOnNextOperation();
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("1"), "response1"));
+
+        // Write #1 should fail and leave an empty batch.
+        assertFutureThrows(write1, BufferOverflowException.class);
+        assertNotNull(ctx.currentBatch);
+
+        // Write #2, with no records.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(Collections.emptyList(), 
"response2"));
+
+        // Write #2 should not be attached to the empty batch.
+        assertTrue(write2.isDone());
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+
+        // Complete transaction #1. It will flush the current empty batch.
+        // The coordinator must not try to write an empty batch, otherwise the 
mock partition writer
+        // will throw an exception.

Review Comment:
   The real partition writer returns an offset of 0 which makes 
updateLastWrittenOffset throw because the offset must not go backwards. I added
   ```
               if (batch.validBytes() <= 0)
                   throw new KafkaException("Coordinator tried to write an 
empty batch");
   ```
   in the mock partition writer in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to