This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new c78cbdc2 fix deprecation issues with kafka client calls (#535)
c78cbdc2 is described below

commit c78cbdc238ee8d1c332fc0d61494ecf2cda0548f
Author: PJ Fanning <[email protected]>
AuthorDate: Tue May 26 04:16:35 2026 +0100

    fix deprecation issues with kafka client calls (#535)
    
    * Upgrade kafka-clients to 4.2.0 and fix deprecated API usage
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-connectors-kafka/sessions/773c1c10-19f7-4af6-b8a1-0d76d70e2ad0
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Address code review: improve error messages and ConsumerDummy group name
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-connectors-kafka/sessions/773c1c10-19f7-4af6-b8a1-0d76d70e2ad0
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Update ProducerSpec.scala
    
    * Update KafkaTransactionBenchmarks.scala
    
    * Fix InvalidGroupIdException for assignment-based consumers without group.id
    
    consumer.groupMetadata() throws InvalidGroupIdException when no group.id
    is configured (e.g. assign-based consumers). Wrap in Try so that
    group metadata is only forwarded when available.
    
    * Avoid exception overhead: check group.id presence before calling groupMetadata()
    
    Instead of catching InvalidGroupIdException via Try, check whether
    ConsumerConfig.GROUP_ID_CONFIG is present in the consumer settings.
    This avoids creating an exception with a stacktrace on every poll
    for assignment-based consumers that have no group.id configured.
    
    * Fix ConsumerMock.verifyClosed to match new close(CloseOptions) API
    
    The old verification matched close(java.time.Duration) but we now call
    close(CloseOptions.timeout(duration)). Update verifyClosed to match any
    CloseOptions argument (CloseOptions does not override equals), and remove
    the now-unnecessary @nowarn annotation.
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../benchmarks/KafkaTransactionBenchmarks.scala    |  5 ++---
 .../org/apache/pekko/kafka/ConsumerMessage.scala   | 10 +++++----
 .../kafka/internal/BaseSingleSourceLogic.scala     |  3 +++
 .../pekko/kafka/internal/KafkaConsumerActor.scala  | 12 ++++++-----
 .../pekko/kafka/internal/MessageBuilder.scala      | 10 ++++++---
 .../pekko/kafka/internal/SingleSourceLogic.scala   |  2 +-
 .../pekko/kafka/internal/SubSourceLogic.scala      |  3 +++
 .../internal/TransactionalProducerStage.scala      |  6 ++----
 .../kafka/internal/TransactionalSources.scala      | 24 +++++++++++++++++++++-
 .../pekko/kafka/testkit/javadsl/BaseKafkaTest.java |  4 ++--
 .../pekko/kafka/testkit/scaladsl/KafkaSpec.scala   |  6 ++----
 .../pekko/kafka/internal/ConsumerDummy.scala       |  3 ++-
 .../apache/pekko/kafka/internal/ConsumerMock.scala |  4 +---
 .../apache/pekko/kafka/internal/ProducerSpec.scala | 24 +++++++++++++++++-----
 14 files changed, 80 insertions(+), 36 deletions(-)

diff --git 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
index e34c9fec..94d18090 100644
--- 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
+++ 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaTransactionBenchmarks.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ Callback, ProducerRecord, 
RecordMetadata }
 import org.apache.kafka.common.TopicPartition
 
-import scala.annotation.{ nowarn, tailrec }
+import scala.annotation.tailrec
 import scala.concurrent.duration.FiniteDuration
 import scala.jdk.CollectionConverters._
 
@@ -45,12 +45,11 @@ object KafkaTransactionBenchmarks extends LazyLogging {
     var accumulatedMsgCount = 0L
     var lastCommit = 0L
 
-    @nowarn("msg=deprecated")
     def doCommit(): Unit = {
       accumulatedMsgCount = 0
       val offsetMap = Map(new TopicPartition(fixture.sourceTopic, 0) -> new 
OffsetAndMetadata(lastProcessedOffset))
       logger.debug("Committing offset " + offsetMap.head._2.offset())
-      producer.sendOffsetsToTransaction(offsetMap.asJava, new 
ConsumerGroupMetadata(fixture.groupId))
+      producer.sendOffsetsToTransaction(offsetMap.asJava, 
consumer.groupMetadata())
       producer.commitTransaction()
     }
 
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala 
b/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
index 4f19a5fc..3b6591ca 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerMessage.scala
@@ -21,7 +21,7 @@ import org.apache.pekko
 import pekko.Done
 import pekko.annotation.{ DoNotInherit, InternalApi }
 import pekko.kafka.internal.{ CommittableOffsetBatchImpl, CommittedMarker }
-import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, 
ConsumerRecord }
 import org.apache.kafka.common.TopicPartition
 
 import scala.concurrent.Future
@@ -134,15 +134,17 @@ object ConsumerMessage {
 
   private[kafka] object PartitionOffsetCommittedMarker {
     def apply(key: GroupTopicPartition, offset: Long,
-        committedMarker: CommittedMarker, fromPartitionedSource: Boolean): 
PartitionOffsetCommittedMarker =
-      new PartitionOffsetCommittedMarker(key, offset, committedMarker, 
fromPartitionedSource)
+        committedMarker: CommittedMarker, fromPartitionedSource: Boolean,
+        consumerGroupMetadata: ConsumerGroupMetadata): 
PartitionOffsetCommittedMarker =
+      new PartitionOffsetCommittedMarker(key, offset, committedMarker, 
fromPartitionedSource, consumerGroupMetadata)
   }
 
   @InternalApi private[kafka] final class PartitionOffsetCommittedMarker(
       override val key: GroupTopicPartition,
       override val offset: Long,
       private[kafka] val committedMarker: CommittedMarker,
-      private[kafka] val fromPartitionedSource: Boolean) extends 
PartitionOffset(key, offset)
+      private[kafka] val fromPartitionedSource: Boolean,
+      private[kafka] val consumerGroupMetadata: ConsumerGroupMetadata) extends 
PartitionOffset(key, offset)
 
   /**
    * groupId, topic, partition key for an offset position.
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
 
b/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
index c34c0136..b963fc10 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
@@ -76,6 +76,7 @@ import scala.concurrent.{ ExecutionContext, Future }
       // might be more than one in flight when we assign/revoke tps
       if (msg.requestId == requestId)
         requested = false
+      msg.groupMetadata.foreach(onGroupMetadata)
       buffer = buffer ++ msg.messages
       pump()
     case (_, Status.Failure(e)) =>
@@ -84,6 +85,8 @@ import scala.concurrent.{ ExecutionContext, Future }
       failStage(new ConsumerFailed())
   }
 
+  protected def onGroupMetadata(metadata: 
org.apache.kafka.clients.consumer.ConsumerGroupMetadata): Unit = ()
+
   protected def createConsumerActor(): ActorRef
 
   override protected def configureManualSubscription(subscription: 
ManualSubscription): Unit = subscription match {
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index c16ea42a..976ee1a3 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.errors.{
 }
 import org.apache.kafka.common.{ Metric, MetricName, TopicPartition }
 
-import scala.annotation.nowarn
 import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -87,7 +86,8 @@ import scala.util.control.NonFatal
     // responses
     final case class Assigned(partition: List[TopicPartition]) extends 
NoSerializationVerificationNeeded
     final case class Revoked(partition: List[TopicPartition]) extends 
NoSerializationVerificationNeeded
-    final case class Messages[K, V](requestId: Int, messages: 
Iterator[ConsumerRecord[K, V]])
+    final case class Messages[K, V](requestId: Int, messages: 
Iterator[ConsumerRecord[K, V]],
+        groupMetadata: Option[ConsumerGroupMetadata] = None)
         extends NoSerializationVerificationNeeded
     final case class ConsumerMetrics(metrics: Map[MetricName, Metric]) extends 
NoSerializationVerificationNeeded {
       def getMetrics: java.util.Map[MetricName, Metric] = metrics.asJava
@@ -449,7 +449,6 @@ import scala.util.control.NonFatal
     progressTracker
   }
 
-  @nowarn("msg=deprecated")
   override def postStop(): Unit = {
     // reply to outstanding requests is important if the actor is restarted
     requests.foreach {
@@ -457,7 +456,7 @@ import scala.util.control.NonFatal
         ref ! Messages(req.requestId, Iterator.empty)
     }
     partitionAssignmentHandler.postStop()
-    consumer.close(settings.getCloseTimeout)
+    consumer.close(CloseOptions.timeout(settings.getCloseTimeout))
     super.postStop()
   }
 
@@ -643,7 +642,10 @@ import scala.util.control.NonFatal
           }
           val messages = b.result().iterator
           if (messages.nonEmpty) {
-            stageActorRef ! Messages(req.requestId, messages)
+            val groupMetadata =
+              if 
(settings.properties.contains(ConsumerConfig.GROUP_ID_CONFIG)) 
Some(consumer.groupMetadata())
+              else None
+            stageActorRef ! Messages(req.requestId, messages, groupMetadata)
             requests -= stageActorRef
           }
       }
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
index f985faac..099f4c04 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/MessageBuilder.scala
@@ -27,7 +27,7 @@ import pekko.kafka.ConsumerMessage.{
   TransactionalMessage,
   _
 }
-import org.apache.kafka.clients.consumer.{ ConsumerRecord, OffsetAndMetadata }
+import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, 
ConsumerRecord, OffsetAndMetadata }
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.OffsetFetchResponse
 
@@ -57,6 +57,8 @@ private[kafka] trait TransactionalMessageBuilderBase[K, V, 
Msg] extends MessageB
   def onMessage(consumerMessage: ConsumerRecord[K, V]): Unit
 
   def fromPartitionedSource: Boolean
+
+  def consumerGroupMetadata: ConsumerGroupMetadata
 }
 
 /** Internal API */
