This is an automated email from the ASF dual-hosted git repository. philippus pushed a commit to branch retry-committing-for-transient-exceptions in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
commit 57eede06ee04dcd419fc191c56aa242dc16286e8 Author: Philippus <[email protected]> AuthorDate: Tue Apr 14 08:28:10 2026 +0200 Retry committing for transient exceptions --- .../apache/pekko/kafka/internal/KafkaConsumerActor.scala | 9 +++++++-- .../pekko/kafka/internal/CommittingWithMockSpec.scala | 14 +++++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala index fa1356a5..c16ea42a 100644 --- a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala +++ b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala @@ -36,7 +36,11 @@ import pekko.kafka.KafkaConsumerActor.{ StopLike, StoppingException } import pekko.kafka._ import pekko.kafka.scaladsl.PartitionAssignmentHandler import org.apache.kafka.clients.consumer._ -import org.apache.kafka.common.errors.RebalanceInProgressException +import org.apache.kafka.common.errors.{ + CoordinatorLoadInProgressException, + RebalanceInProgressException, + TimeoutException +} import org.apache.kafka.common.{ Metric, MetricName, TopicPartition } import scala.annotation.nowarn @@ -597,7 +601,8 @@ import scala.util.control.NonFatal progressTracker.committed(offsets) replyTo.foreach(_ ! Done) - case e: RebalanceInProgressException => retryCommits(duration, e) + case e @ (_: RebalanceInProgressException | _: TimeoutException | _: CoordinatorLoadInProgressException) => + retryCommits(duration, e) case e: RetriableCommitFailedException => retryCommits(duration, e.getCause) case commitException => diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala index 43ce86b3..6dbd31c8 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala @@ -31,7 +31,11 @@ import pekko.testkit.TestKit import com.typesafe.config.ConfigFactory import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.RebalanceInProgressException +import org.apache.kafka.common.errors.{ + CoordinatorLoadInProgressException, + RebalanceInProgressException, + TimeoutException +} import org.apache.kafka.common.serialization.StringDeserializer import org.scalatest.concurrent.{ Eventually, IntegrationPatience, ScalaFutures } import org.scalatest.BeforeAndAfterAll @@ -201,8 +205,12 @@ class CommittingWithMockSpec(_system: ActorSystem) Await.result(control.shutdown(), remainingOrDefault) } - val exceptions = List(new RebalanceInProgressException(), - new RetriableCommitFailedException(new CommitTimeoutException("injected15"))) + val exceptions = List( + new RebalanceInProgressException(), + new TimeoutException(), + new CoordinatorLoadInProgressException("coordinator-load-in-progress-exception"), + new RetriableCommitFailedException(new CommitTimeoutException("injected15")) + ) for (exception <- exceptions) { it should s"retry commit on ${exception.getClass.getSimpleName}" in assertAllStagesStopped { val retries = 4 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
