This is an automated email from the ASF dual-hosted git repository.

philippus pushed a commit to branch retry-committing-for-transient-exceptions
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git

commit 57eede06ee04dcd419fc191c56aa242dc16286e8
Author: Philippus <[email protected]>
AuthorDate: Tue Apr 14 08:28:10 2026 +0200

    Retry committing for transient exceptions
---
 .../apache/pekko/kafka/internal/KafkaConsumerActor.scala   |  9 +++++++--
 .../pekko/kafka/internal/CommittingWithMockSpec.scala      | 14 +++++++++++---
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index fa1356a5..c16ea42a 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -36,7 +36,11 @@ import pekko.kafka.KafkaConsumerActor.{ StopLike, 
StoppingException }
 import pekko.kafka._
 import pekko.kafka.scaladsl.PartitionAssignmentHandler
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.common.errors.RebalanceInProgressException
+import org.apache.kafka.common.errors.{
+  CoordinatorLoadInProgressException,
+  RebalanceInProgressException,
+  TimeoutException
+}
 import org.apache.kafka.common.{ Metric, MetricName, TopicPartition }
 
 import scala.annotation.nowarn
@@ -597,7 +601,8 @@ import scala.util.control.NonFatal
               progressTracker.committed(offsets)
               replyTo.foreach(_ ! Done)
 
-            case e: RebalanceInProgressException   => retryCommits(duration, e)
+            case e @ (_: RebalanceInProgressException | _: TimeoutException | 
_: CoordinatorLoadInProgressException) =>
+              retryCommits(duration, e)
             case e: RetriableCommitFailedException => retryCommits(duration, 
e.getCause)
 
             case commitException =>
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
index 43ce86b3..6dbd31c8 100644
--- 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
+++ 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
@@ -31,7 +31,11 @@ import pekko.testkit.TestKit
 import com.typesafe.config.ConfigFactory
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.RebalanceInProgressException
+import org.apache.kafka.common.errors.{
+  CoordinatorLoadInProgressException,
+  RebalanceInProgressException,
+  TimeoutException
+}
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.scalatest.concurrent.{ Eventually, IntegrationPatience, 
ScalaFutures }
 import org.scalatest.BeforeAndAfterAll
@@ -201,8 +205,12 @@ class CommittingWithMockSpec(_system: ActorSystem)
     Await.result(control.shutdown(), remainingOrDefault)
   }
 
-  val exceptions = List(new RebalanceInProgressException(),
-    new RetriableCommitFailedException(new 
CommitTimeoutException("injected15")))
+  val exceptions = List(
+    new RebalanceInProgressException(),
+    new TimeoutException(),
+    new 
CoordinatorLoadInProgressException("coordinator-load-in-progress-exception"),
+    new RetriableCommitFailedException(new 
CommitTimeoutException("injected15"))
+  )
   for (exception <- exceptions) {
     it should s"retry commit on ${exception.getClass.getSimpleName}" in 
assertAllStagesStopped {
       val retries = 4


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to