jeffkbkim commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1631755703


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -434,6 +445,81 @@ public int size() {
         }
     }
 
+    /**
+     * A simple container class to hold all the attributes
+     * related to a pending batch.
+     */
+    private static class CoordinatorBatch {
+        /**
+         * The base (or first) offset of the batch. If the batch fails
+         * for any reason, the state machines is rolled back to it.
+         */
+        final long baseOffset;
+
+        /**
+         * The time at which the batch was created.
+         */
+        final long appendTimeMs;
+
+        /**
+         * The max batch size.
+         */
+        final int maxBatchSize;
+
+        /**
+         * The verification guard associated to the batch if it is
+         * transactional.
+         */
+        final VerificationGuard verificationGuard;
+
+        /**
+         * The byte buffer backing the records builder.
+         */
+        final ByteBuffer buffer;
+
+        /**
+         * The records builder.
+         */
+        final MemoryRecordsBuilder builder;
+
+        /**
+         * The timer used to enfore the append linger time if
+         * it is non-zero.
+         */
+        final Optional<TimerTask> lingerTimeoutTask;
+
+        /**
+         * The list of deferred events associated with the batch.
+         */
+        final List<DeferredEvent> deferredEvents;
+
+        /**
+         * The next offset. This is updated when records
+         * are added to the batch.
+         */
+        long nextOffset;

Review Comment:
   nit: can we rename `nextOffset` to `logEndOffset`? I'm also wondering
   whether this field should be placed directly below `baseOffset`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -583,11 +674,330 @@ private void unload() {
             }
             timer.cancelAll();
             deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+            failCurrentBatch(Errors.NOT_COORDINATOR.exception());
             if (coordinator != null) {
                 coordinator.onUnloaded();
             }
             coordinator = null;
         }
