This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 68d87b3d feat: improve Scala 3.8 forward compatibility (#552)
68d87b3d is described below

commit 68d87b3d5cdfd4506e81dc8514b6467a20d7216a
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Jun 15 05:12:08 2026 +0800

    feat: improve Scala 3.8 forward compatibility (#552)
    
    Motivation:
    Scala 3.8 introduces new warnings and deprecations that will become
    errors in future versions. Preparing the codebase for smooth upgrades.
    
    Modification:
    - Replace `private[this]` with `private` (10 locations, deprecated in 3.8)
    - Replace `= _` with `= null` (19 locations, deprecated in 3.8)
    - Add -Wconf suppressions for warnings requiring Scala 3-only syntax
    
    Result:
    Zero new warnings when compiled with Scala 3.8.4, while maintaining
    full cross-compilation with Scala 2.13 and 3.3.x.
---
 .../org/apache/pekko/kafka/benchmarks/CsvFormatter.scala  | 12 ++++++------
 .../pekko/kafka/internal/BaseSingleSourceLogic.scala      |  4 ++--
 .../apache/pekko/kafka/internal/DeferredProducer.scala    |  2 +-
 .../apache/pekko/kafka/internal/KafkaConsumerActor.scala  | 14 +++++++-------
 .../org/apache/pekko/kafka/internal/LoggingWithId.scala   |  4 ++--
 .../org/apache/pekko/kafka/internal/SubSourceLogic.scala  |  6 +++---
 project/ProjectSettings.scala                             | 15 ++++++++++++++-
 .../pekko/kafka/testkit/internal/KafkaTestKit.scala       |  2 +-
 .../kafka/testkit/internal/TestcontainersKafka.scala      |  4 ++--
 .../apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala   |  2 +-
 tests/src/test/scala/docs/scaladsl/proto/Order.scala      |  4 ++--
 11 files changed, 41 insertions(+), 28 deletions(-)

diff --git 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
index 1b2fbc4c..9a55e5ff 100644
--- 
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
+++ 
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
@@ -26,13 +26,13 @@ private[benchmarks] class CsvFormatter(delimiter: Char,
     quotingStyle: CsvQuotingStyle,
     charset: Charset = StandardCharsets.UTF_8) {
 
-  private[this] val charsetName = charset.name()
+  private val charsetName = charset.name()
 
-  private[this] val delimiterBs = ByteString(String.valueOf(delimiter), 
charsetName)
-  private[this] val quoteBs = ByteString(String.valueOf(quoteChar), 
charsetName)
-  private[this] val duplicatedQuote = 
ByteString(String.valueOf(Array(quoteChar, quoteChar)), charsetName)
-  private[this] val duplicatedEscape = 
ByteString(String.valueOf(Array(escapeChar, escapeChar)), charsetName)
-  private[this] val endOfLineBs = ByteString(endOfLine, charsetName)
+  private val delimiterBs = ByteString(String.valueOf(delimiter), charsetName)
+  private val quoteBs = ByteString(String.valueOf(quoteChar), charsetName)
+  private val duplicatedQuote = ByteString(String.valueOf(Array(quoteChar, 
quoteChar)), charsetName)
+  private val duplicatedEscape = ByteString(String.valueOf(Array(escapeChar, 
escapeChar)), charsetName)
+  private val endOfLineBs = ByteString(endOfLine, charsetName)
 
   def toCsv(fields: immutable.Iterable[Any]): ByteString =
     if (fields.nonEmpty) nonEmptyToCsv(fields)
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
 
b/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
index b963fc10..c892d0b2 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
@@ -43,8 +43,8 @@ import scala.concurrent.{ ExecutionContext, Future }
 
   override protected def executionContext: ExecutionContext = 
materializer.executionContext
   protected def consumerFuture: Future[ActorRef]
-  protected final var consumerActor: ActorRef = _
-  protected var sourceActor: StageActor = _
+  protected final var consumerActor: ActorRef = null
+  protected var sourceActor: StageActor = null
   protected var tps = Set.empty[TopicPartition]
   private var requested = false
   private var requestId = 0
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
index 6e44d72d..a47fb9ae 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
@@ -61,7 +61,7 @@ private[kafka] trait DeferredProducer[K, V] {
   import DeferredProducer._
 
   /** The Kafka producer may be created lazily, assigned via `preStart` in 
`assignProducer`. */
-  protected var producer: Producer[K, V] = _
+  protected var producer: Producer[K, V] = null
   protected var producerAssignmentLifecycle: ProducerAssignmentLifecycle = 
Unassigned
 
   protected def producerSettings: ProducerSettings[K, V]
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index 976ee1a3..05123265 100644
--- 
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++ 
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -196,23 +196,23 @@ import scala.util.control.NonFatal
   private val pollMsg = Poll(this, periodic = true)
   private val delayedPollMsg = Poll(this, periodic = false)
 
-  private var settings: ConsumerSettings[K, V] = _
-  private var pollTimeout: java.time.Duration = _
+  private var settings: ConsumerSettings[K, V] = null
+  private var pollTimeout: java.time.Duration = null
 
   /** Limits the blocking on offsetForTimes */
-  private var offsetForTimesTimeout: java.time.Duration = _
+  private var offsetForTimesTimeout: java.time.Duration = null
 
   /** Limits the blocking on position in [[RebalanceListenerImpl]] */
-  private var positionTimeout: java.time.Duration = _
+  private var positionTimeout: java.time.Duration = null
 
   private var requests = Map.empty[ActorRef, RequestMessages]
 
   /** ActorRefs of all stages that sent subscriptions requests or 
`RegisterSubStage` to this actor (removed on their termination). */
   private var stageActorsMap = Map.empty[Set[TopicPartition], ActorRef]
-  private var consumer: Consumer[K, V] = _
+  private var consumer: Consumer[K, V] = null
   private var commitsInProgress = 0
-  private var commitRefreshing: CommitRefreshing = _
-  private var resetProtection: ConsumerResetProtection = _
+  private var commitRefreshing: CommitRefreshing = null
+  private var resetProtection: ConsumerResetProtection = null
   private var stopInProgress = false
 
   /**
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
index 848f6259..0dac27f5 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
@@ -31,7 +31,7 @@ private[internal] trait InstanceId {
  * Override Apache Pekko streams [[StageLogging]] to include an ID from 
[[InstanceId]] as a prefix to each logging statement.
  */
 private[internal] trait StageIdLogging extends StageLogging with InstanceId { 
self: GraphStageLogic =>
-  private[this] var _log: LoggingAdapter = _
+  private var _log: LoggingAdapter = null
   protected def idLogPrefix: String = s"[$id] "
   override def log: LoggingAdapter = {
     if (_log eq null) {
@@ -45,7 +45,7 @@ private[internal] trait StageIdLogging extends StageLogging 
with InstanceId { se
  * Override Apache Pekko classic [[ActorLogging]] to include an ID from 
[[InstanceId]] as a prefix to each logging statement.
  */
 private[internal] trait ActorIdLogging extends ActorLogging with InstanceId { 
this: Actor =>
-  private[this] var _log: LoggingAdapter = _
+  private var _log: LoggingAdapter = null
   protected def idLogPrefix: String = s"[$id] "
   override def log: LoggingAdapter = {
     if (_log eq null) {
diff --git 
a/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala 
b/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
index 211281e1..30e7487d 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
@@ -70,8 +70,8 @@ private class SubSourceLogic[K, V, Msg](
   override def executionContext: ExecutionContext = 
materializer.executionContext
   override def consumerFuture: Future[ActorRef] = consumerPromise.future
 
-  protected var consumerActor: ActorRef = _
-  protected var sourceActor: StageActor = _
+  protected var consumerActor: ActorRef = null
+  protected var sourceActor: StageActor = null
 
   /** Kafka has notified us that we have these partitions assigned, but we 
have not created a source for them yet. */
   private var pendingPartitions: immutable.Set[TopicPartition] = 
immutable.Set.empty
@@ -413,7 +413,7 @@ private abstract class SubSourceStageLogic[K, V, Msg](
   override def id: String = s"${super.id}#$actorNumber"
   private val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0, 
Set(tp))
   private var requested = false
-  protected var subSourceActor: StageActor = _
+  protected var subSourceActor: StageActor = null
 
   override def preStart(): Unit = {
     super.preStart()
diff --git a/project/ProjectSettings.scala b/project/ProjectSettings.scala
index 37d8e38e..1e9aed2e 100644
--- a/project/ProjectSettings.scala
+++ b/project/ProjectSettings.scala
@@ -94,7 +94,20 @@ object ProjectSettings extends AutoPlugin {
       else Seq.empty
     },
     scalacOptions ++= {
-      if (scalaBinaryVersion.value == "3") Seq("-Yfuture-lazy-vals", 
"-release:17")
+      if (scalaBinaryVersion.value == "3")
+        Seq(
+          "-Yfuture-lazy-vals",
+          "-release:17",
+          "-Wconf:msg=Implicit parameters should be provided with a `using` 
clause:s",
+          "-Wconf:msg=is deprecated for wildcard arguments of types:s",
+          "-Wconf:msg=The trailing ` _` for eta-expansion is unnecessary:s",
+          "-Wconf:msg=with as a type operator has been deprecated:s",
+          "-Wconf:msg=Unreachable case except for null:s",
+          "-Wconf:msg=is no longer supported for vararg splices:s",
+          "-Wconf:msg=is not declared infix:s",
+          "-Wconf:msg=auto insertion will be deprecated:s",
+          "-Wconf:msg=Invalid message filter:s",
+          "-Wconf:msg=bad option.*-Yfuture-lazy-vals:s")
       else Seq.empty
     },
     Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
diff --git 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
index 6c0bc5b7..d152d1e4 100644
--- 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
+++ 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
@@ -100,7 +100,7 @@ trait KafkaTestKit {
     config
   }
 
-  private var adminClientVar: Admin = _
+  private var adminClientVar: Admin = null
 
   /**
    * Access to the Kafka Admin client
diff --git 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
index a32e8b77..32dc338f 100644
--- 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
+++ 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
@@ -26,8 +26,8 @@ import scala.jdk.OptionConverters._
 
 object TestcontainersKafka {
   trait Spec extends KafkaSpec {
-    private var cluster: KafkaContainerCluster = _
-    private var kafkaBootstrapServersInternal: String = _
+    private var cluster: KafkaContainerCluster = null
+    private var kafkaBootstrapServersInternal: String = null
     private var kafkaPortInternal: Int = -1
 
     private def requireStarted(): Unit =
diff --git 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
index 65c3ea18..016dee93 100644
--- 
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
+++ 
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
@@ -59,7 +59,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: 
Int, actorSystem: A
   implicit val mat: Materializer = SystemMaterializer(system).materializer
   implicit val scheduler: pekko.actor.Scheduler = system.scheduler
 
-  var testProducer: KProducer[String, String] = _
+  var testProducer: KProducer[String, String] = null
 
   def setUp(): Unit = {
     testProducer = Await.result(producerDefaults.createKafkaProducerAsync(), 
2.seconds)
diff --git a/tests/src/test/scala/docs/scaladsl/proto/Order.scala 
b/tests/src/test/scala/docs/scaladsl/proto/Order.scala
index 0c897461..c3d9495e 100644
--- a/tests/src/test/scala/docs/scaladsl/proto/Order.scala
+++ b/tests/src/test/scala/docs/scaladsl/proto/Order.scala
@@ -28,8 +28,8 @@ final case class Order(
     extends scalapb.GeneratedMessage
     with scalapb.lenses.Updatable[Order] {
   @transient
-  private[this] var __serializedSizeCachedValue: _root_.scala.Int = 0
-  private[this] def __computeSerializedValue(): _root_.scala.Int = {
+  private var __serializedSizeCachedValue: _root_.scala.Int = 0
+  private def __computeSerializedValue(): _root_.scala.Int = {
     var __size = 0
 
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to