@@ -72,7 +74,8 @@ private[kafka] trait TransactionalMessageBuilder[K, V]
         partition = rec.partition),
       offset = rec.offset,
       committedMarker,
-      fromPartitionedSource)
+      fromPartitionedSource,
+      consumerGroupMetadata)
     ConsumerMessage.TransactionalMessage(rec, offset)
   }
 }
@@ -90,7 +93,8 @@ private[kafka] trait TransactionalOffsetContextBuilder[K, V]
         partition = rec.partition),
       offset = rec.offset,
       committedMarker,
-      fromPartitionedSource)
+      fromPartitionedSource,
+      consumerGroupMetadata)
     (rec, offset)
   }
 }
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/SingleSourceLogic.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/SingleSourceLogic.scala
index c7726ca7..b8cef780 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/SingleSourceLogic.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/SingleSourceLogic.scala
@@ -62,7 +62,7 @@ import scala.concurrent.{ Future, Promise }
       complete(shape.out)
     }
     sourceActor.become(shuttingDownReceive.orElse {
-      case (_, Messages(requestId, messages)) =>
+      case (_, Messages(requestId, messages, _)) =>
         // Prevent stage failure during shutdown by ignoring Messages
         if (messages.hasNext)
           log.debug("Unexpected `Messages` received with requestId={} and a 
non-empty message iterator: {}",
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
index 4aada7fa..211281e1 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
@@ -429,6 +429,7 @@ private abstract class SubSourceStageLogic[K, V, Msg](
   protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = {
     case (_, msg: KafkaConsumerActor.Internal.Messages[K @unchecked, V 
@unchecked]) =>
       requested = false
+      msg.groupMetadata.foreach(onGroupMetadata)
       buffer = buffer ++ msg.messages
       pump()
     case (_, Status.Failure(e)) =>
@@ -437,6 +438,8 @@ private abstract class SubSourceStageLogic[K, V, Msg](
       failStage(new ConsumerFailed)
   }
 
+  protected def onGroupMetadata(metadata: 
org.apache.kafka.clients.consumer.ConsumerGroupMetadata): Unit = ()
+
   protected def onDownstreamFinishSubSourceCancellationStrategy(): 
SubSourceCancellationStrategy =
     if (buffer.hasNext) {
       SeekToOffsetAndReEmit(buffer.next().offset())
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
 
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
index 9c4b5d2c..435717e0 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala
@@ -28,7 +28,6 @@ import org.apache.kafka.clients.consumer.{ 
ConsumerGroupMetadata, OffsetAndMetad
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.TopicPartition
 
-import scala.annotation.nowarn
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -76,6 +75,7 @@ private object TransactionalProducerStage {
 
     def group: String = head.key.groupId
     def committedMarker: CommittedMarker = head.committedMarker
+    def groupMetadata: ConsumerGroupMetadata = head.consumerGroupMetadata
 
     def offsetMap(): Map[TopicPartition, OffsetAndMetadata] = offsets.map {
       case (gtp, offset) => new TopicPartition(gtp.topic, gtp.partition) -> 
new OffsetAndMetadata(offset + 1)
@@ -241,7 +241,6 @@ private final class TransactionalProducerStageLogic[K, V, 
P](
     super.onCompletionFailure(ex)
   }
 
-  @nowarn("msg=deprecated")
   private def commitTransaction(batch: NonemptyTransactionBatch, 
beginNewTransaction: Boolean): Unit = {
     val group = batch.group
     log.debug("Committing transaction for transactional id '{}' consumer group 
'{}' with offsets: {}",
@@ -249,8 +248,7 @@ private final class TransactionalProducerStageLogic[K, V, 
P](
       group,
       batch.offsets)
     val offsetMap = batch.offsetMap()
-    // ConsumerGroupMetadata constructor is deprecated
-    producer.sendOffsetsToTransaction(offsetMap.asJava, new 
ConsumerGroupMetadata(group))
+    producer.sendOffsetsToTransaction(offsetMap.asJava, batch.groupMetadata)
     producer.commitTransaction()
     log.debug("Committed transaction for transactional id '{}' consumer group 
'{}' with offsets: {}",
       transactionalId,
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
 
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
index 718fc8f7..e712ff98 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala
@@ -32,7 +32,7 @@ import pekko.stream.SourceShape
 import pekko.stream.scaladsl.Source
 import pekko.stream.stage.{ AsyncCallback, GraphStageLogic }
 import pekko.util.Timeout
-import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
OffsetAndMetadata }
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, 
ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata }
 import org.apache.kafka.common.{ IsolationLevel, TopicPartition }
 
 import scala.concurrent.duration.FiniteDuration
@@ -142,6 +142,17 @@ private[internal] abstract class 
TransactionalSourceLogic[K, V, Msg](shape: Sour
 
   override val groupId: String = 
consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG)
 
+  private var currentGroupMetadata: Option[ConsumerGroupMetadata] = None
+
+  override protected def onGroupMetadata(metadata: ConsumerGroupMetadata): 
Unit =
+    currentGroupMetadata = Some(metadata)
+
+  override def consumerGroupMetadata: ConsumerGroupMetadata =
+    currentGroupMetadata.getOrElse(
+      throw new IllegalStateException(
+        s"Consumer group metadata not yet available for group $groupId. " +
+        "Metadata is populated on the first poll delivering messages from the 
consumer."))
+
   override lazy val committedMarker: CommittedMarker = {
     val ec = materializer.executionContext
     CommittedMarkerRef(sourceActor.ref, consumerSettings.commitTimeout)(ec)
@@ -369,6 +380,17 @@ private final class TransactionalSubSourceStageLogic[K, V](
 
   override def groupId: String = 
consumerSettings.properties(ConsumerConfig.GROUP_ID_CONFIG)
 
+  private var currentGroupMetadata: Option[ConsumerGroupMetadata] = None
+
+  override protected def onGroupMetadata(metadata: ConsumerGroupMetadata): 
Unit =
+    currentGroupMetadata = Some(metadata)
+
+  override def consumerGroupMetadata: ConsumerGroupMetadata =
+    currentGroupMetadata.getOrElse(
+      throw new IllegalStateException(
+        s"Consumer group metadata not yet available for group $groupId. " +
+        "Metadata is populated on the first poll delivering messages from the 
consumer."))
+
   override def onMessage(rec: ConsumerRecord[K, V]): Unit =
     inFlightRecords.add(Map(new TopicPartition(rec.topic(), rec.partition()) 
-> rec.offset()))
 
diff --git 
a/testkit/src/main/java/org/apache/pekko/kafka/testkit/javadsl/BaseKafkaTest.java
 
b/testkit/src/main/java/org/apache/pekko/kafka/testkit/javadsl/BaseKafkaTest.java
index 031e7f11..2512bc80 100644
--- 
a/testkit/src/main/java/org/apache/pekko/kafka/testkit/javadsl/BaseKafkaTest.java
+++ 
b/testkit/src/main/java/org/apache/pekko/kafka/testkit/javadsl/BaseKafkaTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.admin.DescribeClusterResult;
 import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.pekko.Done;
@@ -153,7 +153,7 @@ public abstract class BaseKafkaTest extends 
KafkaTestKitClass {
         groupId,
         group -> {
           try {
-            return group.state() == ConsumerGroupState.STABLE && 
predicate.test(group.members());
+            return group.groupState() == GroupState.STABLE && 
predicate.test(group.members());
           } catch (Exception ex) {
             return false;
           }
diff --git 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
index 97a52767..65c3ea18 100644
--- 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
+++ 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
@@ -33,10 +33,9 @@ import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.TestKit
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.{ Producer => KProducer, 
ProducerRecord }
-import org.apache.kafka.common.ConsumerGroupState
+import org.apache.kafka.common.GroupState
 import org.slf4j.{ Logger, LoggerFactory }
 
-import scala.annotation.nowarn
 import scala.collection.immutable
 import scala.concurrent.duration._
 import scala.concurrent.{ Await, ExecutionContext, Future }
@@ -120,10 +119,9 @@ abstract class KafkaSpec(_kafkaPort: Int, val 
zooKeeperPort: Int, actorSystem: A
    *
    * If the predicate does not hold after configured amount of time, throws an 
exception.
    */
-  @nowarn("cat=deprecation")
   def waitUntilConsumerSummary(groupId: String)(predicate: 
PartialFunction[List[MemberDescription], Boolean]): Unit =
     waitUntilConsumerGroup(groupId) { group =>
-      group.state() == ConsumerGroupState.STABLE &&
+      group.groupState() == GroupState.STABLE &&
       Try(predicate(group.members().asScala.toList)).getOrElse(false)
     }
 
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
index 9d38fa7f..aa96dfb3 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
@@ -98,7 +98,8 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def endOffsets(partitions: java.util.Collection[TopicPartition],
       timeout: java.time.Duration): java.util.Map[TopicPartition, 
java.lang.Long] = ???
   override def poll(timeout: java.time.Duration): ConsumerRecords[K, V] = ???
-  override def groupMetadata(): ConsumerGroupMetadata = ???
+  @scala.annotation.nowarn("msg=deprecated")
+  override def groupMetadata(): ConsumerGroupMetadata = new 
ConsumerGroupMetadata("dummy-consumer-group")
   override def enforceRebalance(): Unit = ???
   override def currentLag(partition: TopicPartition): java.util.OptionalLong = 
???
 
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
index 05fa441e..99c9c855 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
@@ -26,7 +26,6 @@ import org.mockito.stubbing.Answer
 import org.mockito.verification.VerificationMode
 import org.mockito.{ ArgumentMatchers, Mockito }
 
-import scala.annotation.nowarn
 import scala.collection.immutable.Seq
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -169,9 +168,8 @@ class ConsumerMock[K, V](handler: 
ConsumerMock.CommitHandler = new ConsumerMock.
       responses :+= records
     }
 
-  @nowarn("msg=deprecated")
   def verifyClosed(mode: VerificationMode = Mockito.times(1)) =
-    verify(mock, mode).close(ConsumerMock.closeTimeout.toJava)
+    verify(mock, mode).close(ArgumentMatchers.any[CloseOptions])
 
   def verifyPoll(mode: VerificationMode = Mockito.atLeastOnce()) =
     verify(mock, mode).poll(ArgumentMatchers.any[java.time.Duration])
diff --git 
a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala 
b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 32850718..53cbe2d1 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -50,6 +50,12 @@ import scala.concurrent.{ Await, ExecutionContext, Future, 
Promise }
 import scala.jdk.CollectionConverters._
 import scala.util.{ Failure, Success, Try }
 
+private[internal] object ProducerSpec {
+  val group = "group"
+  @scala.annotation.nowarn("msg=deprecated")
+  val testGroupMetadata = new ConsumerGroupMetadata(group)
+}
+
 class ProducerSpec(_system: ActorSystem)
     extends TestKit(_system)
     with AnyFlatSpecLike
@@ -67,8 +73,7 @@ class ProducerSpec(_system: ActorSystem)
   override def afterAll(): Unit = shutdown(system)
 
   implicit val ec: ExecutionContext = _system.dispatcher
-
-  private val group = "group"
+  import ProducerSpec._
 
   type K = String
   type V = String
@@ -88,7 +93,8 @@ class ProducerSpec(_system: ActorSystem)
       PartitionOffsetCommittedMarker(consumerMessage.key,
         consumerMessage.offset,
         committer,
-        fromPartitionedSource = false)
+        fromPartitionedSource = false,
+        testGroupMetadata)
     ProducerMessage.Message(
       tuple._1,
       partitionOffsetCommittedMarker)
@@ -639,7 +645,11 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, 
V])(implicit ec: Execu
   def verifyTxCommit(po: ConsumerMessage.PartitionOffset) = {
     val inOrder = Mockito.inOrder(mock)
     val offsets = Map(new TopicPartition(po.key.topic, po.key.partition) -> 
new OffsetAndMetadata(po.offset + 1)).asJava
-    inOrder.verify(mock).sendOffsetsToTransaction(offsets, new 
ConsumerGroupMetadata(po.key.groupId))
+    val expectedMeta = po match {
+      case m: ConsumerMessage.PartitionOffsetCommittedMarker => 
m.consumerGroupMetadata
+      case _                                                 => 
ProducerSpec.testGroupMetadata
+    }
+    inOrder.verify(mock).sendOffsetsToTransaction(offsets, expectedMeta)
     inOrder.verify(mock).commitTransaction()
     inOrder.verify(mock).beginTransaction()
   }
@@ -648,7 +658,11 @@ class ProducerMock[K, V](handler: ProducerMock.Handler[K, 
V])(implicit ec: Execu
   def verifyTxCommitWhenShutdown(po: ConsumerMessage.PartitionOffset) = {
     val inOrder = Mockito.inOrder(mock)
     val offsets = Map(new TopicPartition(po.key.topic, po.key.partition) -> 
new OffsetAndMetadata(po.offset + 1)).asJava
-    inOrder.verify(mock).sendOffsetsToTransaction(offsets, new 
ConsumerGroupMetadata(po.key.groupId))
+    val expectedMeta = po match {
+      case m: ConsumerMessage.PartitionOffsetCommittedMarker => 
m.consumerGroupMetadata
+      case _                                                 => 
ProducerSpec.testGroupMetadata
+    }
+    inOrder.verify(mock).sendOffsetsToTransaction(offsets, expectedMeta)
     inOrder.verify(mock).commitTransaction()
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to