+
+        /**
+         * Frees the current batch.
+         */
+        private void freeCurrentBatch() {
+            // Cancel the linger timeout.
+            currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+            // Release the buffer.
+            bufferSupplier.release(currentBatch.buffer);
+
+            currentBatch = null;
+        }
+
+        /**
+         * Writes the current (or pending) batch to the log. When the batch is 
written
+         * locally, a new snapshot is created in the snapshot registry and the 
events
+         * associated with the batch are added to the deferred event queue.
+         */
+        private void writeCurrentBatch() {
+            if (currentBatch != null) {
+                try {
+                    // Write the records to the log and update the last 
written offset.
+                    long offset = partitionWriter.append(
+                        tp,
+                        currentBatch.verificationGuard,
+                        currentBatch.builder.build()
+                    );
+                    coordinator.updateLastWrittenOffset(offset);
+
+                    // Add all the pending deferred events to the deferred 
event queue.
+                    for (DeferredEvent event : currentBatch.events) {
+                        deferredEventQueue.add(offset, event);
+                    }
+
+                    // Free up the current batch.
+                    freeCurrentBatch();
+                } catch (Throwable t) {
+                    failCurrentBatch(t);
+                }
+            }
+        }
+
+        /**
+         * Writes the current batch if it is transactional or if it has past 
the append linger time.
+         */
+        private void maybeWriteCurrentBatch(long currentTimeMs) {
+            if (currentBatch != null) {
+                if (currentBatch.builder.isTransactional() || 
(currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+                    writeCurrentBatch();
+                }
+            }
+        }
+
+        /**
+         * Fails the current batch, reverts to the snapshot to the base/start 
offset of the
+         * batch, fails all the associated events.
+         */
+        private void failCurrentBatch(Throwable t) {
+            if (currentBatch != null) {
+                coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
+                for (DeferredEvent event : currentBatch.events) {
+                    event.complete(t);
+                }
+                freeCurrentBatch();
+            }
+        }
+
+        /**
+         * Allocates a new batch if none already exists.
+         */
+        private void maybeAllocateNewBatch(
+            long producerId,
+            short producerEpoch,
+            VerificationGuard verificationGuard,
+            long currentTimeMs
+        ) {
+            if (currentBatch == null) {
+                LogConfig logConfig = partitionWriter.config(tp);
+                byte magic = logConfig.recordVersion().value;
+                int maxBatchSize = logConfig.maxMessageSize();
+                long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+                ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+
+                MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+                    buffer,
+                    magic,
+                    compression,
+                    TimestampType.CREATE_TIME,
+                    0L,
+                    currentTimeMs,
+                    producerId,
+                    producerEpoch,
+                    0,
+                    producerId != RecordBatch.NO_PRODUCER_ID,
+                    false,
+                    RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                    maxBatchSize
+                );
+
+                Optional<TimerTask> lingerTimeoutTask = Optional.empty();
+                if (appendLingerMs > 0) {
+                    lingerTimeoutTask = Optional.of(new 
TimerTask(appendLingerMs) {
+                        @Override
+                        public void run() {
+                            scheduleInternalOperation("FlushBatch", tp, () -> {
+                                if (this.isCancelled()) return;
+                                withActiveContextOrThrow(tp, 
CoordinatorContext::writeCurrentBatch);
+                            });
+                        }
+                    });
+                }
+
+                currentBatch = new CoordinatorBatch(
+                    prevLastWrittenOffset,
+                    currentTimeMs,
+                    maxBatchSize,
+                    verificationGuard,
+                    buffer,
+                    builder,
+                    lingerTimeoutTask
+                );
+            }
+        }
+
+        /**
+         * Appends records to the log and replay them to the state machine.
+         *
+         * @param producerId        The producer id.
+         * @param producerEpoch     The producer epoch.
+         * @param verificationGuard The verification guard.
+         * @param records           The records to append.
+         * @param replay            A boolean indicating whether the records
+         *                          must be replayed or not.
+         * @param event             The event that must be completed when the
+         *                          records are written.
+         */
+        private void append(
+            long producerId,
+            short producerEpoch,
+            VerificationGuard verificationGuard,
+            List<U> records,
+            boolean replay,
+            DeferredEvent event
+        ) {
+            if (state != CoordinatorState.ACTIVE) {
+                throw new IllegalStateException("Coordinator must be active to 
append records");
+            }
+
+            if (records.isEmpty()) {
+                // If the records are empty, it was a read operation after 
all. In this case,
+                // the response can be returned directly iff there are no 
pending write operations;
+                // otherwise, the read needs to wait on the last write 
operation to be completed.
+                if (currentBatch != null) {
+                    currentBatch.events.add(event);
+                } else {
+                    OptionalLong pendingOffset = 
deferredEventQueue.highestPendingOffset();
+                    if (pendingOffset.isPresent()) {
+                        deferredEventQueue.add(pendingOffset.getAsLong(), 
event);
+                    } else {
+                        event.complete(null);
+                    }
+                }
+            } else {
+                // If the records are not empty, first, they are applied to 
the state machine,
+                // second, then are appended to the opened batch.
+                long currentTimeMs = time.milliseconds();
+
+                // If the current write operation is transactional, the 
current batch
+                // is written before proceeding with it.
+                if (producerId != RecordBatch.NO_PRODUCER_ID) {
+                    writeCurrentBatch();
+                }
+
+                // Allocate a new batch if none exists.
+                maybeAllocateNewBatch(
+                    producerId,
+                    producerEpoch,
+                    verificationGuard,
+                    currentTimeMs
+                );
+
+                // Prepare the records.
+                List<SimpleRecord> recordsToAppend = new 
ArrayList<>(records.size());
+                for (U record : records) {
+                    recordsToAppend.add(new SimpleRecord(
+                        currentTimeMs,
+                        serializer.serializeKey(record),
+                        serializer.serializeValue(record)
+                    ));
+                }
+
+                // Compute the estimated size of the records.
+                int estimatedSize = AbstractRecords.estimateSizeInBytes(
+                    currentBatch.builder.magic(),
+                    compression.type(),
+                    recordsToAppend
+                );
+
+                // Check if the current batch has enough space. We check is 
before
+                // replaying the records in order to avoid having to revert 
back
+                // changes if the records do not fit within a batch.
+                if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+                    if (currentBatch.builder.numRecords() == 0) {
+                        // If the number of records in the current batch is 
zero, it means that
+                        // the records are larger than the max batch size.
+                        throw new RecordTooLargeException("Message batch size 
is " + estimatedSize +
+                            " bytes in append to partition " + tp + " which 
exceeds the maximum " +
+                            "configured size of " + currentBatch.maxBatchSize 
+ ".");
+                    } else {
+                        // Otherwise, we write the current batch, allocate a 
new one and re-verify
+                        // whether the records fit in it.
+                        writeCurrentBatch();
+                        maybeAllocateNewBatch(
+                            producerId,
+                            producerEpoch,
+                            verificationGuard,
+                            currentTimeMs
+                        );
+                        if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+                            throw new RecordTooLargeException("Message batch 
size is " + estimatedSize +
+                                " bytes in append to partition " + tp + " 
which exceeds the maximum " +
+                                "configured size of " + 
currentBatch.maxBatchSize + ".");
+                        }
+                    }
+                }
+
+                // Add the event to the list of pending events associated with 
the batch.
+                currentBatch.events.add(event);
+
+                try {
+                    // Apply record to the state machine.
+                    if (replay) {
+                        for (int i = 0; i < records.size(); i++) {
+                            // We compute the offset of the record based on 
the last written offset. The
+                            // coordinator is the single writer to the 
underlying partition so we can
+                            // deduce it like this.
+                            coordinator.replay(
+                                currentBatch.nextOffset + i,

Review Comment:
   I think "current" and "pending" refer to the same batch: the one we're
   currently building that has not been flushed yet.
   ```
           /**
            * The current (or pending) batch.
            */
           CoordinatorBatch currentBatch;
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3059,6 +3166,497 @@ public void testAppendRecordBatchSize() {
         assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
     }
 
+    @Test
+    public void testScheduleWriteOperationWithBatching() throws 
ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with two records.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 2), 
"response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #2 with one record.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), 
"response2")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write2.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #3 with one record. This one cannot go into the existing batch
+        // so the existing batch should be flushed and a new one should be 
created.
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), 
"response3")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write3.isDone());
+
+        // Verify the state. Records are replayed. The previous batch
+        // got flushed with all the records but the new one from #3.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(

Review Comment:
   nit question: for single-element lists, should we use `Arrays#asList` or
   `Collections#singletonList`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3059,6 +3166,497 @@ public void testAppendRecordBatchSize() {
         assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
     }
 
+    @Test
+    public void testScheduleWriteOperationWithBatching() throws 
ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with two records.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 2), 
"response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #2 with one record.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), 
"response2")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write2.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #3 with one record. This one cannot go into the existing batch
+        // so the existing batch should be flushed and a new one should be 
created.
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), 
"response3")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write3.isDone());
+
+        // Verify the state. Records are replayed. The previous batch
+        // got flushed with all the records but the new one from #3.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Advance past the linger time.
+        timer.advanceClock(11);
+
+        // Verify the state. The pending batch is flushed.
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds() - 11, records.subList(0, 3)),
+            records(timer.time().milliseconds() - 11, records.subList(3, 4))
+        ), writer.entries(TP));
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertTrue(write3.isDone());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+        assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenRecordsTooLarge() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write all the records.
+        CompletableFuture<String> write = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        assertFutureThrows(write, RecordTooLargeException.class);

