This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new c78cbdc2 fix deprecation issues with kafka client calls (#535)
c78cbdc2 is described below
commit c78cbdc238ee8d1c332fc0d61494ecf2cda0548f
Author: PJ Fanning <[email protected]>
AuthorDate: Tue May 26 04:16:35 2026 +0100
fix deprecation issues with kafka client calls (#535)
* Upgrade kafka-clients to 4.2.0 and fix deprecated API usage
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors-kafka/sessions/773c1c10-19f7-4af6-b8a1-0d76d70e2ad0
Co-authored-by: pjfanning <[email protected]>
* Address code review: improve error messages and ConsumerDummy group name
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors-kafka/sessions/773c1c10-19f7-4af6-b8a1-0d76d70e2ad0
Co-authored-by: pjfanning <[email protected]>
* Update ProducerSpec.scala
* Update KafkaTransactionBenchmarks.scala
* Fix InvalidGroupIdException for assignment-based consumers without
group.id
consumer.groupMetadata() throws InvalidGroupIdException when no group.id
is configured (e.g. assignment-based consumers). Wrap the call in Try so
that group metadata is only forwarded when available.
* Avoid exception overhead: check group.id presence before calling
groupMetadata()
Instead of catching InvalidGroupIdException via Try, check whether
ConsumerConfig.GROUP_ID_CONFIG is present in the consumer settings.
This avoids creating an exception with a stacktrace on every poll
for assignment-based consumers that have no group.id configured.
* Fix ConsumerMock.verifyClosed to match new close(CloseOptions) API
The old verification matched close(java.time.Duration) but we now call
close(CloseOptions.timeout(duration)). Update verifyClosed to match any
CloseOptions argument (CloseOptions does not override equals, so an
equality-based matcher would never match the freshly created instance),
and remove the now-unnecessary @nowarn annotation.
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../benchmarks/KafkaTransactionBenchmarks.scala | 5 ++---
.../org/apache/pekko/kafka/ConsumerMessage.scala | 10 +++++----
.../kafka/internal/BaseSingleSourceLogic.scala | 3 +++
.../pekko/kafka/internal/KafkaConsumerActor.scala | 12 ++++++-----
.../pekko/kafka/internal/MessageBuilder.scala | 10 ++++++---
.../pekko/kafka/internal/SingleSourceLogic.scala | 2 +-
.../pekko/kafka/internal/SubSourceLogic.scala | 3 +++
.../internal/TransactionalProducerStage.scala | 6 ++----
.../kafka/internal/TransactionalSources.scala | 24 +++++++++++++++++++++-
.../pekko/kafka/testkit/javadsl/BaseKafkaTest.java | 4 ++--
.../pekko/kafka/testkit/scaladsl/KafkaSpec.scala | 6 ++----
.../pekko/kafka/internal/ConsumerDummy.scala | 3 ++-
.../apache/pekko/kafka/internal/ConsumerMock.scala | 4 +---
.../apache/pekko/kafka/internal/ProducerSpec.scala | 24 +++++++++++++++++-----
14 files changed, 80 insertions(+), 36 deletions(-)
diff --git
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
index e34c9fec..94d18090 100644
---
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
+++
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ Callback, ProducerRecord,
RecordMetadata }
import org.apache.kafka.common.TopicPartition
-import scala.annotation.{ nowarn, tailrec }
+import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
@@ -45,12 +45,11 @@ object KafkaTransactionBenchmarks extends LazyLogging {
var accumulatedMsgCount = 0L
var lastCommit = 0L
- @nowarn("msg=deprecated")
def doCommit(): Unit = {
accumulatedMsgCount = 0
val offsetMap = Map(new TopicPartition(fixture.sourceTopic, 0) -> new
OffsetAndMetadata(lastProcessedOffset))
logger.debug("Committing offset " + offsetMap.head._2.offset())
- producer.sendOffsetsToTransaction(offsetMap.asJava, new
ConsumerGroupMetadata(fixture.groupId))
+ producer.sendOffsetsToTransaction(offsetMap.asJava,
consumer.groupMetadata())
producer.commitTransaction()
}
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
b/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
index 4f19a5fc..3b6591ca 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
@@ -21,7 +21,7 @@ import org.apache.pekko
import pekko.Done
import pekko.annotation.{ DoNotInherit, InternalApi }
import pekko.kafka.internal.{ CommittableOffsetBatchImpl, CommittedMarker }
-import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata,
ConsumerRecord }
import org.apache.kafka.common.TopicPartition
import scala.concurrent.Future
@@ -134,15 +134,17 @@ object ConsumerMessage {
private[kafka] object PartitionOffsetCommittedMarker {
def apply(key: GroupTopicPartition, offset: Long,
- committedMarker: CommittedMarker, fromPartitionedSource: Boolean):
PartitionOffsetCommittedMarker =
- new PartitionOffsetCommittedMarker(key, offset, committedMarker,
fromPartitionedSource)
+ committedMarker: CommittedMarker, fromPartitionedSource: Boolean,
+ consumerGroupMetadata: ConsumerGroupMetadata):
PartitionOffsetCommittedMarker =
+ new PartitionOffsetCommittedMarker(key, offset, committedMarker,
fromPartitionedSource, consumerGroupMetadata)
}
@InternalApi private[kafka] final class PartitionOffsetCommittedMarker(
override val key: GroupTopicPartition,
override val offset: Long,
private[kafka] val committedMarker: CommittedMarker,
- private[kafka] val fromPartitionedSource: Boolean) extends
PartitionOffset(key, offset)
+ private[kafka] val fromPartitionedSource: Boolean,
+ private[kafka] val consumerGroupMetadata: ConsumerGroupMetadata) extends
PartitionOffset(key, offset)
/**
* groupId, topic, partition key for an offset position.
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
index c34c0136..b963fc10 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
@@ -76,6 +76,7 @@ import scala.concurrent.{ ExecutionContext, Future }
// might be more than one in flight when we assign/revoke tps
if (msg.requestId == requestId)
requested = false
+ msg.groupMetadata.foreach(onGroupMetadata)
buffer = buffer ++ msg.messages
pump()
case (_, Status.Failure(e)) =>
@@ -84,6 +85,8 @@ import scala.concurrent.{ ExecutionContext, Future }
failStage(new ConsumerFailed())
}
+ protected def onGroupMetadata(metadata:
org.apache.kafka.clients.consumer.ConsumerGroupMetadata): Unit = ()
+
protected def createConsumerActor(): ActorRef
override protected def configureManualSubscription(subscription:
ManualSubscription): Unit = subscription match {
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index c16ea42a..976ee1a3 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.errors.{
}
import org.apache.kafka.common.{ Metric, MetricName, TopicPartition }
-import scala.annotation.nowarn
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
@@ -87,7 +86,8 @@ import scala.util.control.NonFatal
// responses
final case class Assigned(partition: List[TopicPartition]) extends
NoSerializationVerificationNeeded
final case class Revoked(partition: List[TopicPartition]) extends
NoSerializationVerificationNeeded
- final case class Messages[K, V](requestId: Int, messages:
Iterator[ConsumerRecord[K, V]])
+ final case class Messages[K, V](requestId: Int, messages:
Iterator[ConsumerRecord[K, V]],
+ groupMetadata: Option[ConsumerGroupMetadata] = None)
extends NoSerializationVerificationNeeded
final case class ConsumerMetrics(metrics: Map[MetricName, Metric]) extends
NoSerializationVerificationNeeded {
def getMetrics: java.util.Map[MetricName, Metric] = metrics.asJava
@@ -449,7 +449,6 @@ import scala.util.control.NonFatal
progressTracker
}
- @nowarn("msg=deprecated")
override def postStop(): Unit = {
// reply to outstanding requests is important if the actor is restarted
requests.foreach {
@@ -457,7 +456,7 @@ import scala.util.control.NonFatal
ref ! Messages(req.requestId, Iterator.empty)
}
partitionAssignmentHandler.postStop()
- consumer.close(settings.getCloseTimeout)
+ consumer.close(CloseOptions.timeout(settings.getCloseTimeout))
super.postStop()
}
@@ -643,7 +642,10 @@ import scala.util.control.NonFatal
}
val messages = b.result().iterator
if (messages.nonEmpty) {
- stageActorRef ! Messages(req.requestId, messages)
+ val groupMetadata =
+ if
(settings.properties.contains(ConsumerConfig.GROUP_ID_CONFIG))
Some(consumer.groupMetadata())
+ else None
+ stageActorRef ! Messages(req.requestId, messages, groupMetadata)
requests -= stageActorRef
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
index f985faac..099f4c04 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
@@ -27,7 +27,7 @@ import pekko.kafka.ConsumerMessage.{
TransactionalMessage,
_
}
-import org.apache.kafka.clients.consumer.{ ConsumerRecord, OffsetAndMetadata }
+import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata,
ConsumerRecord, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.OffsetFetchResponse
@@ -57,6 +57,8 @@ private[kafka] trait TransactionalMessageBuilderBase[K, V,
Msg] extends MessageB
def onMessage(consumerMessage: ConsumerRecord[K, V]): Unit
def fromPartitionedSource: Boolean
+
+ def consumerGroupMetadata: ConsumerGroupMetadata
}
/** Internal API */
@@ -72,7 +74,8 @@ private[kafka] trait TransactionalMessageBuilder[K, V]
partition = rec.partition),
offset = rec.offset,
committedMarker,
- fromPartitionedSource)
+ fromPartitionedSource,
+ consumerGroupMetadata)
ConsumerMessage.TransactionalMessage(rec, offset)
}
}
@@ -90,7 +93,8 @@ private[kafka] trait TransactionalOffsetContextBuilder[K, V]
partition = rec.partition),
offset = rec.offset,
committedMarker,
- fromPartitionedSource)
+ fromPartitionedSource,
+ consumerGroupMetadata)
(rec, offset)
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/SingleSourceLogic.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/SingleSourceLogic.scala
index c7726ca7..b8cef780 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/SingleSourceLogic.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/SingleSourceLogic.scala
@@ -62,7 +62,7 @@ import scala.concurrent.{ Future, Promise }
complete(shape.out)
}
sourceActor.become(shuttingDownReceive.orElse {
- case (_, Messages(requestId, messages)) =>
+ case (_, Messages(requestId, messages, _)) =>
// Prevent stage failure during shutdown by ignoring Messages
if (messages.hasNext)
log.debug("Unexpected `Messages` received with requestId={} and a
non-empty message iterator: {}",
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
index 4aada7fa..211281e1 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
@@ -429,6 +429,7 @@ private abstract class SubSourceStageLogic[K, V, Msg](
protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = {
case (_, msg: KafkaConsumerActor.Internal.Messages[K @unchecked, V
@unchecked]) =>
requested = false
+ msg.groupMetadata.foreach(onGroupMetadata)
buffer = buffer ++ msg.messages
pump()
case (_, Status.Failure(e)) =>
@@ -437,6 +438,8 @@ private abstract class SubSourceStageLogic[K, V, Msg](
failStage(new ConsumerFailed)
}
+ protected def onGroupMetadata(metadata:
org.apache.kafka.clients.consumer.ConsumerGroupMetadata): Unit = ()
+
protected def onDownstreamFinishSubSourceCancellationStrategy():
SubSourceCancellationStrategy =
if (buffer.hasNext) {
SeekToOffsetAndReEmit(buffer.next().offset())
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
index 9c4b5d2c..435717e0 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
@@ -28,7 +28,6 @@ import org.apache.kafka.clients.consumer.{
ConsumerGroupMetadata, OffsetAndMetad
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
-import scala.annotation.nowarn
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
@@ -76,6 +75,7 @@ private object TransactionalProducerStage {
def group: String = head.key.groupId
def committedMarker: CommittedMarker = head.committedMarker
+ def groupMetadata: ConsumerGroupMetadata = head.consumerGroupMetadata
def offsetMap(): Map[TopicPartition, OffsetAndMetadata] = offsets.map {
case (gtp, offset) => new TopicPartition(gtp.topic, gtp.partition) ->
new OffsetAndMetadata(offset + 1)
@@ -241,7 +241,6 @@ private final class TransactionalProducerStageLogic[K, V,
P](
super.onCompletionFailure(ex)
}
- @nowarn("msg=deprecated")
private def commitTransaction(batch: NonemptyTransactionBatch,
beginNewTransaction: Boolean): Unit = {
val group = batch.group
log.debug("Committing transaction for transactional id '{}' consumer group
'{}' with offsets: {}",
@@ -249,8 +248,7 @@ private final class TransactionalProducerStageLogic[K, V,
P](
group,
batch.offsets)
val offsetMap = batch.offsetMap()
- // ConsumerGroupMetadata constructor is deprecated
- producer.sendOffsetsToTransaction(offsetMap.asJava, new
ConsumerGroupMetadata(group))
+ producer.sendOffsetsToTransaction(offsetMap.asJava, batch.groupMetadata)
producer.commitTransaction()
log.debug("Committed transaction for transactional id '{}' consumer group
'{}' with offsets: {}",
transactionalId,
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
index 718fc8f7..e712ff98 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
@@ -32,7 +32,7 @@ import pekko.stream.SourceShape
import pekko.stream.scaladsl.Source
import pekko.stream.stage.{ AsyncCallback, GraphStageLogic }
import pekko.util.Timeout
-import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord,
OffsetAndMetadata }
+import org.apache.kafka.clients.consumer.{ ConsumerConfig,
ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata }
import org.apache.kafka.common.{ IsolationLevel, TopicPartition }
import scala.concurrent.duration.FiniteDuration
@@ -142,6 +142,17 @@ private[internal] abstract class
TransactionalSourceLogic[K, V, Msg](shape: Sour
override val groupId: String =
consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG)
+ private var currentGroupMetadata: Option[ConsumerGroupMetadata] = None
+
+ override protected def onGroupMetadata(metadata: ConsumerGroupMetadata):
Unit =
+ currentGroupMetadata = Some(metadata)
+
+ override def consumerGroupMetadata: ConsumerGroupMetadata =
+ currentGroupMetadata.getOrElse(
+ throw new IllegalStateException(
+ s"Consumer group metadata not yet available for group $groupId. " +
+ "Metadata is populated on the first poll delivering messages from the
consumer."))
+
override lazy val committedMarker: CommittedMarker = {
val ec = materializer.executionContext
CommittedMarkerRef(sourceActor.ref, consumerSettings.commitTimeout)(ec)
@@ -369,6 +380,17 @@ private final class TransactionalSubSourceStageLogic[K, V](
override def groupId: String =
consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG)
+ private var currentGroupMetadata: Option[ConsumerGroupMetadata] = None
+
+ override protected def onGroupMetadata(metadata: ConsumerGroupMetadata):
Unit =
+ currentGroupMetadata = Some(metadata)
+
+ override def consumerGroupMetadata: ConsumerGroupMetadata =
+ currentGroupMetadata.getOrElse(
+ throw new IllegalStateException(
+ s"Consumer group metadata not yet available for group $groupId. " +
+ "Metadata is populated on the first poll delivering messages from the
consumer."))
+
override def onMessage(rec: ConsumerRecord[K, V]): Unit =
inFlightRecords.add(Map(new TopicPartition(rec.topic(), rec.partition())
-> rec.offset()))
diff --git
a/testkit/src/main/java/org/apache/pekko/kafka/testkit/javadsl/BaseKafkaTest.java
b/testkit/src/main/java/org/apache/pekko/kafka/testkit/javadsl/BaseKafkaTest.java
index 031e7f11..2512bc80 100644
---
a/testkit/src/main/java/org/apache/pekko/kafka/testkit/javadsl/BaseKafkaTest.java
+++
b/testkit/src/main/java/org/apache/pekko/kafka/testkit/javadsl/BaseKafkaTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.pekko.Done;
@@ -153,7 +153,7 @@ public abstract class BaseKafkaTest extends
KafkaTestKitClass {
groupId,
group -> {
try {
- return group.state() == ConsumerGroupState.STABLE &&
predicate.test(group.members());
+ return group.groupState() == GroupState.STABLE &&
predicate.test(group.members());
} catch (Exception ex) {
return false;
}
diff --git
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
index 97a52767..65c3ea18 100644
---
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
+++
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
@@ -33,10 +33,9 @@ import pekko.stream.testkit.scaladsl.TestSink
import pekko.testkit.TestKit
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.{ Producer => KProducer,
ProducerRecord }
-import org.apache.kafka.common.ConsumerGroupState
+import org.apache.kafka.common.GroupState
import org.slf4j.{ Logger, LoggerFactory }
-import scala.annotation.nowarn
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future }
@@ -120,10 +119,9 @@ abstract class KafkaSpec(_kafkaPort: Int, val
zooKeeperPort: Int, actorSystem: A
*
* If the predicate does not hold after configured amount of time, throws an
exception.
*/
- @nowarn("cat=deprecation")
def waitUntilConsumerSummary(groupId: String)(predicate:
PartialFunction[List[MemberDescription], Boolean]): Unit =
waitUntilConsumerGroup(groupId) { group =>
- group.state() == ConsumerGroupState.STABLE &&
+ group.groupState() == GroupState.STABLE &&
Try(predicate(group.members().asScala.toList)).getOrElse(false)
}
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
index 9d38fa7f..aa96dfb3 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
@@ -98,7 +98,8 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def endOffsets(partitions: java.util.Collection[TopicPartition],
timeout: java.time.Duration): java.util.Map[TopicPartition,
java.lang.Long] = ???
override def poll(timeout: java.time.Duration): ConsumerRecords[K, V] = ???
- override def groupMetadata(): ConsumerGroupMetadata = ???
+ @scala.annotation.nowarn("msg=deprecated")
+ override def groupMetadata(): ConsumerGroupMetadata = new
ConsumerGroupMetadata("dummy-consumer-group")
override def enforceRebalance(): Unit = ???
override def currentLag(partition: TopicPartition): java.util.OptionalLong =
???
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
index 05fa441e..99c9c855 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
@@ -26,7 +26,6 @@ import org.mockito.stubbing.Answer
import org.mockito.verification.VerificationMode
import org.mockito.{ ArgumentMatchers, Mockito }
-import scala.annotation.nowarn
import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
@@ -169,9 +168,8 @@ class ConsumerMock[K, V](handler:
ConsumerMock.CommitHandler = new ConsumerMock.
responses :+= records
}
- @nowarn("msg=deprecated")
def verifyClosed(mode: VerificationMode = Mockito.times(1)) =
- verify(mock, mode).close(ConsumerMock.closeTimeout.toJava)
+ verify(mock, mode).close(ArgumentMatchers.any[CloseOptions])
def verifyPoll(mode: VerificationMode = Mockito.atLeastOnce()) =
verify(mock, mode).poll(ArgumentMatchers.any[java.time.Duration])
diff --git
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 32850718..53cbe2d1 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -50,6 +50,12 @@ import scala.concurrent.{ Await, ExecutionContext, Future,
Promise }
import scala.jdk.CollectionConverters._
import scala.util.{ Failure, Success, Try }
+private[internal] object ProducerSpec {
+ val group = "group"
+ @scala.annotation.nowarn("msg=deprecated")
+ val testGroupMetadata = new ConsumerGroupMetadata(group)
+}
+
class ProducerSpec(_system: ActorSystem)
extends TestKit(_system)
with AnyFlatSpecLike
@@ -67,8 +73,7 @@ class ProducerSpec(_system: ActorSystem)
override def afterAll(): Unit = shutdown(system)
implicit val ec: ExecutionContext = _system.dispatcher
-
- private val group = "group"
+ import ProducerSpec._
type K = String
type V = String
@@ -88,7 +93,8 @@ class ProducerSpec(_system: ActorSystem)
PartitionOffsetCommittedMarker(consumerMessage.key,
consumerMessage.offset,
committer,
- fromPartitionedSource = false)
+ fromPartitionedSource = false,
+ testGroupMetadata)
ProducerMessage.Message(
tuple._1,
partitionOffsetCommittedMarker)
@@ -639,7 +645,11 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K,
V])(implicit ec: Execu
def verifyTxCommit(po: ConsumerMessage.PartitionOffset) = {
val inOrder = Mockito.inOrder(mock)
val offsets = Map(new TopicPartition(po.key.topic, po.key.partition) ->
new OffsetAndMetadata(po.offset + 1)).asJava
- inOrder.verify(mock).sendOffsetsToTransaction(offsets, new
ConsumerGroupMetadata(po.key.groupId))
+ val expectedMeta = po match {
+ case m: ConsumerMessage.PartitionOffsetCommittedMarker =>
m.consumerGroupMetadata
+ case _ =>
ProducerSpec.testGroupMetadata
+ }
+ inOrder.verify(mock).sendOffsetsToTransaction(offsets, expectedMeta)
inOrder.verify(mock).commitTransaction()
inOrder.verify(mock).beginTransaction()
}
@@ -648,7 +658,11 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K,
V])(implicit ec: Execu
def verifyTxCommitWhenShutdown(po: ConsumerMessage.PartitionOffset) = {
val inOrder = Mockito.inOrder(mock)
val offsets = Map(new TopicPartition(po.key.topic, po.key.partition) ->
new OffsetAndMetadata(po.offset + 1)).asJava
- inOrder.verify(mock).sendOffsetsToTransaction(offsets, new
ConsumerGroupMetadata(po.key.groupId))
+ val expectedMeta = po match {
+ case m: ConsumerMessage.PartitionOffsetCommittedMarker =>
m.consumerGroupMetadata
+ case _ =>
ProducerSpec.testGroupMetadata
+ }
+ inOrder.verify(mock).sendOffsetsToTransaction(offsets, expectedMeta)
inOrder.verify(mock).commitTransaction()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]