This is an automated email from the ASF dual-hosted git repository.
philippus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 398b44f2 Retry committing for transient exceptions (#505)
398b44f2 is described below
commit 398b44f2a14c5fa7809631bb584beb5da03eb23e
Author: Philippus Baalman <[email protected]>
AuthorDate: Tue Apr 14 17:11:33 2026 +0200
Retry committing for transient exceptions (#505)
---
.../apache/pekko/kafka/internal/KafkaConsumerActor.scala | 9 +++++++--
.../pekko/kafka/internal/CommittingWithMockSpec.scala | 14 +++++++++++---
2 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index fa1356a5..c16ea42a 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -36,7 +36,11 @@ import pekko.kafka.KafkaConsumerActor.{ StopLike, StoppingException }
import pekko.kafka._
import pekko.kafka.scaladsl.PartitionAssignmentHandler
import org.apache.kafka.clients.consumer._
-import org.apache.kafka.common.errors.RebalanceInProgressException
+import org.apache.kafka.common.errors.{
+ CoordinatorLoadInProgressException,
+ RebalanceInProgressException,
+ TimeoutException
+}
import org.apache.kafka.common.{ Metric, MetricName, TopicPartition }
import scala.annotation.nowarn
@@ -597,7 +601,8 @@ import scala.util.control.NonFatal
progressTracker.committed(offsets)
replyTo.foreach(_ ! Done)
- case e: RebalanceInProgressException => retryCommits(duration, e)
+ case e @ (_: RebalanceInProgressException | _: TimeoutException | _: CoordinatorLoadInProgressException) =>
+ retryCommits(duration, e)
case e: RetriableCommitFailedException => retryCommits(duration, e.getCause)
case commitException =>
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
index 43ce86b3..6dbd31c8 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
@@ -31,7 +31,11 @@ import pekko.testkit.TestKit
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.RebalanceInProgressException
+import org.apache.kafka.common.errors.{
+ CoordinatorLoadInProgressException,
+ RebalanceInProgressException,
+ TimeoutException
+}
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.concurrent.{ Eventually, IntegrationPatience, ScalaFutures }
import org.scalatest.BeforeAndAfterAll
@@ -201,8 +205,12 @@ class CommittingWithMockSpec(_system: ActorSystem)
Await.result(control.shutdown(), remainingOrDefault)
}
- val exceptions = List(new RebalanceInProgressException(),
- new RetriableCommitFailedException(new CommitTimeoutException("injected15")))
+ val exceptions = List(
+ new RebalanceInProgressException(),
+ new TimeoutException(),
+ new CoordinatorLoadInProgressException("coordinator-load-in-progress-exception"),
+ new RetriableCommitFailedException(new CommitTimeoutException("injected15"))
+ )
for (exception <- exceptions) {
it should s"retry commit on ${exception.getClass.getSimpleName}" in assertAllStagesStopped {
val retries = 4
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]