dajac commented on code in PR #18144:
URL: https://github.com/apache/kafka/pull/18144#discussion_r1887047404


##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -767,6 +767,18 @@ private void freeCurrentBatch() {
         private void flushCurrentBatch() {
             if (currentBatch != null) {
                 try {
+                    int numRecords = currentBatch.builder.numRecords();
+                    if (numRecords == 0) {

Review Comment:
   nit: Should we inline `numRecords`? We don't reuse it afterwards.
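
   To illustrate what I mean, here is a minimal sketch of the inlined check. `BatchBuilder` and `isEmpty` are hypothetical stand-ins for illustration only, not the runtime's actual types:

```java
import java.util.ArrayList;
import java.util.List;

public class InlineNumRecordsSketch {
    // Hypothetical stand-in for the batch builder; only numRecords() matters here.
    static final class BatchBuilder {
        private final List<String> records = new ArrayList<>();

        void append(String record) {
            records.add(record);
        }

        int numRecords() {
            return records.size();
        }
    }

    // The count is read once, directly in the condition, so no local variable is needed.
    static boolean isEmpty(BatchBuilder builder) {
        return builder.numRecords() == 0;
    }

    public static void main(String[] args) {
        BatchBuilder builder = new BatchBuilder();
        System.out.println(isEmpty(builder));
        builder.append("1");
        System.out.println(isEmpty(builder));
    }
}
```

   Keeping the local is fine too if it ends up in the log message later.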



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -767,6 +767,18 @@ private void freeCurrentBatch() {
         private void flushCurrentBatch() {
             if (currentBatch != null) {
                 try {
+                    int numRecords = currentBatch.builder.numRecords();
+                    if (numRecords == 0) {
+                        // The only way we can get here is if append() has failed in an unexpected
+                        // way and left an empty batch. Try to clean it up.
+                        log.warn("Tried to flush an empty batch for {}.", tp);

Review Comment:
   nit: I would log this at debug level; a warning here may alarm users unnecessarily.



##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -4213,6 +4246,73 @@ public void testScheduleNonAtomicWriteOperationWhenWriteFails() {
         assertEquals(Collections.emptyList(), writer.entries(TP));
     }
 
+    @Test
+    public void testEmptyBatch() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        ThrowingSerializer<String> serializer = new ThrowingSerializer<String>(new StringSerializer());
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Write #1, which fails.
+        serializer.throwOnNextOperation();
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("1"), "response1"));
+
+        // Write #1 should fail and leave an empty batch.
+        assertFutureThrows(write1, BufferOverflowException.class);
+        assertNotNull(ctx.currentBatch);
+
+        // Write #2, with no records.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(Collections.emptyList(), "response2"));
+
+        // Write #2 should not be attached to the empty batch.
+        assertTrue(write2.isDone());
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+
+        // Complete transaction #1. It will flush the current empty batch.
+        // The coordinator must not try to write an empty batch, otherwise the mock partition writer
+        // will throw an exception.

Review Comment:
   For my understanding, what makes it fail? I don't see any checks in the mock partition writer.
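
   If the intent is for the writer to reject empty batches, an explicit guard would make the failure mode obvious. A rough sketch under that assumption follows; `GuardedWriter` and its `append` signature are hypothetical and do not reflect the existing mock partition writer:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class GuardedWriterSketch {
    // Hypothetical in-memory writer for illustration; not the mock used by the test.
    static final class GuardedWriter {
        private final List<List<String>> batches = new ArrayList<>();

        // Rejects empty batches explicitly so an accidental empty write fails loudly.
        // Returns the number of batches stored so far.
        int append(List<String> batch) {
            if (batch.isEmpty()) {
                throw new IllegalArgumentException("Refusing to write an empty batch.");
            }
            batches.add(new ArrayList<>(batch));
            return batches.size();
        }
    }

    public static void main(String[] args) {
        GuardedWriter writer = new GuardedWriter();
        System.out.println(writer.append(Collections.singletonList("1")));
        try {
            writer.append(Collections.emptyList());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   With something like this in place, the comment in the test would describe a failure that the writer actually enforces.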



