This is an automated email from the ASF dual-hosted git repository.

philippus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new e7f15e33 Allow simpler access to partition assignment strategies (#506)
e7f15e33 is described below

commit e7f15e339775f371ee6334e0aead3b29476304a6
Author: Philippus Baalman <[email protected]>
AuthorDate: Thu Apr 16 11:44:50 2026 +0200

    Allow simpler access to partition assignment strategies (#506)
---
 .../org/apache/pekko/kafka/ConsumerSettings.scala  | 32 ++++++++++++++++++
 docs/src/main/paradox/consumer.md                  |  3 ++
 .../java/docs/javadsl/ConsumerSettingsTest.java    | 18 ++++++++++
 .../apache/pekko/kafka/ConsumerSettingsSpec.scala  | 38 ++++++++++++++++++++++
 4 files changed, 91 insertions(+)

diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
index 89d64936..e2989c08 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
@@ -292,6 +292,38 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
   def withGroupInstanceId(groupInstanceId: String): ConsumerSettings[K, V] =
     withProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)
 
+  /**
+   * Scala API:
+   * A list of class names or class types, ordered by preference, of supported
+   * partition assignment strategies that the client will use to distribute
+   * partition ownership amongst consumer instances when group management is used.
+   *
+   * See https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy
+   */
+  def withPartitionAssignmentStrategies(strategies: List[String]): ConsumerSettings[K, V] =
+    withProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, strategies.mkString(","))
+
+  /**
+   * Java API:
+   * A list of class names or class types, ordered by preference, of supported
+   * partition assignment strategies that the client will use to distribute
+   * partition ownership amongst consumer instances when group management is used.
+   *
+   * See https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy
+   */
+  def withPartitionAssignmentStrategies(strategies: Array[String]): ConsumerSettings[K, V] =
+    withPartitionAssignmentStrategies(strategies.toList)
+
+  /**
+   * Sets the `CooperativeStickyAssignor` assignment strategy.
+   *
+   * @see https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy
+   * @see https://kafka.apache.org/33/documentation.html#upgrade_300_notable
+   */
+  def withPartitionAssignmentStrategyCooperativeStickyAssignor(): ConsumerSettings[K, V] =
+    withProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+      classOf[org.apache.kafka.clients.consumer.CooperativeStickyAssignor].getName)
+
   /**
    * Scala API:
    * The raw properties of the kafka-clients driver, see constants in
diff --git a/docs/src/main/paradox/consumer.md b/docs/src/main/paradox/consumer.md
index dbe72072..2a897fce 100644
--- a/docs/src/main/paradox/consumer.md
+++ b/docs/src/main/paradox/consumer.md
@@ -49,6 +49,7 @@ When creating a consumer source you need to pass in @apidoc[ConsumerSettings] th
 * bootstrap servers of the Kafka cluster (see @ref:[Service discovery](discovery.md) to defer the server configuration)
 * group id for the consumer, note that offsets are always committed for a given consumer group
 * Kafka consumer tuning parameters
+* assignment strategies
 
 Apache Pekko Connectors Kafka's defaults for all settings are defined in `reference.conf` which is included in the library JAR.
 
@@ -59,6 +60,8 @@ Important consumer settings
 | kafka-clients | Section for properties passed unchanged to the Kafka client (see @extref:[Kafka's Consumer Configs](kafka:/documentation.html#consumerconfigs)) |
 | connection-checker | Configuration to let the stream fail if the connection to the Kafka broker fails. |
 
+Explicitly selecting a [Consumer Assignment 
Strategy](https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy)
 such as 
@javadoc[CooperativeStickyAssignor](org.apache.kafka.clients.consumer.CooperativeStickyAssignor)
 is recommended. They were introduced in [Kafka 
3.0](https://kafka.apache.org/33/documentation.html#upgrade_300_notable). 
Please check the [Kafka upgrade 
guide](https://cwiki.apache.org/confluence/display/KAFKA/KIP-429:+Kafka+Consumer+Incremental+
 [...]
+
 reference.conf (HOCON)
 : @@ snip [snip](/core/src/main/resources/reference.conf) { #consumer-settings }
 
diff --git a/java-tests/src/test/java/docs/javadsl/ConsumerSettingsTest.java b/java-tests/src/test/java/docs/javadsl/ConsumerSettingsTest.java
index 92d36bcb..fe71b5dc 100644
--- a/java-tests/src/test/java/docs/javadsl/ConsumerSettingsTest.java
+++ b/java-tests/src/test/java/docs/javadsl/ConsumerSettingsTest.java
@@ -14,8 +14,11 @@
 
 package docs.javadsl;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.pekko.actor.ActorSystem;
 import org.apache.pekko.kafka.ConsumerSettings;
@@ -46,4 +49,19 @@ public class ConsumerSettingsTest {
     // #discovery-settings
     TestKit.shutdownActorSystem(system);
   }
+
+  @Test
+  public void setAssignor() throws Exception {
+    ActorSystem system = ActorSystem.create("ConsumerSettingsTest");
+    ConsumerSettings<String, String> settings =
+        ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
+            .withPartitionAssignmentStrategies(
+                new String[] {
+                  org.apache.kafka.clients.consumer.CooperativeStickyAssignor.class.getName(),
+                  org.apache.kafka.clients.consumer.StickyAssignor.class.getName()
+                });
+    assertEquals(
+        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.StickyAssignor",
+        settings.getProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+  }
 }
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
index a037d8bb..ca3b472f 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
@@ -19,6 +19,7 @@ import pekko.actor.ActorSystem
 import pekko.kafka.tests.scaladsl.LogCapturing
 import pekko.testkit.TestKit
 import com.typesafe.config.ConfigFactory
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, CooperativeStickyAssignor }
 import org.apache.kafka.common.config.SslConfigs
 import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, StringDeserializer }
 import org.scalatest.OptionValues
@@ -104,6 +105,43 @@ class ConsumerSettingsSpec
       settings.getProperty("bootstrap.servers") should ===("localhost:9092")
     }
 
+    "handle withPartitionAssignmentStrategies" in {
+      val conf = ConfigFactory
+        .parseString(
+          """
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+        pekko.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+        pekko.kafka.consumer.kafka-clients.client.id = client1
+        """)
+        .withFallback(ConfigFactory.load())
+        .getConfig("pekko.kafka.consumer")
+      val settings = ConsumerSettings(conf, None, None)
+        .withPartitionAssignmentStrategies(List(
+          classOf[org.apache.kafka.clients.consumer.CooperativeStickyAssignor].getName,
+          classOf[org.apache.kafka.clients.consumer.StickyAssignor].getName)
+        )
+      settings.getProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) should ===(
+        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.StickyAssignor")
+    }
+
+    "handle withPartitionAssignmentStrategyCooperativeStickyAssignor" in {
+      val conf = ConfigFactory
+        .parseString(
+          """
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+        pekko.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+        pekko.kafka.consumer.kafka-clients.client.id = client1
+        """)
+        .withFallback(ConfigFactory.load())
+        .getConfig("pekko.kafka.consumer")
+      val settings = ConsumerSettings(conf, None, None)
+        .withPartitionAssignmentStrategyCooperativeStickyAssignor()
+      settings.getProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) should ===(
+        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor")
+    }
+
     "filter passwords from kafka-clients properties" in {
       val conf = ConfigFactory.load().getConfig("pekko.kafka.consumer")
       val settings = ConsumerSettings(conf, new ByteArrayDeserializer, new StringDeserializer)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to