This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 68d87b3d feat: improve Scala 3.8 forward compatibility (#552)
68d87b3d is described below
commit 68d87b3d5cdfd4506e81dc8514b6467a20d7216a
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Jun 15 05:12:08 2026 +0800
feat: improve Scala 3.8 forward compatibility (#552)
Motivation:
Scala 3.8 introduces new warnings and deprecations that will become
errors in future versions. This change prepares the codebase for smooth
upgrades.
Modification:
- Replace `private[this]` with `private` (10 locations, deprecated in 3.8)
- Replace `= _` with `= null` (19 locations, deprecated in 3.8)
- Add -Wconf suppressions for warnings whose fixes would require Scala 3-only syntax
Result:
Zero new warnings when compiled with Scala 3.8.4, while maintaining
full cross-compilation with Scala 2.13 and 3.3.x.
---
.../org/apache/pekko/kafka/benchmarks/CsvFormatter.scala | 12 ++++++------
.../pekko/kafka/internal/BaseSingleSourceLogic.scala | 4 ++--
.../apache/pekko/kafka/internal/DeferredProducer.scala | 2 +-
.../apache/pekko/kafka/internal/KafkaConsumerActor.scala | 14 +++++++-------
.../org/apache/pekko/kafka/internal/LoggingWithId.scala | 4 ++--
.../org/apache/pekko/kafka/internal/SubSourceLogic.scala | 6 +++---
project/ProjectSettings.scala | 15 ++++++++++++++-
.../pekko/kafka/testkit/internal/KafkaTestKit.scala | 2 +-
.../kafka/testkit/internal/TestcontainersKafka.scala | 4 ++--
.../apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala | 2 +-
tests/src/test/scala/docs/scaladsl/proto/Order.scala | 4 ++--
11 files changed, 41 insertions(+), 28 deletions(-)
diff --git
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
index 1b2fbc4c..9a55e5ff 100644
---
a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
+++
b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
@@ -26,13 +26,13 @@ private[benchmarks] class CsvFormatter(delimiter: Char,
quotingStyle: CsvQuotingStyle,
charset: Charset = StandardCharsets.UTF_8) {
- private[this] val charsetName = charset.name()
+ private val charsetName = charset.name()
- private[this] val delimiterBs = ByteString(String.valueOf(delimiter),
charsetName)
- private[this] val quoteBs = ByteString(String.valueOf(quoteChar),
charsetName)
- private[this] val duplicatedQuote =
ByteString(String.valueOf(Array(quoteChar, quoteChar)), charsetName)
- private[this] val duplicatedEscape =
ByteString(String.valueOf(Array(escapeChar, escapeChar)), charsetName)
- private[this] val endOfLineBs = ByteString(endOfLine, charsetName)
+ private val delimiterBs = ByteString(String.valueOf(delimiter), charsetName)
+ private val quoteBs = ByteString(String.valueOf(quoteChar), charsetName)
+ private val duplicatedQuote = ByteString(String.valueOf(Array(quoteChar,
quoteChar)), charsetName)
+ private val duplicatedEscape = ByteString(String.valueOf(Array(escapeChar,
escapeChar)), charsetName)
+ private val endOfLineBs = ByteString(endOfLine, charsetName)
def toCsv(fields: immutable.Iterable[Any]): ByteString =
if (fields.nonEmpty) nonEmptyToCsv(fields)
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
index b963fc10..c892d0b2 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala
@@ -43,8 +43,8 @@ import scala.concurrent.{ ExecutionContext, Future }
override protected def executionContext: ExecutionContext =
materializer.executionContext
protected def consumerFuture: Future[ActorRef]
- protected final var consumerActor: ActorRef = _
- protected var sourceActor: StageActor = _
+ protected final var consumerActor: ActorRef = null
+ protected var sourceActor: StageActor = null
protected var tps = Set.empty[TopicPartition]
private var requested = false
private var requestId = 0
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
index 6e44d72d..a47fb9ae 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala
@@ -61,7 +61,7 @@ private[kafka] trait DeferredProducer[K, V] {
import DeferredProducer._
/** The Kafka producer may be created lazily, assigned via `preStart` in
`assignProducer`. */
- protected var producer: Producer[K, V] = _
+ protected var producer: Producer[K, V] = null
protected var producerAssignmentLifecycle: ProducerAssignmentLifecycle =
Unassigned
protected def producerSettings: ProducerSettings[K, V]
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index 976ee1a3..05123265 100644
---
a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++
b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -196,23 +196,23 @@ import scala.util.control.NonFatal
private val pollMsg = Poll(this, periodic = true)
private val delayedPollMsg = Poll(this, periodic = false)
- private var settings: ConsumerSettings[K, V] = _
- private var pollTimeout: java.time.Duration = _
+ private var settings: ConsumerSettings[K, V] = null
+ private var pollTimeout: java.time.Duration = null
/** Limits the blocking on offsetForTimes */
- private var offsetForTimesTimeout: java.time.Duration = _
+ private var offsetForTimesTimeout: java.time.Duration = null
/** Limits the blocking on position in [[RebalanceListenerImpl]] */
- private var positionTimeout: java.time.Duration = _
+ private var positionTimeout: java.time.Duration = null
private var requests = Map.empty[ActorRef, RequestMessages]
/** ActorRefs of all stages that sent subscriptions requests or
`RegisterSubStage` to this actor (removed on their termination). */
private var stageActorsMap = Map.empty[Set[TopicPartition], ActorRef]
- private var consumer: Consumer[K, V] = _
+ private var consumer: Consumer[K, V] = null
private var commitsInProgress = 0
- private var commitRefreshing: CommitRefreshing = _
- private var resetProtection: ConsumerResetProtection = _
+ private var commitRefreshing: CommitRefreshing = null
+ private var resetProtection: ConsumerResetProtection = null
private var stopInProgress = false
/**
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
index 848f6259..0dac27f5 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
@@ -31,7 +31,7 @@ private[internal] trait InstanceId {
* Override Apache Pekko streams [[StageLogging]] to include an ID from
[[InstanceId]] as a prefix to each logging statement.
*/
private[internal] trait StageIdLogging extends StageLogging with InstanceId {
self: GraphStageLogic =>
- private[this] var _log: LoggingAdapter = _
+ private var _log: LoggingAdapter = null
protected def idLogPrefix: String = s"[$id] "
override def log: LoggingAdapter = {
if (_log eq null) {
@@ -45,7 +45,7 @@ private[internal] trait StageIdLogging extends StageLogging
with InstanceId { se
* Override Apache Pekko classic [[ActorLogging]] to include an ID from
[[InstanceId]] as a prefix to each logging statement.
*/
private[internal] trait ActorIdLogging extends ActorLogging with InstanceId {
this: Actor =>
- private[this] var _log: LoggingAdapter = _
+ private var _log: LoggingAdapter = null
protected def idLogPrefix: String = s"[$id] "
override def log: LoggingAdapter = {
if (_log eq null) {
diff --git
a/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
b/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
index 211281e1..30e7487d 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala
@@ -70,8 +70,8 @@ private class SubSourceLogic[K, V, Msg](
override def executionContext: ExecutionContext =
materializer.executionContext
override def consumerFuture: Future[ActorRef] = consumerPromise.future
- protected var consumerActor: ActorRef = _
- protected var sourceActor: StageActor = _
+ protected var consumerActor: ActorRef = null
+ protected var sourceActor: StageActor = null
/** Kafka has notified us that we have these partitions assigned, but we
have not created a source for them yet. */
private var pendingPartitions: immutable.Set[TopicPartition] =
immutable.Set.empty
@@ -413,7 +413,7 @@ private abstract class SubSourceStageLogic[K, V, Msg](
override def id: String = s"${super.id}#$actorNumber"
private val requestMessages = KafkaConsumerActor.Internal.RequestMessages(0,
Set(tp))
private var requested = false
- protected var subSourceActor: StageActor = _
+ protected var subSourceActor: StageActor = null
override def preStart(): Unit = {
super.preStart()
diff --git a/project/ProjectSettings.scala b/project/ProjectSettings.scala
index 37d8e38e..1e9aed2e 100644
--- a/project/ProjectSettings.scala
+++ b/project/ProjectSettings.scala
@@ -94,7 +94,20 @@ object ProjectSettings extends AutoPlugin {
else Seq.empty
},
scalacOptions ++= {
- if (scalaBinaryVersion.value == "3") Seq("-Yfuture-lazy-vals",
"-release:17")
+ if (scalaBinaryVersion.value == "3")
+ Seq(
+ "-Yfuture-lazy-vals",
+ "-release:17",
+ "-Wconf:msg=Implicit parameters should be provided with a `using`
clause:s",
+ "-Wconf:msg=is deprecated for wildcard arguments of types:s",
+ "-Wconf:msg=The trailing ` _` for eta-expansion is unnecessary:s",
+ "-Wconf:msg=with as a type operator has been deprecated:s",
+ "-Wconf:msg=Unreachable case except for null:s",
+ "-Wconf:msg=is no longer supported for vararg splices:s",
+ "-Wconf:msg=is not declared infix:s",
+ "-Wconf:msg=auto insertion will be deprecated:s",
+ "-Wconf:msg=Invalid message filter:s",
+ "-Wconf:msg=bad option.*-Yfuture-lazy-vals:s")
else Seq.empty
},
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
diff --git
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
index 6c0bc5b7..d152d1e4 100644
---
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
+++
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/KafkaTestKit.scala
@@ -100,7 +100,7 @@ trait KafkaTestKit {
config
}
- private var adminClientVar: Admin = _
+ private var adminClientVar: Admin = null
/**
* Access to the Kafka Admin client
diff --git
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
index a32e8b77..32dc338f 100644
---
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
+++
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/internal/TestcontainersKafka.scala
@@ -26,8 +26,8 @@ import scala.jdk.OptionConverters._
object TestcontainersKafka {
trait Spec extends KafkaSpec {
- private var cluster: KafkaContainerCluster = _
- private var kafkaBootstrapServersInternal: String = _
+ private var cluster: KafkaContainerCluster = null
+ private var kafkaBootstrapServersInternal: String = null
private var kafkaPortInternal: Int = -1
private def requireStarted(): Unit =
diff --git
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
index 65c3ea18..016dee93 100644
---
a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
+++
b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
@@ -59,7 +59,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort:
Int, actorSystem: A
implicit val mat: Materializer = SystemMaterializer(system).materializer
implicit val scheduler: pekko.actor.Scheduler = system.scheduler
- var testProducer: KProducer[String, String] = _
+ var testProducer: KProducer[String, String] = null
def setUp(): Unit = {
testProducer = Await.result(producerDefaults.createKafkaProducerAsync(),
2.seconds)
diff --git a/tests/src/test/scala/docs/scaladsl/proto/Order.scala
b/tests/src/test/scala/docs/scaladsl/proto/Order.scala
index 0c897461..c3d9495e 100644
--- a/tests/src/test/scala/docs/scaladsl/proto/Order.scala
+++ b/tests/src/test/scala/docs/scaladsl/proto/Order.scala
@@ -28,8 +28,8 @@ final case class Order(
extends scalapb.GeneratedMessage
with scalapb.lenses.Updatable[Order] {
@transient
- private[this] var __serializedSizeCachedValue: _root_.scala.Int = 0
- private[this] def __computeSerializedValue(): _root_.scala.Int = {
+ private var __serializedSizeCachedValue: _root_.scala.Int = 0
+ private def __computeSerializedValue(): _root_.scala.Int = {
var __size = 0
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]