This is an automated email from the ASF dual-hosted git repository.
philippus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new e7f15e33 Allow simpler access to partition assignment strategies (#506)
e7f15e33 is described below
commit e7f15e339775f371ee6334e0aead3b29476304a6
Author: Philippus Baalman <[email protected]>
AuthorDate: Thu Apr 16 11:44:50 2026 +0200
Allow simpler access to partition assignment strategies (#506)
---
.../org/apache/pekko/kafka/ConsumerSettings.scala | 32 ++++++++++++++++++
docs/src/main/paradox/consumer.md | 3 ++
.../java/docs/javadsl/ConsumerSettingsTest.java | 18 ++++++++++
.../apache/pekko/kafka/ConsumerSettingsSpec.scala | 38 ++++++++++++++++++++++
4 files changed, 91 insertions(+)
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
index 89d64936..e2989c08 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
@@ -292,6 +292,38 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
def withGroupInstanceId(groupInstanceId: String): ConsumerSettings[K, V] =
withProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)
+ /**
+ * Scala API:
+ * A list of class names or class types, ordered by preference, of supported
+ * partition assignment strategies that the client will use to distribute
+ * partition ownership amongst consumer instances when group management is
used.
+ *
+ * See
https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy
+ */
+ def withPartitionAssignmentStrategies(strategies: List[String]):
ConsumerSettings[K, V] =
+ withProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
strategies.mkString(","))
+
+ /**
+ * Java API:
+ * A list of class names or class types, ordered by preference, of supported
+ * partition assignment strategies that the client will use to distribute
+ * partition ownership amongst consumer instances when group management is
used.
+ *
+ * See
https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy
+ */
+ def withPartitionAssignmentStrategies(strategies: Array[String]):
ConsumerSettings[K, V] =
+ withPartitionAssignmentStrategies(strategies.toList)
+
+ /**
+ * Sets the `CooperativeStickyAssignor` assignment strategy.
+ *
+ * @see
https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy
+ * @see https://kafka.apache.org/33/documentation.html#upgrade_300_notable
+ */
+ def withPartitionAssignmentStrategyCooperativeStickyAssignor():
ConsumerSettings[K, V] =
+ withProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+
classOf[org.apache.kafka.clients.consumer.CooperativeStickyAssignor].getName)
+
/**
* Scala API:
* The raw properties of the kafka-clients driver, see constants in
diff --git a/docs/src/main/paradox/consumer.md
b/docs/src/main/paradox/consumer.md
index dbe72072..2a897fce 100644
--- a/docs/src/main/paradox/consumer.md
+++ b/docs/src/main/paradox/consumer.md
@@ -49,6 +49,7 @@ When creating a consumer source you need to pass in
@apidoc[ConsumerSettings] th
* bootstrap servers of the Kafka cluster (see @ref:[Service
discovery](discovery.md) to defer the server configuration)
* group id for the consumer, note that offsets are always committed for a
given consumer group
* Kafka consumer tuning parameters
+* assignment strategies
Apache Pekko Connectors Kafka's defaults for all settings are defined in
`reference.conf` which is included in the library JAR.
@@ -59,6 +60,8 @@ Important consumer settings
| kafka-clients | Section for properties passed unchanged to the Kafka client
(see @extref:[Kafka's Consumer
Configs](kafka:/documentation.html#consumerconfigs)) |
| connection-checker | Configuration to let the stream fail if the connection
to the Kafka broker fails. |
+Explicitly selecting a [Consumer Assignment
Strategy](https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy)
such as
@javadoc[CooperativeStickyAssignor](org.apache.kafka.clients.consumer.CooperativeStickyAssignor)
is recommended. They were introduced in [Kafka
3.0](https://kafka.apache.org/33/documentation.html#upgrade_300_notable).
Please check the [Kafka upgrade
guide](https://cwiki.apache.org/confluence/display/KAFKA/KIP-429:+Kafka+Consumer+Incremental+
[...]
+
reference.conf (HOCON)
: @@ snip [snip](/core/src/main/resources/reference.conf) { #consumer-settings
}
diff --git a/java-tests/src/test/java/docs/javadsl/ConsumerSettingsTest.java
b/java-tests/src/test/java/docs/javadsl/ConsumerSettingsTest.java
index 92d36bcb..fe71b5dc 100644
--- a/java-tests/src/test/java/docs/javadsl/ConsumerSettingsTest.java
+++ b/java-tests/src/test/java/docs/javadsl/ConsumerSettingsTest.java
@@ -14,8 +14,11 @@
package docs.javadsl;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.kafka.ConsumerSettings;
@@ -46,4 +49,19 @@ public class ConsumerSettingsTest {
// #discovery-settings
TestKit.shutdownActorSystem(system);
}
+
+ @Test
+ public void setAssignor() throws Exception {
+ ActorSystem system = ActorSystem.create("ConsumerSettingsTest");
+ ConsumerSettings<String, String> settings =
+ ConsumerSettings.create(system, new StringDeserializer(), new
StringDeserializer())
+ .withPartitionAssignmentStrategies(
+ new String[] {
+
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.class.getName(),
+
org.apache.kafka.clients.consumer.StickyAssignor.class.getName()
+ });
+ assertEquals(
+
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.StickyAssignor",
+
settings.getProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+ }
}
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
index a037d8bb..ca3b472f 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
@@ -19,6 +19,7 @@ import pekko.actor.ActorSystem
import pekko.kafka.tests.scaladsl.LogCapturing
import pekko.testkit.TestKit
import com.typesafe.config.ConfigFactory
+import org.apache.kafka.clients.consumer.{ ConsumerConfig,
CooperativeStickyAssignor }
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer,
StringDeserializer }
import org.scalatest.OptionValues
@@ -104,6 +105,43 @@ class ConsumerSettingsSpec
settings.getProperty("bootstrap.servers") should ===("localhost:9092")
}
+ "handle withPartitionAssignmentStrategies" in {
+ val conf = ConfigFactory
+ .parseString(
+ """
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.key.deserializer =
org.apache.kafka.common.serialization.StringDeserializer
+ pekko.kafka.consumer.kafka-clients.value.deserializer =
org.apache.kafka.common.serialization.StringDeserializer
+ pekko.kafka.consumer.kafka-clients.client.id = client1
+ """)
+ .withFallback(ConfigFactory.load())
+ .getConfig("pekko.kafka.consumer")
+ val settings = ConsumerSettings(conf, None, None)
+ .withPartitionAssignmentStrategies(List(
+
classOf[org.apache.kafka.clients.consumer.CooperativeStickyAssignor].getName,
+ classOf[org.apache.kafka.clients.consumer.StickyAssignor].getName)
+ )
+
settings.getProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)
should ===(
+
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.StickyAssignor")
+ }
+
+ "handle withPartitionAssignmentStrategyCooperativeStickyAssignor" in {
+ val conf = ConfigFactory
+ .parseString(
+ """
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.key.deserializer =
org.apache.kafka.common.serialization.StringDeserializer
+ pekko.kafka.consumer.kafka-clients.value.deserializer =
org.apache.kafka.common.serialization.StringDeserializer
+ pekko.kafka.consumer.kafka-clients.client.id = client1
+ """)
+ .withFallback(ConfigFactory.load())
+ .getConfig("pekko.kafka.consumer")
+ val settings = ConsumerSettings(conf, None, None)
+ .withPartitionAssignmentStrategyCooperativeStickyAssignor()
+
settings.getProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)
should ===(
+ "org.apache.kafka.clients.consumer.CooperativeStickyAssignor")
+ }
+
"filter passwords from kafka-clients properties" in {
val conf = ConfigFactory.load().getConfig("pekko.kafka.consumer")
val settings = ConsumerSettings(conf, new ByteArrayDeserializer, new
StringDeserializer)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]