Review Comment:
   Can we confirm that a batch with 3 records doesn't cause this exception?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3059,6 +3166,497 @@ public void testAppendRecordBatchSize() {
         assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
     }
 
+    @Test
+    public void testScheduleWriteOperationWithBatching() throws 
ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with two records.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 2), 
"response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #2 with one record.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), 
"response2")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write2.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #3 with one record. This one cannot go into the existing batch

Review Comment:
   I have the same question. Maybe it's worth adding a short explanation of
   why it cannot go into the existing batch.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3059,6 +3166,497 @@ public void testAppendRecordBatchSize() {
         assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
     }
 
+    @Test
+    public void testScheduleWriteOperationWithBatching() throws 
ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with two records.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 2), 
"response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #2 with one record.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), 
"response2")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write2.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #3 with one record. This one cannot go into the existing batch
+        // so the existing batch should be flushed and a new one should be 
created.
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), 
"response3")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write3.isDone());
+
+        // Verify the state. Records are replayed. The previous batch
+        // got flushed with all the records but the new one from #3.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Advance past the linger time.
+        timer.advanceClock(11);
+
+        // Verify the state. The pending batch is flushed.
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds() - 11, records.subList(0, 3)),
+            records(timer.time().milliseconds() - 11, records.subList(3, 4))
+        ), writer.entries(TP));
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertTrue(write3.isDone());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+        assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenRecordsTooLarge() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write all the records.
+        CompletableFuture<String> write = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        assertFutureThrows(write, RecordTooLargeException.class);
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenWriteFails() {
+        MockTimer timer = new MockTimer();
+        // The partition writer only accept no writes.
+        MockPartitionWriter writer = new MockPartitionWriter(0);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 1), 
"response1"));
+
+        // Write #2.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(1, 2), 
"response2"));
+
+        // Write #3.
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), 
"response3"));
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #4. This write cannot make it in the current batch. So the 
current batch
+        // is flushed. It will fail. So we expect all writes to fail.
+        CompletableFuture<String> write4 = 
runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), 
"response4"));
+
+        // Verify the futures.
+        assertFutureThrows(write1, KafkaException.class);
+        assertFutureThrows(write2, KafkaException.class);
+        assertFutureThrows(write3, KafkaException.class);
+        // Write #4 is also expected to fail.
+        assertFutureThrows(write4, KafkaException.class);
+
+        // Verify the state. The record is replayed but not written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptyList(), 
ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenReplayFails() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Override the coordinator with a coordinator that throws
+        // an exception when replay is called.
+        SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry();
+        ctx.coordinator = new SnapshottableCoordinator<>(
+            new LogContext(),
+            snapshotRegistry,
+            new MockCoordinatorShard(snapshotRegistry, ctx.timer) {
+                @Override
+                public void replay(
+                    long offset,
+                    long producerId,
+                    short producerEpoch,
+                    String record
+                ) throws RuntimeException {
+                    if (offset >= 1) {
+                        throw new IllegalArgumentException("error");
+                    }
+                    super.replay(
+                        offset,
+                        producerId,
+                        producerEpoch,
+                        record
+                    );
+                }
+            },
+            TP
+        );
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 1), 
"response1"));
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #2. It should fail.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(1, 2), 
"response2"));
+
+        // Verify the futures.
+        assertFutureThrows(write1, IllegalArgumentException.class);
+        assertFutureThrows(write2, IllegalArgumentException.class);
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptyList(), 
ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+    }
+
+    @Test
+    public void testScheduleTransactionalWriteOperationWithBatching() throws 
ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Write #1 with two records.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new 
CoordinatorResult<>(Collections.singletonList("record#1"), "response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(mkSet("record#1"), 
ctx.coordinator.coordinator().records());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Transactional write #2 with one record. This will flush the current 
batch.
+        CompletableFuture<String> write2 = 
runtime.scheduleTransactionalWriteOperation(
+            "txn-write#1",
+            TP,
+            "transactional-id",
+            100L,
+            (short) 50,
+            Duration.ofMillis(20),
+            state -> new 
CoordinatorResult<>(Collections.singletonList("record#2"), "response2"),
+            TXN_OFFSET_COMMIT_LATEST_VERSION
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write2.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 1L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(mkSet("record#2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(mkSet("record#1"), 
ctx.coordinator.coordinator().records());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds(), "record#1"),
+            transactionalRecords(100L, (short) 50, 
timer.time().milliseconds(), "record#2")
+        ), writer.entries(TP));
+
+        // Write #3 with one record.
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new 
CoordinatorResult<>(Collections.singletonList("record#3"), "response3")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write3.isDone());
+
+        // Verify the state.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 1L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(mkSet("record#2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(mkSet("record#1", "record#3"), 
ctx.coordinator.coordinator().records());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds(), "record#1"),
+            transactionalRecords(100L, (short) 50, 
timer.time().milliseconds(), "record#2")
+        ), writer.entries(TP));
+
+        // Complete transaction #1. It will flush the current patch if any.
+        CompletableFuture<Void> complete1 = 
runtime.scheduleTransactionCompletion(
+            "complete#1",
+            TP,
+            100L,
+            (short) 50,
+            10,
+            TransactionResult.COMMIT,
+            DEFAULT_WRITE_TIMEOUT
+        );
+
+        // Verify that the completion is not committed yet.
+        assertFalse(complete1.isDone());
+
+        // Verify the state.
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 1L, 2L, 3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(mkSet("record#1", "record#2", "record#3"), 
ctx.coordinator.coordinator().records());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds(), "record#1"),
+            transactionalRecords(100L, (short) 50, 
timer.time().milliseconds(), "record#2"),
+            records(timer.time().milliseconds(), "record#3"),
+            endTransactionMarker(100L, (short) 50, 
timer.time().milliseconds(), 10, ControlRecordType.COMMIT)
+        ), writer.entries(TP));
+
+        // Commit and verify that writes are completed.

