squah-confluent commented on code in PR #18144:
URL: https://github.com/apache/kafka/pull/18144#discussion_r1880674463


##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java:
##########
@@ -1027,42 +1013,78 @@ private void append(
                         }
                     }
 
-                    try {
-                        if (replay) {
-                            coordinator.replay(
-                                currentBatch.nextOffset,
-                                producerId,
-                                producerEpoch,
-                                recordToReplay
+                    for (int i = 0; i < records.size(); i++) {
+                        U recordToReplay = records.get(i);
+                        SimpleRecord recordToAppend = recordsToAppend.get(i);
+
+                        if (!isAtomic) {
+                            // Check if the current batch has enough space. We check this before
+                            // replaying the record in order to avoid having to revert back
+                            // changes if the record do not fit within a batch.
+                            boolean hasRoomFor = currentBatch.builder.hasRoomFor(
+                                recordToAppend.timestamp(),
+                                recordToAppend.key(),
+                                recordToAppend.value(),
+                                recordToAppend.headers()
                             );
+
+                            if (!hasRoomFor) {
+                                // If flushing fails, we don't catch the exception in order to let
+                                // the caller fail the current operation.
+                                flushCurrentBatch();
+                                maybeAllocateNewBatch(
+                                    producerId,
+                                    producerEpoch,
+                                    verificationGuard,
+                                    currentTimeMs
+                                );
+                            }
                         }
 
-                        currentBatch.builder.append(recordToAppend);
-                        currentBatch.nextOffset++;
-                    } catch (Throwable t) {
-                        log.error("Replaying record {} to {} failed due to: {}.", recordToReplay, tp, t.getMessage());
+                        try {
+                            if (replay) {
+                                coordinator.replay(
+                                    currentBatch.nextOffset,
+                                    producerId,
+                                    producerEpoch,
+                                    recordToReplay
+                                );
+                            }
+
+                            currentBatch.builder.append(recordToAppend);
+                            currentBatch.nextOffset++;
+                        } catch (Throwable t) {
+                            log.error("Replaying record {} to {} failed due to: {}.", recordToReplay, tp, t.getMessage());
 
-                        // Add the event to the list of pending events associated with the last
-                        // batch in order to fail it too.
-                        currentBatch.deferredEvents.add(event);
+                            // Add the event to the list of pending events associated with the last
+                            // batch in order to fail it too.
+                            currentBatch.deferredEvents.add(event);
 
-                        // If an exception is thrown, we fail the entire batch. Exceptions should be
-                        // really exceptional in this code path and they would usually be the results
-                        // of bugs preventing records to be replayed.
-                        failCurrentBatch(t);
+                            // If an exception is thrown, we fail the entire batch. Exceptions should be
+                            // really exceptional in this code path and they would usually be the results
+                            // of bugs preventing records to be replayed.
+                            failCurrentBatch(t);
 
-                        return;
+                            return;
+                        }

Review Comment:
   It's kind of ugly to have a nested try-catch for the same exceptions. But I think the more specific error log for replay errors is useful?
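
   To make the trade-off concrete, here is a minimal, self-contained sketch of the general shape: an inner catch that only adds record-level context to the log and rethrows, and an outer catch that fails the whole batch. This is not the `CoordinatorRuntime` code. The class `NestedTryCatchSketch`, the methods `replay` and `appendAll`, and the in-memory `LOG` list are all hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class NestedTryCatchSketch {
    /** Hypothetical in-memory log so the example has observable output. */
    static final List<String> LOG = new ArrayList<>();

    /** Hypothetical replay step: rejects empty records. */
    static void replay(String record) {
        if (record.isEmpty()) {
            throw new IllegalStateException("empty record");
        }
    }

    /**
     * Replays and appends each record to the batch. Returns true on success.
     * On any failure the batch is cleared, standing in for failing the batch.
     */
    static boolean appendAll(List<String> records, List<String> batch) {
        try {
            for (String record : records) {
                try {
                    replay(record);
                    batch.add(record);
                } catch (RuntimeException e) {
                    // Inner catch: same exception type as the outer one, but
                    // only here do we know which record was being replayed.
                    LOG.add("Replaying record '" + record + "' failed due to: " + e.getMessage());
                    throw e;
                }
            }
            return true;
        } catch (RuntimeException e) {
            // Outer catch: the single place where the batch is failed.
            batch.clear();
            LOG.add("Failing batch due to: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> batch = new ArrayList<>();
        boolean ok = appendAll(List.of("a", "", "c"), batch);
        System.out.println(ok + " " + batch + " " + LOG);
    }
}
```

   The duplication is the cost. The benefit is that the record-specific log line is emitted only where the failing record is known, while the batch-failure handling stays in one place.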


