This is an automated email from the ASF dual-hosted git repository.
ramanathan1504 pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/logging-log4j2.git
The following commit(s) were added to refs/heads/2.x by this push:
new 12380a6619 Fix KafkaAppender reporting error after successful retry
(#4125)
12380a6619 is described below
commit 12380a6619b1e480b580295be5ebbe0d7e641dc4
Author: Sebastien Tardif <[email protected]>
AuthorDate: Tue May 19 22:55:55 2026 -0700
Fix KafkaAppender reporting error after successful retry (#4125)
* Fix KafkaAppender reporting error after successful retry
When retryCount is configured and the initial tryAppend() fails, the
retry loop uses break to exit on success. However, break only exits the
while loop and execution always reaches the error() call afterward. This
causes spurious error notifications for transient Kafka failures that
were successfully recovered by a retry.
Replace break with return so that a successful retry exits append()
without reporting an error. Retry exceptions are now logged at DEBUG
level for diagnostics instead of being silently discarded.
Also remove dead code in Builder.getRetryCount() where Integer.valueOf(int)
was wrapped in a NumberFormatException catch that can never fire.
The bug was introduced in #315.
Signed-off-by: Sebastien Tardif <[email protected]>
* Add changelog entry for KafkaAppender retry fix
Signed-off-by: Sebastien Tardif <[email protected]>
* Use CloseableThreadContext in testRetrySuccessDoesNotReportError
Adopt reviewer suggestion to use try-with-resources CloseableThreadContext
instead of manual ThreadContext.put/clearMap.
---------
Signed-off-by: Sebastien Tardif <[email protected]>
---
.../core/appender/mom/kafka/KafkaAppenderTest.java | 53 +++++++++++++++++++---
.../core/appender/mom/kafka/KafkaAppender.java | 20 ++++----
.../fix_kafka_appender_retry_error_reporting.xml | 8 ++++
3 files changed, 64 insertions(+), 17 deletions(-)
diff --git
a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
index ef8c24edf0..b265126cd2 100644
---
a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
+++
b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java
@@ -18,6 +18,7 @@ package org.apache.logging.log4j.core.appender.mom.kafka;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -30,14 +31,18 @@ import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.ErrorHandler;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.core.test.categories.Appenders;
@@ -55,6 +60,8 @@ public class KafkaAppenderTest {
private static final Serializer<byte[]> SERIALIZER = new
ByteArraySerializer();
+ private static final AtomicInteger retrySendCount = new AtomicInteger(0);
+
private static final MockProducer<byte[], byte[]> kafka =
new MockProducer<byte[], byte[]>(true, SERIALIZER, SERIALIZER) {
@@ -65,12 +72,13 @@ public class KafkaAppenderTest {
final boolean isRetryTest =
"true".equals(ThreadContext.get("KafkaAppenderWithRetryCount"));
if (isRetryTest) {
- try {
- throw new TimeoutException();
- } catch (TimeoutException e) {
- // TODO Auto-generated catch block
- throw new RuntimeException(e);
- }
+ throw new RuntimeException(new TimeoutException());
+ }
+
+ final boolean isRetrySuccessTest =
+
"true".equals(ThreadContext.get("KafkaAppenderRetrySuccessTest"));
+ if (isRetrySuccessTest && retrySendCount.getAndIncrement()
== 0) {
+ throw new RuntimeException(new TimeoutException());
}
return retVal;
@@ -206,6 +214,39 @@ public class KafkaAppenderTest {
}
}
+ @Test
+ public void testRetrySuccessDoesNotReportError() {
+ retrySendCount.set(0);
+ final AtomicBoolean errorReported = new AtomicBoolean(false);
+ final Appender appender =
ctx.getRequiredAppender("KafkaAppenderWithRetryCount");
+ final ErrorHandler originalHandler = ((KafkaAppender)
appender).getHandler();
+ try (final CloseableThreadContext.Instance ignored =
+ CloseableThreadContext.put("KafkaAppenderRetrySuccessTest",
"true")) {
+ ((KafkaAppender) appender).setHandler(new ErrorHandler() {
+ @Override
+ public void error(final String msg) {
+ errorReported.set(true);
+ }
+
+ @Override
+ public void error(final String msg, final Throwable t) {
+ errorReported.set(true);
+ }
+
+ @Override
+ public void error(final String msg, final LogEvent event,
final Throwable t) {
+ errorReported.set(true);
+ }
+ });
+ appender.append(createLogEvent());
+ final List<ProducerRecord<byte[], byte[]>> history =
kafka.history();
+ assertEquals(2, history.size());
+ assertFalse("Error should not be reported when retry succeeds",
errorReported.get());
+ } finally {
+ ((KafkaAppender) appender).setHandler(originalHandler);
+ }
+ }
+
@Test
public void testAppenderNoEventTimestamp() {
final Appender appender =
ctx.getRequiredAppender("KafkaAppenderNoEventTimestamp");
diff --git
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
index a9788a2a59..2fbccff7b0 100644
---
a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
+++
b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java
@@ -94,13 +94,7 @@ public final class KafkaAppender extends AbstractAppender {
}
public Integer getRetryCount() {
- Integer intRetryCount = null;
- try {
- intRetryCount = Integer.valueOf(retryCount);
- } catch (NumberFormatException e) {
-
- }
- return intRetryCount;
+ return retryCount;
}
public String getTopic() {
@@ -216,16 +210,20 @@ public final class KafkaAppender extends AbstractAppender
{
try {
tryAppend(event);
} catch (final Exception e) {
-
if (this.retryCount != null) {
int currentRetryAttempt = 0;
while (currentRetryAttempt < this.retryCount) {
currentRetryAttempt++;
try {
tryAppend(event);
- break;
- } catch (Exception e1) {
-
+ return;
+ } catch (final Exception retryException) {
+ LOGGER.debug(
+ "Unable to write to Kafka in appender
[{}], retry attempt [{}/{}].",
+ getName(),
+ currentRetryAttempt,
+ this.retryCount,
+ retryException);
}
}
}
diff --git a/src/changelog/.2.x.x/fix_kafka_appender_retry_error_reporting.xml
b/src/changelog/.2.x.x/fix_kafka_appender_retry_error_reporting.xml
new file mode 100644
index 0000000000..f3884f41e8
--- /dev/null
+++ b/src/changelog/.2.x.x/fix_kafka_appender_retry_error_reporting.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<entry xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="https://logging.apache.org/xml/ns"
+ xsi:schemaLocation="https://logging.apache.org/xml/ns
https://logging.apache.org/xml/ns/log4j-changelog-0.xsd"
+ type="fixed">
+ <issue id="4125" link="https://github.com/apache/logging-log4j2/pull/4125"/>
+ <description format="asciidoc">Fix `KafkaAppender` reporting error to error
handler even after a successful retry</description>
+</entry>