Review Comment:
   I'm wondering if we should check the state after we commit up to write3 but 
before we commit complete1. Or maybe this is captured elsewhere?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3059,6 +3166,497 @@ public void testAppendRecordBatchSize() {
         assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
     }
 
+    @Test
+    public void testScheduleWriteOperationWithBatching() throws 
ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with two records.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 2), 
"response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #2 with one record.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), 
"response2")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write2.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #3 with one record. This one cannot go into the existing batch
+        // so the existing batch should be flushed and a new one should be 
created.
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), 
"response3")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write3.isDone());
+
+        // Verify the state. Records are replayed. The previous batch
+        // got flushed with all the records but the new one from #3.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Advance past the linger time.
+        timer.advanceClock(11);
+
+        // Verify the state. The pending batch is flushed.
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds() - 11, records.subList(0, 3)),
+            records(timer.time().milliseconds() - 11, records.subList(3, 4))
+        ), writer.entries(TP));
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertTrue(write3.isDone());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+        assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenRecordsTooLarge() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write all the records.
+        CompletableFuture<String> write = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        assertFutureThrows(write, RecordTooLargeException.class);
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenWriteFails() {
+        MockTimer timer = new MockTimer();
+        // The partition writer accepts no writes.
+        MockPartitionWriter writer = new MockPartitionWriter(0);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 1), 
"response1"));
+
+        // Write #2.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(1, 2), 
"response2"));
+
+        // Write #3.
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), 
"response3"));
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #4. This write cannot make it in the current batch. So the 
current batch
+        // is flushed. It will fail. So we expect all writes to fail.
+        CompletableFuture<String> write4 = 
runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), 
"response4"));
+
+        // Verify the futures.
+        assertFutureThrows(write1, KafkaException.class);
+        assertFutureThrows(write2, KafkaException.class);
+        assertFutureThrows(write3, KafkaException.class);
+        // Write #4 is also expected to fail.
+        assertFutureThrows(write4, KafkaException.class);
+
+        // Verify the state. The replayed records are rolled back and nothing is written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptyList(), 
ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenReplayFails() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Override the coordinator with a coordinator that throws
+        // an exception when replay is called.
+        SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry();
+        ctx.coordinator = new SnapshottableCoordinator<>(
+            new LogContext(),
+            snapshotRegistry,
+            new MockCoordinatorShard(snapshotRegistry, ctx.timer) {
+                @Override
+                public void replay(
+                    long offset,
+                    long producerId,
+                    short producerEpoch,
+                    String record
+                ) throws RuntimeException {
+                    if (offset >= 1) {
+                        throw new IllegalArgumentException("error");
+                    }
+                    super.replay(
+                        offset,
+                        producerId,
+                        producerEpoch,
+                        record
+                    );
+                }
+            },
+            TP
+        );
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 1), 
"response1"));
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #2. It should fail.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(1, 2), 
"response2"));
+
+        // Verify the futures.
+        assertFutureThrows(write1, IllegalArgumentException.class);
+        assertFutureThrows(write2, IllegalArgumentException.class);
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptyList(), 
ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+    }
+
+    @Test
+    public void testScheduleTransactionalWriteOperationWithBatching() throws 
ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Write #1 with one record.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new 
CoordinatorResult<>(Collections.singletonList("record#1"), "response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(mkSet("record#1"), 
ctx.coordinator.coordinator().records());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Transactional write #2 with one record. This will flush the current 
batch.
+        CompletableFuture<String> write2 = 
runtime.scheduleTransactionalWriteOperation(
+            "txn-write#1",
+            TP,
+            "transactional-id",
+            100L,
+            (short) 50,
+            Duration.ofMillis(20),
+            state -> new 
CoordinatorResult<>(Collections.singletonList("record#2"), "response2"),
+            TXN_OFFSET_COMMIT_LATEST_VERSION
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write2.isDone());
+
+        // Verify the state. Records are replayed but no batch written.

Review Comment:
   Isn't a batch written here, because transactional write #2 flushes the current batch?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to