This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b068ea46 chore: Rewrite to scala3 syntax (#1675)
3b068ea46 is described below

commit 3b068ea46661783f9a443116ca07a063893934d4
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Jun 14 20:43:29 2026 +0800

    chore: Rewrite to scala3 syntax (#1675)
    
    Motivation:
    Let Scala 3 Community build compile pekko-connectors.
    
    Modification:
    Update .scalafmt.conf dialect to scala213source3 and enable
    scala3 syntax rewrite rules, then reformat all sources.
    
    Result:
    Wildcard types use ? syntax (e.g. Class[?] instead of Class[_]).
    
    Tests:
    Not run - formatting only
    
    References:
    Refs apache/pekko#3048
---
 .scalafmt.conf                                     | 12 ++++++++++-
 .../docs/scaladsl/AbstractAvroParquet.scala        | 11 +++++-----
 .../connectors/awsspi/PekkoHttpClientSpec.scala    |  6 +++---
 .../connectors/cassandra/CqlSessionProvider.scala  |  2 +-
 .../cassandra/javadsl/CassandraSession.scala       | 10 ++++-----
 .../cassandra/javadsl/CassandraSource.scala        |  4 ++--
 .../cassandra/scaladsl/CassandraSession.scala      | 12 +++++------
 .../cassandra/scaladsl/CassandraSource.scala       |  4 ++--
 .../cassandra/scaladsl/CassandraLifecycle.scala    |  4 ++--
 .../couchbase/impl/CouchbaseSessionImpl.scala      | 16 +++++++--------
 .../impl/CouchbaseSessionJavaAdapter.scala         | 16 +++++++--------
 .../couchbase/javadsl/CouchbaseFlow.scala          | 10 ++++-----
 .../couchbase/javadsl/CouchbaseSession.scala       | 16 +++++++--------
 .../couchbase/javadsl/CouchbaseSink.scala          |  4 ++--
 .../pekko/stream/connectors/couchbase/model.scala  |  6 +++---
 .../couchbase/scaladsl/CouchbaseFlow.scala         | 10 ++++-----
 .../couchbase/scaladsl/CouchbaseSession.scala      | 16 +++++++--------
 .../couchbase/scaladsl/CouchbaseSink.scala         |  2 +-
 .../connectors/dynamodb/javadsl/DynamoDb.scala     |  2 +-
 .../connectors/dynamodb/scaladsl/DynamoDb.scala    |  4 ++--
 .../connectors/elasticsearch/ReadResult.scala      |  2 +-
 .../connectors/elasticsearch/WriteMessage.scala    |  4 ++--
 .../impl/ElasticsearchSimpleFlowStage.scala        |  2 +-
 .../impl/ElasticsearchSourceStage.scala            |  4 ++--
 .../elasticsearch/javadsl/ElasticsearchFlow.scala  | 16 +++++++--------
 .../elasticsearch/javadsl/ElasticsearchSink.scala  |  2 +-
 .../javadsl/ElasticsearchSource.scala              | 12 +++++------
 .../elasticsearch/scaladsl/ElasticsearchFlow.scala | 20 +++++++++---------
 .../elasticsearch/scaladsl/ElasticsearchSink.scala |  2 +-
 .../scaladsl/ElasticsearchSource.scala             | 12 +++++------
 .../docs/scaladsl/ElasticsearchSpecUtils.scala     |  2 +-
 .../file/impl/archive/TarArchiveManager.scala      |  2 +-
 .../stream/connectors/file/scaladsl/Archive.scala  |  2 +-
 .../pekko/stream/connectors/ftp/impl/FtpLike.scala |  4 ++--
 .../connectors/ftp/impl/SftpOperations.scala       |  4 ++--
 .../stream/connectors/geode/GeodeSettings.scala    |  6 +++---
 .../geode/impl/pdx/DelegatingPdxSerializer.scala   |  6 +++---
 .../geode/impl/pdx/ShapelessPdxSerializer.scala    |  2 +-
 .../scala/docs/scaladsl/PersonPdxSerializer.scala  |  2 +-
 .../bigquery/scaladsl/schema/ProductSchemas.scala  |  4 ++--
 .../pubsub/grpc/javadsl/GooglePubSub.scala         |  2 +-
 .../googlecloud/storage/impl/GCStorageStream.scala |  2 +-
 .../googlecloud/storage/javadsl/GCStorage.scala    |  2 +-
 .../googlecloud/storage/scaladsl/GCStorage.scala   |  2 +-
 .../storage/scaladsl/GCStorageWiremockBase.scala   |  2 +-
 .../stream/connectors/hdfs/scaladsl/HdfsFlow.scala |  2 +-
 .../impl/PekkoConnectorsResultMapperHelper.scala   | 24 +++++++++++-----------
 .../connectors/ironmq/impl/IronMqClient.scala      |  2 +-
 .../connectors/ironmq/impl/IronMqPushStage.scala   |  2 +-
 .../connectors/jakartams/JmsExceptions.scala       |  4 ++--
 .../jakartams/impl/JmsMessageProducer.scala        | 20 +++++++++---------
 .../jakartams/impl/JmsMessageReader.scala          |  2 +-
 .../jakartams/impl/JmsProducerStage.scala          |  6 +++---
 .../stream/connectors/jms/JmsExceptions.scala      |  4 ++--
 .../connectors/jms/impl/JmsMessageProducer.scala   | 20 +++++++++---------
 .../connectors/jms/impl/JmsMessageReader.scala     |  2 +-
 .../connectors/jms/impl/JmsProducerStage.scala     |  6 +++---
 .../test/scala/docs/scaladsl/JsonReaderTest.scala  |  2 +-
 .../kinesis/impl/KinesisSchedulerSourceStage.scala |  2 +-
 .../pekko/stream/connectors/mqtt/MqttPerf.scala    |  2 +-
 .../connectors/mqtt/streaming/MqttPerf.scala       |  4 ++--
 .../mqtt/streaming/impl/ClientState.scala          |  6 +++---
 .../mqtt/streaming/impl/RequestState.scala         | 12 +++++------
 .../test/scala/docs/scaladsl/MqttSessionSpec.scala | 20 +++++++++---------
 project/CopyrightHeader.scala                      |  6 +++---
 project/CopyrightHeaderForBuild.scala              |  2 +-
 .../stream/connectors/s3/impl/HttpRequests.scala   |  2 +-
 .../pekko/stream/connectors/s3/impl/S3Stream.scala |  8 ++++----
 .../pekko/stream/connectors/s3/javadsl/S3.scala    | 18 ++++++++--------
 .../pekko/stream/connectors/s3/scaladsl/S3.scala   | 10 ++++-----
 .../connectors/s3/scaladsl/S3IntegrationSpec.scala |  2 +-
 .../connectors/s3/scaladsl/S3WireMockBase.scala    |  2 +-
 .../docs/scaladsl/SlickWithTryResultSpec.scala     |  6 +++---
 .../text/scaladsl/CharsetCodingFlowsSpec.scala     |  6 +++---
 .../impl/UnixDomainSocketImpl.scala                |  2 +-
 .../javadsl/UnixDomainSocket.scala                 |  2 +-
 .../scaladsl/UnixDomainSocket.scala                |  4 ++--
 .../scala/docs/scaladsl/UnixDomainSocketSpec.scala |  2 +-
 78 files changed, 256 insertions(+), 245 deletions(-)

diff --git a/.scalafmt.conf b/.scalafmt.conf
index 2f63f8c75..e3bb547ae 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -1,5 +1,5 @@
 version                                  = 3.11.1
-runner.dialect                           = scala213
+runner.dialect                           = scala213source3
 project.git                              = true
 style                                    = defaultWithAlign
 docstrings.style                         = Asterisk
@@ -75,3 +75,13 @@ project.excludeFilters = [
   "scripts/authors.scala"
 ]
 project.layout         = StandardConvention
+
+rewrite.scala3.convertToNewSyntax = true
+runner {
+  dialectOverride {
+    allowSignificantIndentation = false
+    allowAsForImportRename = false
+    allowStarWildcardImport = false
+    allowPostfixStarVarargSplices = false
+  }
+}
diff --git 
a/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala 
b/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala
index f3358adb1..49093f18a 100644
--- a/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala
+++ b/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala
@@ -13,7 +13,7 @@
 
 package docs.scaladsl
 
-import com.sksamuel.avro4s._
+import com.sksamuel.avro4s.*
 import org.apache.pekko.testkit.TestKit
 import org.scalatest.{ BeforeAndAfterAll, Suite }
 
@@ -32,10 +32,11 @@ trait AbstractAvroParquet extends BeforeAndAfterAll with 
AbstractAvroParquetBase
   }
 
   private def deleteRecursively(f: File): Boolean = {
-    if (f.isDirectory) f.listFiles match {
-      case null =>
-      case xs   => xs.foreach(deleteRecursively)
-    }
+    if f.isDirectory then
+      f.listFiles match {
+        case null =>
+        case xs   => xs.foreach(deleteRecursively)
+      }
     f.delete()
   }
 }
diff --git 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
index b9c35672e..1811285f4 100644
--- 
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
+++ 
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
@@ -71,7 +71,7 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers 
with OptionValues {
     "use sdk content length from headers when publisher returns empty 
contentLength" in {
       val publisher = new SdkHttpContentPublisher {
         override def contentLength(): java.util.Optional[java.lang.Long] = 
java.util.Optional.empty()
-        override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {}
+        override def subscribe(s: Subscriber[? >: ByteBuffer]): Unit = {}
       }
       val entity =
         PekkoHttpClient.entityForMethodAndContentType(HttpMethods.PUT, 
ContentTypes.NoContentType, publisher,
@@ -82,7 +82,7 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers 
with OptionValues {
     "use publisher contentLength when sdkContentLength is absent" in {
       val publisher = new SdkHttpContentPublisher {
         override def contentLength(): java.util.Optional[java.lang.Long] = 
java.util.Optional.of(99L)
-        override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {}
+        override def subscribe(s: Subscriber[? >: ByteBuffer]): Unit = {}
       }
       val entity =
         PekkoHttpClient.entityForMethodAndContentType(HttpMethods.PUT, 
ContentTypes.NoContentType, publisher, None)
@@ -92,7 +92,7 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers 
with OptionValues {
     "prefer sdk content length over publisher contentLength when both are 
present" in {
       val publisher = new SdkHttpContentPublisher {
         override def contentLength(): java.util.Optional[java.lang.Long] = 
java.util.Optional.of(55L)
-        override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {}
+        override def subscribe(s: Subscriber[? >: ByteBuffer]): Unit = {}
       }
       val entity =
         PekkoHttpClient.entityForMethodAndContentType(HttpMethods.PUT, 
ContentTypes.NoContentType, publisher,
diff --git 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala
 
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala
index 535756340..e4dabdacc 100644
--- 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala
+++ 
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala
@@ -77,7 +77,7 @@ object CqlSessionProvider {
     val className = config.getString("session-provider")
     val dynamicAccess = system.asInstanceOf[ExtendedActorSystem].dynamicAccess
     val clazz = dynamicAccess.getClassFor[CqlSessionProvider](className).get
-    def instantiate(args: immutable.Seq[(Class[_], AnyRef)]) =
+    def instantiate(args: immutable.Seq[(Class[?], AnyRef)]) =
       dynamicAccess.createInstanceFor[CqlSessionProvider](clazz, args)
 
     val params = List((classOf[ActorSystem], system), (classOf[Config], 
config))
diff --git 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSession.scala
 
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSession.scala
index dde41c43a..50665acf4 100644
--- 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSession.scala
+++ 
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSession.scala
@@ -151,7 +151,7 @@ final class CassandraSession(@InternalApi private[pekko] 
val delegate: scaladsl.
    * The returned `CompletionStage` is completed when the statement has been
    * successfully executed, or if it fails.
    */
-  def executeWrite(stmt: Statement[_]): CompletionStage[Done] =
+  def executeWrite(stmt: Statement[?]): CompletionStage[Done] =
     delegate.executeWrite(stmt).asJava
 
   /**
@@ -180,7 +180,7 @@ final class CassandraSession(@InternalApi private[pekko] 
val delegate: scaladsl.
    * Note that you have to connect a `Sink` that consumes the messages from
    * this `Source` and then `run` the stream.
    */
-  def select(stmt: Statement[_]): Source[Row, NotUsed] =
+  def select(stmt: Statement[?]): Source[Row, NotUsed] =
     delegate.select(stmt).asJava
 
   /**
@@ -195,7 +195,7 @@ final class CassandraSession(@InternalApi private[pekko] 
val delegate: scaladsl.
    * Note that you have to connect a `Sink` that consumes the messages from
    * this `Source` and then `run` the stream.
    */
-  def select(stmt: CompletionStage[Statement[_]]): Source[Row, NotUsed] =
+  def select(stmt: CompletionStage[Statement[?]]): Source[Row, NotUsed] =
     delegate.select(stmt.asScala).asJava
 
   /**
@@ -223,7 +223,7 @@ final class CassandraSession(@InternalApi private[pekko] 
val delegate: scaladsl.
    *
    * The returned `CompletionStage` is completed with the found rows.
    */
-  def selectAll(stmt: Statement[_]): CompletionStage[JList[Row]] =
+  def selectAll(stmt: Statement[?]): CompletionStage[JList[Row]] =
     delegate.selectAll(stmt).map(_.asJava).asJava
 
   /**
@@ -249,7 +249,7 @@ final class CassandraSession(@InternalApi private[pekko] 
val delegate: scaladsl.
    * The returned `CompletionStage` is completed with the first row,
    * if any.
    */
-  def selectOne(stmt: Statement[_]): CompletionStage[Optional[Row]] =
+  def selectOne(stmt: Statement[?]): CompletionStage[Optional[Row]] =
     delegate.selectOne(stmt).map(_.toJava).asJava
 
   /**
diff --git 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSource.scala
 
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSource.scala
index 00a90b339..8d8a2577c 100644
--- 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSource.scala
+++ 
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSource.scala
@@ -41,7 +41,7 @@ object CassandraSource {
    *
    * See <a href="https://docs.datastax.com/en/dse/6.7/cql/cql/cql_using/queriesTOC.html">Querying data</a>.
    */
-  def create(session: CassandraSession, stmt: Statement[_]): Source[Row, 
NotUsed] =
+  def create(session: CassandraSession, stmt: Statement[?]): Source[Row, 
NotUsed] =
     session.select(stmt)
 
   /**
@@ -49,7 +49,7 @@ object CassandraSource {
    *
    * See <a href="https://docs.datastax.com/en/dse/6.7/cql/cql/cql_using/queriesTOC.html">Querying data</a>.
    */
-  def fromCompletionStage(session: CassandraSession, stmt: 
CompletionStage[Statement[_]]): Source[Row, NotUsed] =
+  def fromCompletionStage(session: CassandraSession, stmt: 
CompletionStage[Statement[?]]): Source[Row, NotUsed] =
     session.select(stmt)
 
 }
diff --git 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSession.scala
 
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSession.scala
index e5aff3813..c2bba9034 100644
--- 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSession.scala
+++ 
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSession.scala
@@ -171,7 +171,7 @@ final class CassandraSession(system: 
pekko.actor.ActorSystem,
    * The returned `Future` is completed when the statement has been
    * successfully executed, or if it fails.
    */
-  def executeWrite(stmt: Statement[_]): Future[Done] = {
+  def executeWrite(stmt: Statement[?]): Future[Done] = {
     underlying().flatMap { cqlSession =>
       cqlSession.executeAsync(stmt).asScala.map(_ => Done)
     }
@@ -194,7 +194,7 @@ final class CassandraSession(system: 
pekko.actor.ActorSystem,
   /**
    * INTERNAL API
    */
-  @InternalApi private[pekko] def selectResultSet(stmt: Statement[_]): 
Future[AsyncResultSet] = {
+  @InternalApi private[pekko] def selectResultSet(stmt: Statement[?]): 
Future[AsyncResultSet] = {
     underlying().flatMap { s =>
       s.executeAsync(stmt).asScala
     }
@@ -212,7 +212,7 @@ final class CassandraSession(system: 
pekko.actor.ActorSystem,
    * Note that you have to connect a `Sink` that consumes the messages from
    * this `Source` and then `run` the stream.
    */
-  def select(stmt: Statement[_]): Source[Row, NotUsed] = {
+  def select(stmt: Statement[?]): Source[Row, NotUsed] = {
     Source
       .futureSource {
         underlying().map { cqlSession =>
@@ -233,7 +233,7 @@ final class CassandraSession(system: 
pekko.actor.ActorSystem,
    * Note that you have to connect a `Sink` that consumes the messages from
    * this `Source` and then `run` the stream.
    */
-  def select(stmt: Future[Statement[_]]): Source[Row, NotUsed] = {
+  def select(stmt: Future[Statement[?]]): Source[Row, NotUsed] = {
     Source
       .futureSource {
         underlying().flatMap(cqlSession => stmt.map(cqlSession -> _)).map {
@@ -269,7 +269,7 @@ final class CassandraSession(system: 
pekko.actor.ActorSystem,
    *
    * The returned `Future` is completed with the found rows.
    */
-  def selectAll(stmt: Statement[_]): Future[immutable.Seq[Row]] = {
+  def selectAll(stmt: Statement[?]): Future[immutable.Seq[Row]] = {
     select(stmt)
       .runWith(Sink.seq)
   }
@@ -297,7 +297,7 @@ final class CassandraSession(system: 
pekko.actor.ActorSystem,
    * The returned `Future` is completed with the first row,
    * if any.
    */
-  def selectOne(stmt: Statement[_]): Future[Option[Row]] = {
+  def selectOne(stmt: Statement[?]): Future[Option[Row]] = {
     selectResultSet(stmt).map { rs =>
       Option(rs.one()) // rs.one returns null if exhausted
     }
diff --git 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSource.scala
 
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSource.scala
index 107ff77ed..3eff63cdc 100644
--- 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSource.scala
+++ 
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSource.scala
@@ -38,7 +38,7 @@ object CassandraSource {
    *
    * See <a href="https://docs.datastax.com/en/dse/6.7/cql/cql/cql_using/queriesTOC.html">Querying data</a>.
    */
-  def apply(stmt: Statement[_])(implicit session: CassandraSession): 
Source[Row, NotUsed] =
+  def apply(stmt: Statement[?])(implicit session: CassandraSession): 
Source[Row, NotUsed] =
     session.select(stmt)
 
   /**
@@ -46,7 +46,7 @@ object CassandraSource {
    *
    * See <a href="https://docs.datastax.com/en/dse/6.7/cql/cql/cql_using/queriesTOC.html">Querying data</a>.
    */
-  def fromFuture(stmt: Future[Statement[_]])(implicit session: 
CassandraSession): Source[Row, NotUsed] =
+  def fromFuture(stmt: Future[Statement[?]])(implicit session: 
CassandraSession): Source[Row, NotUsed] =
     session.select(stmt)
 
 }
diff --git 
a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraLifecycle.scala
 
b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraLifecycle.scala
index fb4e7e40b..7f05f8cec 100644
--- 
a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraLifecycle.scala
+++ 
b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraLifecycle.scala
@@ -32,7 +32,7 @@ import scala.util.control.NonFatal
 trait CassandraLifecycleBase {
   def lifecycleSession: CassandraSession
 
-  def execute(session: CassandraSession, statements: 
immutable.Seq[BatchableStatement[_]]): Future[Done] = {
+  def execute(session: CassandraSession, statements: 
immutable.Seq[BatchableStatement[?]]): Future[Done] = {
     val batch = new BatchStatementBuilder(BatchType.LOGGED)
     statements.foreach { stmt =>
       batch.addStatement(stmt)
@@ -62,7 +62,7 @@ trait CassandraLifecycleBase {
 
   def dropKeyspace(name: String): Future[Done] = 
withSchemaMetadataDisabled(dropKeyspace(lifecycleSession, name))
 
-  def execute(statements: immutable.Seq[BatchableStatement[_]]): Future[Done] 
= execute(lifecycleSession, statements)
+  def execute(statements: immutable.Seq[BatchableStatement[?]]): Future[Done] 
= execute(lifecycleSession, statements)
 
   def executeCql(statements: immutable.Seq[String]): Future[Done] = 
executeCql(lifecycleSession, statements)
 
diff --git 
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala
 
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala
index 5b6481e04..5d00543d4 100644
--- 
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala
+++ 
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala
@@ -48,13 +48,13 @@ final private[couchbase] class 
CouchbaseSessionImpl(asyncBucket: AsyncBucket, cl
 
   def insert(document: JsonDocument): Future[JsonDocument] = 
insertDoc(document)
 
-  def insertDoc[T <: Document[_]](document: T): Future[T] =
+  def insertDoc[T <: Document[?]](document: T): Future[T] =
     singleObservableToFuture(asyncBucket.insert(document), document)
 
   def insert(document: JsonDocument, writeSettings: CouchbaseWriteSettings): 
Future[JsonDocument] =
     insertDoc(document, writeSettings)
 
-  def insertDoc[T <: Document[_]](document: T, writeSettings: 
CouchbaseWriteSettings): Future[T] =
+  def insertDoc[T <: Document[?]](document: T, writeSettings: 
CouchbaseWriteSettings): Future[T] =
     singleObservableToFuture(asyncBucket.insert(document,
       writeSettings.persistTo,
       writeSettings.replicateTo,
@@ -65,26 +65,26 @@ final private[couchbase] class 
CouchbaseSessionImpl(asyncBucket: AsyncBucket, cl
   def get(id: String): Future[Option[JsonDocument]] =
     zeroOrOneObservableToFuture(asyncBucket.get(id))
 
-  def get[T <: Document[_]](id: String, documentClass: Class[T]): 
Future[Option[T]] =
+  def get[T <: Document[?]](id: String, documentClass: Class[T]): 
Future[Option[T]] =
     zeroOrOneObservableToFuture(asyncBucket.get(id, documentClass))
 
   def get(id: String, timeout: FiniteDuration): Future[Option[JsonDocument]] =
     zeroOrOneObservableToFuture(asyncBucket.get(id, timeout.toMillis, 
TimeUnit.MILLISECONDS))
 
-  def get[T <: Document[_]](id: String,
+  def get[T <: Document[?]](id: String,
       timeout: FiniteDuration,
       documentClass: Class[T]): scala.concurrent.Future[Option[T]] =
     zeroOrOneObservableToFuture(asyncBucket.get(id, documentClass, 
timeout.toMillis, TimeUnit.MILLISECONDS))
 
   def upsert(document: JsonDocument): Future[JsonDocument] = 
upsertDoc(document)
 
-  def upsertDoc[T <: Document[_]](document: T): Future[T] =
+  def upsertDoc[T <: Document[?]](document: T): Future[T] =
     singleObservableToFuture(asyncBucket.upsert(document), document.id)
 
   def upsert(document: JsonDocument, writeSettings: CouchbaseWriteSettings): 
Future[JsonDocument] =
     upsertDoc(document, writeSettings)
 
-  def upsertDoc[T <: Document[_]](document: T, writeSettings: 
CouchbaseWriteSettings): Future[T] =
+  def upsertDoc[T <: Document[?]](document: T, writeSettings: 
CouchbaseWriteSettings): Future[T] =
     singleObservableToFuture(asyncBucket.upsert(document,
       writeSettings.persistTo,
       writeSettings.replicateTo,
@@ -94,13 +94,13 @@ final private[couchbase] class 
CouchbaseSessionImpl(asyncBucket: AsyncBucket, cl
 
   def replace(document: JsonDocument): Future[JsonDocument] = 
replaceDoc(document)
 
-  def replaceDoc[T <: Document[_]](document: T): Future[T] =
+  def replaceDoc[T <: Document[?]](document: T): Future[T] =
     singleObservableToFuture(asyncBucket.replace(document), document.id)
 
   def replace(document: JsonDocument, writeSettings: CouchbaseWriteSettings): 
Future[JsonDocument] =
     replaceDoc(document, writeSettings)
 
-  def replaceDoc[T <: Document[_]](document: T, writeSettings: 
CouchbaseWriteSettings): Future[T] =
+  def replaceDoc[T <: Document[?]](document: T, writeSettings: 
CouchbaseWriteSettings): Future[T] =
     singleObservableToFuture(asyncBucket.replace(document,
       writeSettings.persistTo,
       writeSettings.replicateTo,
diff --git 
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionJavaAdapter.scala
 
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionJavaAdapter.scala
index 52d0b19e6..77b62ce62 100644
--- 
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionJavaAdapter.scala
+++ 
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionJavaAdapter.scala
@@ -49,47 +49,47 @@ private[couchbase] final class 
CouchbaseSessionJavaAdapter(delegate: scaladsl.Co
 
   override def insert(document: JsonDocument): CompletionStage[JsonDocument] = 
delegate.insertDoc(document).asJava
 
-  override def insertDoc[T <: Document[_]](document: T): CompletionStage[T] = 
delegate.insertDoc(document).asJava
+  override def insertDoc[T <: Document[?]](document: T): CompletionStage[T] = 
delegate.insertDoc(document).asJava
 
   override def insert(
       document: JsonDocument,
       writeSettings: CouchbaseWriteSettings): CompletionStage[JsonDocument] =
     delegate.insert(document, writeSettings).asJava
 
-  override def insertDoc[T <: Document[_]](
+  override def insertDoc[T <: Document[?]](
       document: T,
       writeSettings: CouchbaseWriteSettings): CompletionStage[T] = 
delegate.insertDoc(document, writeSettings).asJava
 
   override def get(id: String): CompletionStage[Optional[JsonDocument]] =
     futureOptToJava(delegate.get(id))
 
-  override def get[T <: Document[_]](id: String, clazz: Class[T]): 
CompletionStage[Optional[T]] =
+  override def get[T <: Document[?]](id: String, clazz: Class[T]): 
CompletionStage[Optional[T]] =
     futureOptToJava(delegate.get(id, clazz))
 
   override def get(id: String, timeout: Duration): 
CompletionStage[Optional[JsonDocument]] =
     futureOptToJava(delegate.get(id, FiniteDuration.apply(timeout.toNanos, 
duration.NANOSECONDS)))
 
-  def get[T <: Document[_]](id: String, timeout: Duration, documentClass: 
Class[T]): CompletionStage[Optional[T]] =
+  def get[T <: Document[?]](id: String, timeout: Duration, documentClass: 
Class[T]): CompletionStage[Optional[T]] =
     futureOptToJava(delegate.get(id, FiniteDuration.apply(timeout.toNanos, 
duration.NANOSECONDS), documentClass))
 
   override def upsert(document: JsonDocument): CompletionStage[JsonDocument] = 
delegate.upsert(document).asJava
 
-  override def upsertDoc[T <: Document[_]](document: T): CompletionStage[T] = 
delegate.upsertDoc(document).asJava
+  override def upsertDoc[T <: Document[?]](document: T): CompletionStage[T] = 
delegate.upsertDoc(document).asJava
 
   override def upsert(document: JsonDocument, writeSettings: 
CouchbaseWriteSettings): CompletionStage[JsonDocument] =
     delegate.upsert(document, writeSettings).asJava
 
-  override def upsertDoc[T <: Document[_]](document: T, writeSettings: 
CouchbaseWriteSettings): CompletionStage[T] =
+  override def upsertDoc[T <: Document[?]](document: T, writeSettings: 
CouchbaseWriteSettings): CompletionStage[T] =
     delegate.upsertDoc(document, writeSettings).asJava
 
   override def replace(document: JsonDocument): CompletionStage[JsonDocument] 
= delegate.replace(document).asJava
 
-  override def replaceDoc[T <: Document[_]](document: T): CompletionStage[T] = 
delegate.replaceDoc(document).asJava
+  override def replaceDoc[T <: Document[?]](document: T): CompletionStage[T] = 
delegate.replaceDoc(document).asJava
 
   override def replace(document: JsonDocument, writeSettings: 
CouchbaseWriteSettings): CompletionStage[JsonDocument] =
     delegate.replace(document, writeSettings).asJava
 
-  override def replaceDoc[T <: Document[_]](document: T, writeSettings: 
CouchbaseWriteSettings): CompletionStage[T] =
+  override def replaceDoc[T <: Document[?]](document: T, writeSettings: 
CouchbaseWriteSettings): CompletionStage[T] =
     delegate.replaceDoc(document, writeSettings).asJava
 
   override def remove(id: String): CompletionStage[Done] = 
delegate.remove(id).asJava
diff --git 
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseFlow.scala
 
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseFlow.scala
index aa267cdab..f5ffc1a6b 100644
--- 
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseFlow.scala
+++ 
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseFlow.scala
@@ -39,7 +39,7 @@ object CouchbaseFlow {
   /**
    * Create a flow to query Couchbase for by `id` and emit documents of the 
given class.
    */
-  def fromId[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+  def fromId[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
       bucketName: String,
       target: Class[T]): Flow[String, T, NotUsed] =
     scaladsl.CouchbaseFlow.fromId(sessionSettings, bucketName, target).asJava
@@ -55,7 +55,7 @@ object CouchbaseFlow {
   /**
    * Create a flow to update or insert a Couchbase document of the given class.
    */
-  def upsertDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+  def upsertDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
       bucketName: String): Flow[T, T, NotUsed] =
     scaladsl.CouchbaseFlow.upsertDoc(sessionSettings, writeSettings, 
bucketName).asJava
@@ -64,7 +64,7 @@ object CouchbaseFlow {
    * Create a flow to update or insert a Couchbase document of the given class 
and emit a result so that write failures
    * can be handled in-stream.
    */
-  def upsertDocWithResult[T <: Document[_]](sessionSettings: 
CouchbaseSessionSettings,
+  def upsertDocWithResult[T <: Document[?]](sessionSettings: 
CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
       bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
     scaladsl.CouchbaseFlow.upsertDocWithResult(sessionSettings, writeSettings, 
bucketName).asJava
@@ -80,7 +80,7 @@ object CouchbaseFlow {
   /**
    * Create a flow to replace a Couchbase document of the given class.
    */
-  def replaceDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+  def replaceDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
       bucketName: String): Flow[T, T, NotUsed] =
     scaladsl.CouchbaseFlow.replaceDoc(sessionSettings, writeSettings, 
bucketName).asJava
@@ -89,7 +89,7 @@ object CouchbaseFlow {
    * Create a flow to replace a Couchbase document of the given class and emit 
a result so that write failures
    * can be handled in-stream.
    */
-  def replaceDocWithResult[T <: Document[_]](sessionSettings: 
CouchbaseSessionSettings,
+  def replaceDocWithResult[T <: Document[?]](sessionSettings: 
CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
       bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
     scaladsl.CouchbaseFlow.replaceDocWithResult(sessionSettings, 
writeSettings, bucketName).asJava
diff --git 
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSession.scala
 
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSession.scala
index a2ec5d80c..b07a23884 100644
--- 
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSession.scala
+++ 
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSession.scala
@@ -113,7 +113,7 @@ abstract class CouchbaseSession {
    *
    * @return A CompletionStage that completes with the written document when the write completes
    */
-  def insertDoc[T <: Document[_]](document: T): CompletionStage[T]
+  def insertDoc[T <: Document[?]](document: T): CompletionStage[T]
 
   /**
    * Insert a JSON document using the given write settings.
@@ -126,7 +126,7 @@ abstract class CouchbaseSession {
    * Insert a document using the given write settings.
    * Separate from `insert` to make the most common case smoother with the type inference
    */
-  def insertDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): CompletionStage[T]
+  def insertDoc[T <: Document[?]](document: T, writeSettings: CouchbaseWriteSettings): CompletionStage[T]
 
   /**
    * @return A document if found or none if there is no document for the id
@@ -136,7 +136,7 @@ abstract class CouchbaseSession {
   /**
    * @return A document if found or none if there is no document for the id
    */
-  def get[T <: Document[_]](id: String, documentClass: Class[T]): CompletionStage[Optional[T]]
+  def get[T <: Document[?]](id: String, documentClass: Class[T]): CompletionStage[Optional[T]]
 
   /**
    * @param timeout fail the returned CompletionStage with a TimeoutException if it takes longer than this
@@ -148,7 +148,7 @@ abstract class CouchbaseSession {
    * @param timeout fail the returned CompletionStage with a TimeoutException if it takes longer than this
    * @return A document if found or none if there is no document for the id
    */
-  def get[T <: Document[_]](id: String, timeout: Duration, documentClass: Class[T]): CompletionStage[Optional[T]]
+  def get[T <: Document[?]](id: String, timeout: Duration, documentClass: Class[T]): CompletionStage[Optional[T]]
 
   /**
    * Upsert using the default write settings
@@ -165,7 +165,7 @@ abstract class CouchbaseSession {
    *
    * @return a CompletionStage that completes when the upsert is done
    */
-  def upsertDoc[T <: Document[_]](document: T): CompletionStage[T]
+  def upsertDoc[T <: Document[?]](document: T): CompletionStage[T]
 
   /**
    * Upsert using the given write settings.
@@ -183,7 +183,7 @@ abstract class CouchbaseSession {
    *
    * @return a CompletionStage that completes when the upsert is done
    */
-  def upsertDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): CompletionStage[T]
+  def upsertDoc[T <: Document[?]](document: T, writeSettings: CouchbaseWriteSettings): CompletionStage[T]
 
   /**
    * Replace using the default write settings
@@ -200,7 +200,7 @@ abstract class CouchbaseSession {
    *
    * @return a CompletionStage that completes when the replace is done
    */
-  def replaceDoc[T <: Document[_]](document: T): CompletionStage[T]
+  def replaceDoc[T <: Document[?]](document: T): CompletionStage[T]
 
   /**
    * Replace using the given write settings.
@@ -218,7 +218,7 @@ abstract class CouchbaseSession {
    *
    * @return a CompletionStage that completes when the replace is done
    */
-  def replaceDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): CompletionStage[T]
+  def replaceDoc[T <: Document[?]](document: T, writeSettings: CouchbaseWriteSettings): CompletionStage[T]
 
   /**
    * Remove a document by id using the default write settings.
diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSink.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSink.scala
index 7b4d385d0..3eabab6df 100644
--- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSink.scala
+++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSink.scala
@@ -39,7 +39,7 @@ object CouchbaseSink {
   /**
    * Create a sink to update or insert a Couchbase document of the given class.
    */
-  def upsertDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+  def upsertDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
       bucketName: String): Sink[T, CompletionStage[Done]] =
     CouchbaseFlow
@@ -59,7 +59,7 @@ object CouchbaseSink {
   /**
    * Create a sink to replace a Couchbase document of the given class.
    */
-  def replaceDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+  def replaceDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
       bucketName: String): Sink[T, CompletionStage[Done]] =
     CouchbaseFlow
diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala
index cc0c2de48..af17b70f8 100644
--- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala
+++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala
@@ -261,7 +261,7 @@ final class CouchbaseSessionSettings private (
 /**
  * Wrapper to for handling Couchbase write failures in-stream instead of failing the stream.
  */
-sealed trait CouchbaseWriteResult[T <: Document[_]] {
+sealed trait CouchbaseWriteResult[T <: Document[?]] {
   def isSuccess: Boolean
   def isFailure: Boolean
   def doc: T
@@ -270,7 +270,7 @@ sealed trait CouchbaseWriteResult[T <: Document[_]] {
 /**
  * Emitted for a successful Couchbase write operation.
  */
-final case class CouchbaseWriteSuccess[T <: Document[_]] private[couchbase] (
+final case class CouchbaseWriteSuccess[T <: Document[?]] private[couchbase] (
     override val doc: T) extends CouchbaseWriteResult[T] {
   val isSuccess: Boolean = true
   val isFailure: Boolean = false
@@ -279,7 +279,7 @@ final case class CouchbaseWriteSuccess[T <: Document[_]] private[couchbase] (
 /**
  * Emitted for a failed Couchbase write operation.
  */
-final case class CouchbaseWriteFailure[T <: Document[_]] private[couchbase] (override val doc: T, failure: Throwable)
+final case class CouchbaseWriteFailure[T <: Document[?]] private[couchbase] (override val doc: T, failure: Throwable)
     extends CouchbaseWriteResult[T] {
   val isSuccess: Boolean = false
   val isFailure: Boolean = true
diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
index ad35b1cee..86deede60 100644
--- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
+++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
@@ -52,7 +52,7 @@ object CouchbaseFlow {
   /**
    * Create a flow to query Couchbase for by `id` and emit documents of the given class.
    */
-  def fromId[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+  def fromId[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
       bucketName: String,
       target: Class[T]): Flow[String, T, NotUsed] =
     Flow
@@ -82,7 +82,7 @@ object CouchbaseFlow {
   /**
    * Create a flow to update or insert a Couchbase document of the given class.
    */
-  def upsertDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+  def upsertDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
       bucketName: String): Flow[T, T, NotUsed] =
     Flow
@@ -98,7 +98,7 @@ object CouchbaseFlow {
    * Create a flow to update or insert a Couchbase document of the given class and emit a result so that write failures
    * can be handled in-stream.
    */
-  def upsertDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+  def upsertDocWithResult[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
       bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = {
     val flow: Flow[T, CouchbaseWriteResult[T], Future[NotUsed]] = Flow
@@ -136,7 +136,7 @@ object CouchbaseFlow {
   /**
    * Create a flow to replace a Couchbase document of the given class.
    */
-  def replaceDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+  def replaceDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
       bucketName: String): Flow[T, T, NotUsed] =
     Flow
@@ -152,7 +152,7 @@ object CouchbaseFlow {
    * Create a flow to replace a Couchbase document of the given class and emit a result so that write failures
    * can be handled in-stream.
    */
-  def replaceDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+  def replaceDocWithResult[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
       bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = {
     val flow: Flow[T, CouchbaseWriteResult[T], Future[NotUsed]] = Flow
diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSession.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSession.scala
index 8861a298f..25764beef 100644
--- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSession.scala
+++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSession.scala
@@ -114,7 +114,7 @@ trait CouchbaseSession {
    *
    * @return A future that completes with the written document when the write completes
    */
-  def insertDoc[T <: Document[_]](document: T): Future[T]
+  def insertDoc[T <: Document[?]](document: T): Future[T]
 
   /**
    * Insert a JSON document using the given write settings.
@@ -129,7 +129,7 @@ trait CouchbaseSession {
    *
    * @return A future that completes with the written document when the write completes
    */
-  def insertDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): Future[T]
+  def insertDoc[T <: Document[?]](document: T, writeSettings: CouchbaseWriteSettings): Future[T]
 
   /**
    * @return A document if found or none if there is no document for the id
@@ -139,7 +139,7 @@ trait CouchbaseSession {
   /**
    * @return A document of the given type if found or none if there is no document for the id
    */
-  def get[T <: Document[_]](id: String, documentClass: Class[T]): Future[Option[T]]
+  def get[T <: Document[?]](id: String, documentClass: Class[T]): Future[Option[T]]
 
   /**
    * @param timeout fail the returned future with a TimeoutException if it takes longer than this
@@ -150,7 +150,7 @@ trait CouchbaseSession {
   /**
    * @return A document of the given type if found or none if there is no document for the id
    */
-  def get[T <: Document[_]](id: String, timeout: FiniteDuration, documentClass: Class[T]): Future[Option[T]]
+  def get[T <: Document[?]](id: String, timeout: FiniteDuration, documentClass: Class[T]): Future[Option[T]]
 
   /**
    * Upsert using the default write settings.
@@ -168,7 +168,7 @@ trait CouchbaseSession {
    *
    * @return a future that completes when the upsert is done
    */
-  def upsertDoc[T <: Document[_]](document: T): Future[T]
+  def upsertDoc[T <: Document[?]](document: T): Future[T]
 
   /**
    * Upsert using the given write settings
@@ -186,7 +186,7 @@ trait CouchbaseSession {
    *
    * @return a future that completes when the upsert is done
    */
-  def upsertDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): Future[T]
+  def upsertDoc[T <: Document[?]](document: T, writeSettings: CouchbaseWriteSettings): Future[T]
 
   /**
    * Replace using the default write settings.
@@ -204,7 +204,7 @@ trait CouchbaseSession {
    *
    * @return a future that completes when the replace is done
    */
-  def replaceDoc[T <: Document[_]](document: T): Future[T]
+  def replaceDoc[T <: Document[?]](document: T): Future[T]
 
   /**
    * Replace using the given write settings
@@ -222,7 +222,7 @@ trait CouchbaseSession {
    *
    * @return a future that completes when the replace is done
    */
-  def replaceDoc[T <: Document[_]](document: T, writeSettings: CouchbaseWriteSettings): Future[T]
+  def replaceDoc[T <: Document[?]](document: T, writeSettings: CouchbaseWriteSettings): Future[T]
 
   /**
    * Remove a document by id using the default write settings.
diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSink.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSink.scala
index fbbcb5722..0cc33e1c0 100644
--- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSink.scala
+++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSink.scala
@@ -37,7 +37,7 @@ object CouchbaseSink {
   /**
    * Create a sink to update or insert a Couchbase document of the given class.
    */
-  def upsertDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+  def upsertDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
       writeSettings: CouchbaseWriteSettings,
       bucketName: String): Sink[T, Future[Done]] =
     CouchbaseFlow
diff --git a/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/javadsl/DynamoDb.scala b/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/javadsl/DynamoDb.scala
index 3c0f65008..7ab4dfba7 100644
--- a/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/javadsl/DynamoDb.scala
+++ b/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/javadsl/DynamoDb.scala
@@ -75,7 +75,7 @@ object DynamoDb {
    */
   def flowPaginated[In <: DynamoDbRequest, Out <: DynamoDbResponse](
       client: DynamoDbAsyncClient,
-      operation: DynamoDbPaginatedOp[In, Out, _]): Flow[In, Out, NotUsed] =
+      operation: DynamoDbPaginatedOp[In, Out, ?]): Flow[In, Out, NotUsed] =
     scaladsl.DynamoDb.flowPaginated()(client, operation).asJava
 
   /**
diff --git a/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/scaladsl/DynamoDb.scala b/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/scaladsl/DynamoDb.scala
index c90943018..73004dd3d 100644
--- a/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/scaladsl/DynamoDb.scala
+++ b/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/scaladsl/DynamoDb.scala
@@ -68,7 +68,7 @@ object DynamoDb {
    */
   def source[In <: DynamoDbRequest, Out <: DynamoDbResponse](
       request: In)(
-      implicit client: DynamoDbAsyncClient, operation: DynamoDbPaginatedOp[In, Out, _]): Source[Out, NotUsed] =
+      implicit client: DynamoDbAsyncClient, operation: DynamoDbPaginatedOp[In, Out, ?]): Source[Out, NotUsed] =
     Source.fromPublisher(operation.publisher(request))
 
   /**
@@ -78,7 +78,7 @@ object DynamoDb {
    */
   def flowPaginated[In <: DynamoDbRequest, Out <: DynamoDbResponse]()(
       implicit client: DynamoDbAsyncClient,
-      operation: DynamoDbPaginatedOp[In, Out, _]): Flow[In, Out, NotUsed] = Flow[In].flatMapConcat(source(_))
+      operation: DynamoDbPaginatedOp[In, Out, ?]): Flow[In, Out, NotUsed] = Flow[In].flatMapConcat(source(_))
 
   /**
    * Create a Future that will be completed with a response to a given request.
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/ReadResult.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/ReadResult.scala
index ab61474b6..5be0196ed 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/ReadResult.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/ReadResult.scala
@@ -35,7 +35,7 @@ final class ReadResult[T] @InternalApi private[elasticsearch] (val id: String,
     s"""ReadResult(id=$id,source=$source,version=${version.getOrElse("")})"""
 
   override def equals(other: Any): Boolean = other match {
-    case that: ReadResult[_] =>
+    case that: ReadResult[?] =>
       java.util.Objects.equals(this.id, that.id) &&
       java.util.Objects.equals(this.source, that.source) &&
       java.util.Objects.equals(this.version, that.version)
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/WriteMessage.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/WriteMessage.scala
index da25a4dca..dc68a0d9e 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/WriteMessage.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/WriteMessage.scala
@@ -95,7 +95,7 @@ final class WriteMessage[T, PT] private (val operation: Operation,
     s"""WriteMessage(operation=$operation,id=$id,source=$source,passThrough=$passThrough,version=$version,indexName=$indexName,customMetadata=$customMetadata)"""
 
   override def equals(other: Any): Boolean = other match {
-    case that: WriteMessage[_, _] =>
+    case that: WriteMessage[?, ?] =>
       java.util.Objects.equals(this.operation, that.operation) &&
       java.util.Objects.equals(this.id, that.id) &&
       java.util.Objects.equals(this.source, that.source) &&
@@ -168,7 +168,7 @@ final class WriteResult[T2, C2] @InternalApi private[elasticsearch] (val message
     s"""WriteResult(message=$message,error=$error)"""
 
   override def equals(other: Any): Boolean = other match {
-    case that: WriteResult[_, _] =>
+    case that: WriteResult[?, ?] =>
       java.util.Objects.equals(this.message, that.message) &&
       java.util.Objects.equals(this.error, that.error)
     case _ => false
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala
index 18e1186e1..4a7e502b5 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala
@@ -35,7 +35,7 @@ import scala.concurrent.{ ExecutionContext, Future }
 @InternalApi
 private[elasticsearch] final class ElasticsearchSimpleFlowStage[T, C](
     elasticsearchParams: ElasticsearchParams,
-    settings: WriteSettingsBase[_, _],
+    settings: WriteSettingsBase[?, ?],
     writer: MessageWriter[T])(implicit http: HttpExt, mat: Materializer, ec: ExecutionContext)
     extends GraphStage[
       FlowShape[(immutable.Seq[WriteMessage[T, C]], immutable.Seq[WriteResult[T, C]]),
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala
index 4a4222c2d..75056d867 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala
@@ -61,7 +61,7 @@ private[elasticsearch] trait MessageReader[T] {
 private[elasticsearch] final class ElasticsearchSourceStage[T](
     elasticsearchParams: ElasticsearchParams,
     searchParams: Map[String, String],
-    settings: SourceSettingsBase[_, _],
+    settings: SourceSettingsBase[?, ?],
     reader: MessageReader[T])(implicit http: HttpExt, mat: Materializer, ec: ExecutionContext)
     extends GraphStage[SourceShape[ReadResult[T]]] {
 
@@ -86,7 +86,7 @@ object ElasticsearchSourceStage {
 private[elasticsearch] final class ElasticsearchSourceLogic[T](
     elasticsearchParams: ElasticsearchParams,
     searchParams: Map[String, String],
-    settings: SourceSettingsBase[_, _],
+    settings: SourceSettingsBase[?, ?],
     out: Outlet[ReadResult[T]],
     shape: SourceShape[ReadResult[T]],
     reader: MessageReader[T])(implicit http: HttpExt, mat: Materializer, ec: ExecutionContext)
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchFlow.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchFlow.scala
index bb7d488db..35aa6656b 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchFlow.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchFlow.scala
@@ -37,7 +37,7 @@ object ElasticsearchFlow {
    */
   def create[T](
       elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       objectMapper: ObjectMapper)
       : pekko.stream.javadsl.Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] =
     create(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
@@ -53,7 +53,7 @@ object ElasticsearchFlow {
    */
   def create[T](
       elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       messageWriter: MessageWriter[T])
       : pekko.stream.javadsl.Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] =
     scaladsl.ElasticsearchFlow
@@ -72,7 +72,7 @@ object ElasticsearchFlow {
    */
   def createWithPassThrough[T, C](
       elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       objectMapper: ObjectMapper): pekko.stream.javadsl.Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] =
     createWithPassThrough(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
 
@@ -88,7 +88,7 @@ object ElasticsearchFlow {
    */
   def createWithPassThrough[T, C](
       elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       messageWriter: MessageWriter[T]): pekko.stream.javadsl.Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] =
     scaladsl.ElasticsearchFlow
       .createWithPassThrough(elasticsearchParams, settings, messageWriter)
@@ -107,7 +107,7 @@ object ElasticsearchFlow {
    */
   def createBulk[T, C](
       elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       objectMapper: ObjectMapper): pekko.stream.javadsl.Flow[java.util.List[WriteMessage[T, C]],
     java.util.List[WriteResult[T, C]], NotUsed] =
     createBulk(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
@@ -125,7 +125,7 @@ object ElasticsearchFlow {
    */
   def createBulk[T, C](
       elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       messageWriter: MessageWriter[T]): pekko.stream.javadsl.Flow[java.util.List[WriteMessage[T, C]],
     java.util.List[WriteResult[T, C]], NotUsed] = pekko.stream.scaladsl
     .Flow[java.util.List[WriteMessage[T, C]]]
@@ -148,7 +148,7 @@ object ElasticsearchFlow {
   @ApiMayChange
   def createWithContext[T, C](
       elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       objectMapper: ObjectMapper)
       : pekko.stream.javadsl.FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] =
     createWithContext(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
@@ -165,7 +165,7 @@ object ElasticsearchFlow {
   @ApiMayChange
   def createWithContext[T, C](
       elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       messageWriter: MessageWriter[T])
       : pekko.stream.javadsl.FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] =
     scaladsl.ElasticsearchFlow
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSink.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSink.scala
index 70477ebd7..d92d909de 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSink.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSink.scala
@@ -31,7 +31,7 @@ object ElasticsearchSink {
    */
   def create[T](
       elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       objectMapper: ObjectMapper): pekko.stream.javadsl.Sink[WriteMessage[T, NotUsed], CompletionStage[Done]] =
     ElasticsearchFlow
       .create(elasticsearchParams, settings, objectMapper)
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala
index 4a50a4dfc..849dcd0c3 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala
@@ -37,7 +37,7 @@ object ElasticsearchSource {
    */
   def create(elasticsearchParams: ElasticsearchParams,
       query: String,
-      settings: SourceSettingsBase[_, _]): Source[ReadResult[java.util.Map[String, Object]], NotUsed] =
+      settings: SourceSettingsBase[?, ?]): Source[ReadResult[java.util.Map[String, Object]], NotUsed] =
     create(elasticsearchParams, query, settings, new ObjectMapper())
 
   /**
@@ -46,7 +46,7 @@ object ElasticsearchSource {
    */
   def create(elasticsearchParams: ElasticsearchParams,
       query: String,
-      settings: SourceSettingsBase[_, _],
+      settings: SourceSettingsBase[?, ?],
       objectMapper: ObjectMapper): Source[ReadResult[java.util.Map[String, Object]], NotUsed] =
     Source
       .fromMaterializer { (mat: Materializer, _: Attributes) =>
@@ -78,7 +78,7 @@ object ElasticsearchSource {
    */
   def create(elasticsearchParams: ElasticsearchParams,
       searchParams: java.util.Map[String, String],
-      settings: SourceSettingsBase[_, _],
+      settings: SourceSettingsBase[?, ?],
       objectMapper: ObjectMapper): Source[ReadResult[java.util.Map[String, Object]], NotUsed] =
     Source
       .fromMaterializer { (mat: Materializer, _: Attributes) =>
@@ -103,7 +103,7 @@ object ElasticsearchSource {
    */
   def typed[T](elasticsearchParams: ElasticsearchParams,
       query: String,
-      settings: SourceSettingsBase[_, _],
+      settings: SourceSettingsBase[?, ?],
       clazz: Class[T]): Source[ReadResult[T], NotUsed] =
     typed[T](elasticsearchParams, query, settings, clazz, new ObjectMapper())
 
@@ -113,7 +113,7 @@ object ElasticsearchSource {
    */
   def typed[T](elasticsearchParams: ElasticsearchParams,
       query: String,
-      settings: SourceSettingsBase[_, _],
+      settings: SourceSettingsBase[?, ?],
       clazz: Class[T],
       objectMapper: ObjectMapper): Source[ReadResult[T], NotUsed] =
     Source
@@ -145,7 +145,7 @@ object ElasticsearchSource {
    */
   def typed[T](elasticsearchParams: ElasticsearchParams,
       searchParams: java.util.Map[String, String],
-      settings: SourceSettingsBase[_, _],
+      settings: SourceSettingsBase[?, ?],
       clazz: Class[T],
       objectMapper: ObjectMapper): Source[ReadResult[T], NotUsed] =
     Source
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala
index ac40d4595..b7f17bea5 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala
@@ -37,7 +37,7 @@ object ElasticsearchFlow {
    *
    * This factory method requires an implicit Spray JSON writer for `T`.
    */
-  def create[T](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[_, _])(
+  def create[T](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[?, ?])(
       implicit sprayJsonWriter: JsonWriter[T]): Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] =
     create[T](elasticsearchParams, settings, new SprayJsonWriter[T]()(sprayJsonWriter))
 
@@ -47,7 +47,7 @@ object ElasticsearchFlow {
    * successful execution.
    */
   def create[T](elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       writer: MessageWriter[T]): Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] = {
     Flow[WriteMessage[T, NotUsed]]
       .batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ wm }
@@ -63,7 +63,7 @@ object ElasticsearchFlow {
    *
    * This factory method requires an implicit Spray JSON writer for `T`.
    */
-  def createWithPassThrough[T, C](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[_, _])(
+  def createWithPassThrough[T, C](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[?, ?])(
       implicit sprayJsonWriter: JsonWriter[T]): Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] =
     createWithPassThrough[T, C](elasticsearchParams, settings, new SprayJsonWriter[T]()(sprayJsonWriter))
 
@@ -74,7 +74,7 @@ object ElasticsearchFlow {
    * successful execution.
    */
   def createWithPassThrough[T, C](elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       writer: MessageWriter[T]): Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] = {
     Flow[WriteMessage[T, C]]
       .batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ wm }
@@ -91,7 +91,7 @@ object ElasticsearchFlow {
    *
    * This factory method requires an implicit Spray JSON writer for `T`.
    */
-  def createBulk[T, C](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[_, _])(
+  def createBulk[T, C](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[?, ?])(
       implicit sprayJsonWriter: JsonWriter[T])
       : Flow[immutable.Seq[WriteMessage[T, C]], immutable.Seq[WriteResult[T, C]], NotUsed] =
     createBulk[T, C](elasticsearchParams, settings, new SprayJsonWriter[T]()(sprayJsonWriter))
@@ -105,7 +105,7 @@ object ElasticsearchFlow {
    */
   def createBulk[T, C](
       elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       writer: MessageWriter[T]): Flow[immutable.Seq[WriteMessage[T, C]], immutable.Seq[WriteResult[T, C]], NotUsed] = {
     stageFlow(elasticsearchParams, settings, writer)
   }
@@ -119,7 +119,7 @@ object ElasticsearchFlow {
    * This factory method requires an implicit Spray JSON writer for `T`.
    */
   @ApiMayChange
-  def createWithContext[T, C](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[_, _])(
+  def createWithContext[T, C](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[?, ?])(
       implicit sprayJsonWriter: JsonWriter[T])
       : FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] =
     createWithContext[T, C](elasticsearchParams, settings, new SprayJsonWriter[T]()(sprayJsonWriter))
@@ -133,7 +133,7 @@ object ElasticsearchFlow {
   @ApiMayChange
   def createWithContext[T, C](
       elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       writer: MessageWriter[T]): FlowWithContext[WriteMessage[T, NotUsed], C, 
WriteResult[T, C], C, NotUsed] = {
     Flow[WriteMessage[T, C]]
       .batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ 
wm }
@@ -145,7 +145,7 @@ object ElasticsearchFlow {
   @InternalApi
   private def stageFlow[T, C](
       elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       writer: MessageWriter[T]): Flow[immutable.Seq[WriteMessage[T, C]], 
immutable.Seq[WriteResult[T, C]], NotUsed] = {
     if (settings.retryLogic == RetryNever) {
       val basicFlow = basicStageFlow[T, C](elasticsearchParams, settings, 
writer)
@@ -208,7 +208,7 @@ object ElasticsearchFlow {
 
   @InternalApi
   private def basicStageFlow[T, C](elasticsearchParams: ElasticsearchParams,
-      settings: WriteSettingsBase[_, _],
+      settings: WriteSettingsBase[?, ?],
       writer: MessageWriter[T]) = {
     Flow
       .fromMaterializer { (mat, _) =>
diff --git 
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSink.scala
 
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSink.scala
index 9e9eff823..b9ff8ef88 100644
--- 
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSink.scala
+++ 
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSink.scala
@@ -29,7 +29,7 @@ object ElasticsearchSink {
   /**
    * Create a sink to update Elasticsearch with 
[[pekko.stream.connectors.elasticsearch.WriteMessage WriteMessage]]s containing 
type `T`.
    */
-  def create[T](elasticsearchParams: ElasticsearchParams, settings: 
WriteSettingsBase[_, _])(
+  def create[T](elasticsearchParams: ElasticsearchParams, settings: 
WriteSettingsBase[?, ?])(
       implicit sprayJsonWriter: JsonWriter[T]): Sink[WriteMessage[T, NotUsed], 
Future[Done]] =
     ElasticsearchFlow.create[T](elasticsearchParams, 
settings).toMat(Sink.ignore)(Keep.right)
 
diff --git 
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSource.scala
 
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSource.scala
index 94e23a22c..317df4d6c 100644
--- 
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSource.scala
+++ 
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSource.scala
@@ -36,7 +36,7 @@ object ElasticsearchSource {
   def apply(
       elasticsearchParams: ElasticsearchParams,
       query: String,
-      settings: SourceSettingsBase[_, _]): Source[ReadResult[JsObject], 
NotUsed] =
+      settings: SourceSettingsBase[?, ?]): Source[ReadResult[JsObject], 
NotUsed] =
     create(elasticsearchParams, query, settings)
 
   /**
@@ -50,7 +50,7 @@ object ElasticsearchSource {
    */
   def apply(elasticsearchParams: ElasticsearchParams,
       searchParams: Map[String, String],
-      settings: SourceSettingsBase[_, _]): Source[ReadResult[JsObject], 
NotUsed] =
+      settings: SourceSettingsBase[?, ?]): Source[ReadResult[JsObject], 
NotUsed] =
     create(elasticsearchParams, searchParams, settings)
 
   /**
@@ -59,7 +59,7 @@ object ElasticsearchSource {
    */
   def create(elasticsearchParams: ElasticsearchParams,
       query: String,
-      settings: SourceSettingsBase[_, _]): Source[ReadResult[JsObject], 
NotUsed] =
+      settings: SourceSettingsBase[?, ?]): Source[ReadResult[JsObject], 
NotUsed] =
     create(elasticsearchParams, Map("query" -> query), settings)
 
   /**
@@ -72,7 +72,7 @@ object ElasticsearchSource {
    */
   def create(elasticsearchParams: ElasticsearchParams,
       searchParams: Map[String, String],
-      settings: SourceSettingsBase[_, _]): Source[ReadResult[JsObject], 
NotUsed] =
+      settings: SourceSettingsBase[?, ?]): Source[ReadResult[JsObject], 
NotUsed] =
     Source
       .fromMaterializer { (mat, _) =>
         implicit val system: ActorSystem = mat.system
@@ -93,7 +93,7 @@ object ElasticsearchSource {
    * Creates a [[pekko.stream.scaladsl.Source]] from Elasticsearch that 
streams [[ReadResult]]s of type `T`
    * converted by Spray's [[spray.json.JsonReader]]
    */
-  def typed[T](elasticsearchParams: ElasticsearchParams, query: String, 
settings: SourceSettingsBase[_, _])(
+  def typed[T](elasticsearchParams: ElasticsearchParams, query: String, 
settings: SourceSettingsBase[?, ?])(
       implicit sprayJsonReader: JsonReader[T]): Source[ReadResult[T], NotUsed] 
=
     typed(elasticsearchParams, Map("query" -> query), settings)
 
@@ -107,7 +107,7 @@ object ElasticsearchSource {
    */
   def typed[T](elasticsearchParams: ElasticsearchParams,
       searchParams: Map[String, String],
-      settings: SourceSettingsBase[_, _])(
+      settings: SourceSettingsBase[?, ?])(
       implicit sprayJsonReader: JsonReader[T]): Source[ReadResult[T], NotUsed] 
=
     Source
       .fromMaterializer { (mat, _) =>
diff --git 
a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala 
b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
index 2c67de274..050b2ca17 100644
--- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
+++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
@@ -69,7 +69,7 @@ trait ElasticsearchSpecUtils { this: AnyWordSpec with 
ScalaFutures =>
   }
 
   def readTitlesFrom(apiVersion: ApiVersionBase,
-      sourceSettings: SourceSettingsBase[_, _],
+      sourceSettings: SourceSettingsBase[?, ?],
       indexName: String): Future[immutable.Seq[String]] =
     ElasticsearchSource
       .typed[Book](
diff --git 
a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarArchiveManager.scala
 
b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarArchiveManager.scala
index b008abe50..5484fe36f 100644
--- 
a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarArchiveManager.scala
+++ 
b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarArchiveManager.scala
@@ -25,7 +25,7 @@ import pekko.util.ByteString
  */
 @InternalApi private[file] object TarArchiveManager {
 
-  def tarFlow(): Flow[(TarArchiveMetadata, Source[ByteString, _]), ByteString, 
NotUsed] = {
+  def tarFlow(): Flow[(TarArchiveMetadata, Source[ByteString, ?]), ByteString, 
NotUsed] = {
     Flow[(TarArchiveMetadata, Source[ByteString, Any])]
       .flatMapConcat {
         case (metadata, stream) =>
diff --git 
a/file/src/main/scala/org/apache/pekko/stream/connectors/file/scaladsl/Archive.scala
 
b/file/src/main/scala/org/apache/pekko/stream/connectors/file/scaladsl/Archive.scala
index d10e14a63..5cce29ffc 100644
--- 
a/file/src/main/scala/org/apache/pekko/stream/connectors/file/scaladsl/Archive.scala
+++ 
b/file/src/main/scala/org/apache/pekko/stream/connectors/file/scaladsl/Archive.scala
@@ -52,7 +52,7 @@ object Archive {
   /**
    * Flow for packaging multiple files into one TAR file.
    */
-  def tar(): Flow[(TarArchiveMetadata, Source[ByteString, _]), ByteString, 
NotUsed] =
+  def tar(): Flow[(TarArchiveMetadata, Source[ByteString, ?]), ByteString, 
NotUsed] =
     TarArchiveManager.tarFlow()
 
   /**
diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala
index 216d7a597..e4e3211d0 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala
@@ -54,7 +54,7 @@ protected[ftp] trait FtpLike[FtpClient, S <: 
RemoteFileSettings] {
  * INTERNAL API
  */
 @InternalApi
-protected[ftp] trait RetrieveOffset { self: FtpLike[_, _] =>
+protected[ftp] trait RetrieveOffset { self: FtpLike[?, ?] =>
 
   def retrieveFileInputStream(name: String, handler: Handler, offset: Long): 
Try[InputStream]
 
@@ -64,7 +64,7 @@ protected[ftp] trait RetrieveOffset { self: FtpLike[_, _] =>
  * INTERNAL API
  */
 @InternalApi
-protected[ftp] trait UnconfirmedReads { self: FtpLike[_, _] =>
+protected[ftp] trait UnconfirmedReads { self: FtpLike[?, ?] =>
 
   def retrieveFileInputStream(name: String, handler: Handler, offset: Long, 
maxUnconfirmedReads: Int): Try[InputStream]
 
diff --git 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala
 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala
index bc072e325..e8b75160d 100644
--- 
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala
+++ 
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala
@@ -60,9 +60,9 @@ private[ftp] trait SftpOperations { self: FtpLike[SSHClient, 
SftpSettings] =>
 
           if (credentials.password != "") {
             val passwordAuth: AuthPassword = new AuthPassword(new 
PasswordFinder() {
-              def reqPassword(resource: Resource[_]): Array[Char] = 
credentials.password.toCharArray
+              def reqPassword(resource: Resource[?]): Array[Char] = 
credentials.password.toCharArray
 
-              def shouldRetry(resource: Resource[_]) = false
+              def shouldRetry(resource: Resource[?]) = false
             })
 
             ssh.auth(credentials.username, passwordAuth, keyAuth)
diff --git 
a/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/GeodeSettings.scala
 
b/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/GeodeSettings.scala
index acadca341..360a7884d 100644
--- 
a/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/GeodeSettings.scala
+++ 
b/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/GeodeSettings.scala
@@ -21,13 +21,13 @@ import org.apache.geode.cache.client.ClientCacheFactory
 final class GeodeSettings private (val hostname: String,
     val port: Int = 10334,
     val configure: Option[ClientCacheFactory => ClientCacheFactory] = None,
-    val pdxCompat: (Class[_], Class[_]) => Boolean = (c1, c2) =>
+    val pdxCompat: (Class[?], Class[?]) => Boolean = (c1, c2) =>
       c1.getSimpleName.equals(c2.getSimpleName)) {
 
   private def copy(hostname: String = hostname,
       port: Int = port,
       configure: Option[ClientCacheFactory => ClientCacheFactory] = configure,
-      pdxCompat: (Class[_], Class[_]) => Boolean = pdxCompat) =
+      pdxCompat: (Class[?], Class[?]) => Boolean = pdxCompat) =
     new GeodeSettings(hostname, port, configure, pdxCompat)
 
   /**
@@ -39,7 +39,7 @@ final class GeodeSettings private (val hostname: String,
   /**
    * @param pdxCompat a function that determines if two class are equivalent 
(java class / scala case class)
    */
-  def withPdxCompat(pdxCompat: (Class[_], Class[_]) => Boolean): GeodeSettings 
= copy(pdxCompat = pdxCompat)
+  def withPdxCompat(pdxCompat: (Class[?], Class[?]) => Boolean): GeodeSettings 
= copy(pdxCompat = pdxCompat)
 
   override def toString: String =
     "GeodeSettings(" +
diff --git 
a/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/DelegatingPdxSerializer.scala
 
b/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/DelegatingPdxSerializer.scala
index e887a7c6b..c338bb51e 100644
--- 
a/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/DelegatingPdxSerializer.scala
+++ 
b/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/DelegatingPdxSerializer.scala
@@ -26,10 +26,10 @@ import org.apache.geode.pdx.{ PdxReader, PdxSerializer, 
PdxWriter }
  */
 @InternalApi
 private[geode] class DelegatingPdxSerializer(
-    isPdxCompat: (Class[_], Class[_]) => Boolean) extends PdxSerializer
+    isPdxCompat: (Class[?], Class[?]) => Boolean) extends PdxSerializer
     with Declarable {
 
-  private var serializers = Map[Class[_], PdxSerializer]()
+  private var serializers = Map[Class[?], PdxSerializer]()
 
   def register[V](serializer: PdxSerializer, clazz: Class[V]): Unit = 
synchronized {
     if (!serializers.contains(clazz))
@@ -56,7 +56,7 @@ private[geode] class DelegatingPdxSerializer(
    *
    * @return unmarshalled class or null
    */
-  override def fromData(clazz: Class[_], in: PdxReader): AnyRef =
+  override def fromData(clazz: Class[?], in: PdxReader): AnyRef =
     serializers
       .get(clazz)
       .map(_.fromData(clazz, in))
diff --git 
a/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/ShapelessPdxSerializer.scala
 
b/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/ShapelessPdxSerializer.scala
index 0d3b822a3..51fa4a53a 100644
--- 
a/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/ShapelessPdxSerializer.scala
+++ 
b/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/ShapelessPdxSerializer.scala
@@ -32,7 +32,7 @@ private[geode] class ShapelessPdxSerializer[A <: AnyRef](enc: 
PdxEncoder[A], dec
     tag.runtimeClass.isInstance(o) &&
     enc.encode(out, o.asInstanceOf[A])
 
-  override def fromData(clazz: Class[_], in: PdxReader): A =
+  override def fromData(clazz: Class[?], in: PdxReader): A =
     dec.decode(in, null) match {
       case Success(e) => e
       case _          => null.asInstanceOf[A]
diff --git a/geode/src/test/scala/docs/scaladsl/PersonPdxSerializer.scala 
b/geode/src/test/scala/docs/scaladsl/PersonPdxSerializer.scala
index 3db69b969..17f9628b2 100644
--- a/geode/src/test/scala/docs/scaladsl/PersonPdxSerializer.scala
+++ b/geode/src/test/scala/docs/scaladsl/PersonPdxSerializer.scala
@@ -32,7 +32,7 @@ object PersonPdxSerializer extends PekkoPdxSerializer[Person] 
{
     } else
       false
 
-  override def fromData(clazz: Class[_], in: PdxReader): AnyRef = {
+  override def fromData(clazz: Class[?], in: PdxReader): AnyRef = {
     val id: Int = in.readInt("id")
     val name: String = in.readString("name")
     val birthDate: Date = in.readDate("birthDate")
diff --git 
a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/ProductSchemas.scala
 
b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/ProductSchemas.scala
index f1ced54e3..01e6c42e6 100644
--- 
a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/ProductSchemas.scala
+++ 
b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/ProductSchemas.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
  */
 trait ProductSchemas extends ProductSchemasInstances { this: StandardSchemas =>
 
-  protected def extractFieldNames(tag: ClassTag[_]): Array[String] =
+  protected def extractFieldNames(tag: ClassTag[?]): Array[String] =
     ProductSchemasSupport.extractFieldNames(tag)
 
 }
@@ -43,5 +43,5 @@ private[schema] final class ProductSchemaWriter[T <: 
Product](fieldSchemas: Seq[
 }
 
 private object ProductSchemasSupport extends StandardFormats with 
ProductFormats with AdditionalFormats {
-  override def extractFieldNames(tag: ClassTag[_]): Array[String] = 
super.extractFieldNames(tag)
+  override def extractFieldNames(tag: ClassTag[?]): Array[String] = 
super.extractFieldNames(tag)
 }
diff --git 
a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
 
b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
index 1e717a69f..8c3b6a789 100644
--- 
a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
+++ 
b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
@@ -723,7 +723,7 @@ object GooglePubSub {
       }
       .mapMaterializedValue(flattenCs(_))
 
-  private def flattenCs[T](f: CompletionStage[_ <: CompletionStage[T]]): 
CompletionStage[T] =
+  private def flattenCs[T](f: CompletionStage[? <: CompletionStage[T]]): 
CompletionStage[T] =
     f.thenCompose((t: CompletionStage[T]) => t)
 
   private def publisher(mat: Materializer, attr: Attributes) =
diff --git 
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStream.scala
 
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStream.scala
index b0329bd37..3f00a5a8a 100644
--- 
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStream.scala
+++ 
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStream.scala
@@ -111,7 +111,7 @@ import scala.concurrent.ExecutionContext.parasitic
 
   def putObject(bucket: String,
       objectName: String,
-      data: Source[ByteString, _],
+      data: Source[ByteString, ?],
       contentType: ContentType): Source[StorageObject, NotUsed] = sourceGCS { 
settings =>
     val uri = Uri(settings.endpointUrl)
       .withPath(Path("/upload" + settings.basePath) ++ getBucketPath(bucket) / 
"o")
diff --git 
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/javadsl/GCStorage.scala
 
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/javadsl/GCStorage.scala
index 883da4ea2..207a816e2 100644
--- 
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/javadsl/GCStorage.scala
+++ 
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/javadsl/GCStorage.scala
@@ -242,7 +242,7 @@ object GCStorage {
    */
   def simpleUpload(bucket: String,
       objectName: String,
-      data: Source[ByteString, _],
+      data: Source[ByteString, ?],
       contentType: ContentType): Source[StorageObject, NotUsed] =
     GCStorageStream.putObject(bucket, objectName, data.asScala, 
contentType.asInstanceOf[ScalaContentType]).asJava
 
diff --git 
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorage.scala
 
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorage.scala
index 080d7ea24..022842e2d 100644
--- 
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorage.scala
+++ 
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorage.scala
@@ -216,7 +216,7 @@ object GCStorage {
    */
   def simpleUpload(bucket: String,
       objectName: String,
-      data: Source[ByteString, _],
+      data: Source[ByteString, ?],
       contentType: ContentType): Source[StorageObject, NotUsed] =
     GCStorageStream.putObject(bucket, objectName, data, contentType)
 
diff --git 
a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorageWiremockBase.scala
 
b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorageWiremockBase.scala
index af10b038d..ceecd79e8 100644
--- 
a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorageWiremockBase.scala
+++ 
b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorageWiremockBase.scala
@@ -827,7 +827,7 @@ abstract class GCStorageWiremockBase(_system: ActorSystem, 
_wireMockServer: Hove
 
 object GCStorageWiremockBase {
 
-  def getCallerName(clazz: Class[_]): String = {
+  def getCallerName(clazz: Class[?]): String = {
     val s = Thread.currentThread.getStackTrace.map(_.getClassName).drop(1)
       .dropWhile(_.matches("(java.lang.Thread|.*WireMockBase.?$)"))
     val reduced = s.lastIndexWhere(_ == clazz.getName) match {
diff --git 
a/hdfs/src/main/scala/org/apache/pekko/stream/connectors/hdfs/scaladsl/HdfsFlow.scala
 
b/hdfs/src/main/scala/org/apache/pekko/stream/connectors/hdfs/scaladsl/HdfsFlow.scala
index 71b1139ed..11f07bfb1 100644
--- 
a/hdfs/src/main/scala/org/apache/pekko/stream/connectors/hdfs/scaladsl/HdfsFlow.scala
+++ 
b/hdfs/src/main/scala/org/apache/pekko/stream/connectors/hdfs/scaladsl/HdfsFlow.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.io.{ SequenceFile, Writable }
 
 object HdfsFlow {
 
-  private[hdfs] val OnlyRotationMessage: PartialFunction[OutgoingMessage[_], 
RotationMessage] = {
+  private[hdfs] val OnlyRotationMessage: PartialFunction[OutgoingMessage[?], 
RotationMessage] = {
     case m: RotationMessage => m
   }
 
diff --git 
a/influxdb/src/main/scala/org/apache/pekko/stream/connectors/influxdb/impl/PekkoConnectorsResultMapperHelper.scala
 
b/influxdb/src/main/scala/org/apache/pekko/stream/connectors/influxdb/impl/PekkoConnectorsResultMapperHelper.scala
index d10e7a9e7..a64a8ed31 100644
--- 
a/influxdb/src/main/scala/org/apache/pekko/stream/connectors/influxdb/impl/PekkoConnectorsResultMapperHelper.scala
+++ 
b/influxdb/src/main/scala/org/apache/pekko/stream/connectors/influxdb/impl/PekkoConnectorsResultMapperHelper.scala
@@ -47,10 +47,10 @@ private[impl] class PekkoConnectorsResultMapperHelper {
     .appendZoneOrOffsetId
     .toFormatter
 
-  private[impl] def databaseName(point: Class[_]): String =
+  private[impl] def databaseName(point: Class[?]): String =
     point.getAnnotation(classOf[Measurement]).database();
 
-  private[impl] def retentionPolicy(point: Class[_]): String =
+  private[impl] def retentionPolicy(point: Class[?]): String =
     point.getAnnotation(classOf[Measurement]).retentionPolicy();
 
   private[impl] def convertModelToPoint[T](model: T): Point = {
@@ -70,7 +70,7 @@ private[impl] class PekkoConnectorsResultMapperHelper {
         val field = colNameAndFieldMap.get(key)
         val column = field.getAnnotation(classOf[Column])
         val columnName: String = column.name()
-        val fieldType: Class[_] = field.getType()
+        val fieldType: Class[?] = field.getType()
 
         val isAccessible = field.isAccessible() // deprecated in JDK 11+
         if (!isAccessible) {
@@ -96,7 +96,7 @@ private[impl] class PekkoConnectorsResultMapperHelper {
     }
   }
 
-  private[impl] def cacheClassFields(clazz: Class[_]) =
+  private[impl] def cacheClassFields(clazz: Class[?]) =
     if (!CLASS_FIELD_CACHE.containsKey(clazz.getName)) {
       val initialMap: ConcurrentMap[String, Field] = new ConcurrentHashMap()
       var influxColumnAndFieldMap = 
CLASS_FIELD_CACHE.putIfAbsent(clazz.getName, initialMap)
@@ -125,20 +125,20 @@ private[impl] class PekkoConnectorsResultMapperHelper {
       .toList
   }
 
-  private def measurementName(point: Class[_]): String =
+  private def measurementName(point: Class[?]): String =
     point.getAnnotation(classOf[Measurement]).name();
 
-  private def timeUnit(point: Class[_]): TimeUnit =
+  private def timeUnit(point: Class[?]): TimeUnit =
     point.getAnnotation(classOf[Measurement]).timeUnit()
 
-  private def setTime(pointBuilder: Point.Builder, fieldType: Class[_], 
timeUnit: TimeUnit, value: Any): Unit =
+  private def setTime(pointBuilder: Point.Builder, fieldType: Class[?], 
timeUnit: TimeUnit, value: Any): Unit =
     if (classOf[Instant].isAssignableFrom(fieldType)) {
       val instant = value.asInstanceOf[Instant]
       val time = timeUnit.convert(instant.toEpochMilli, TimeUnit.MILLISECONDS)
       pointBuilder.time(time, timeUnit)
     } else throw new InfluxDBMapperException("Unsupported type " + fieldType + 
" for time: should be of Instant type")
 
-  private def setField(pointBuilder: Point.Builder, fieldType: Class[_], 
columnName: String, value: Any): Unit =
+  private def setField(pointBuilder: Point.Builder, fieldType: Class[?], 
columnName: String, value: Any): Unit =
     if (classOf[java.lang.Boolean].isAssignableFrom(fieldType) || 
classOf[Boolean].isAssignableFrom(fieldType))
       pointBuilder.addField(columnName, value.asInstanceOf[Boolean])
     else if (classOf[java.lang.Long].isAssignableFrom(fieldType) || 
classOf[Long].isAssignableFrom(fieldType))
@@ -150,7 +150,7 @@ private[impl] class PekkoConnectorsResultMapperHelper {
     else if (classOf[String].isAssignableFrom(fieldType)) 
pointBuilder.addField(columnName, value.asInstanceOf[String])
     else throw new InfluxDBMapperException("Unsupported type " + fieldType + " 
for column " + columnName)
 
-  private def throwExceptionIfMissingAnnotation(clazz: Class[_]): Unit =
+  private def throwExceptionIfMissingAnnotation(clazz: Class[?]): Unit =
     if (!clazz.isAnnotationPresent(classOf[Measurement]))
       throw new IllegalArgumentException(
         "Class " + clazz.getName + " is not annotated with @" + 
classOf[Measurement].getSimpleName)
@@ -202,7 +202,7 @@ private[impl] class PekkoConnectorsResultMapperHelper {
 
   @throws[IllegalArgumentException]
   @throws[IllegalAccessException]
-  private def fieldValueForPrimitivesModified[T](fieldType: Class[_], field: 
Field, obj: T, value: Any): Boolean =
+  private def fieldValueForPrimitivesModified[T](fieldType: Class[?], field: 
Field, obj: T, value: Any): Boolean =
     if (classOf[Double].isAssignableFrom(fieldType)) {
       field.setDouble(obj, value.asInstanceOf[Double].doubleValue)
       true
@@ -221,7 +221,7 @@ private[impl] class PekkoConnectorsResultMapperHelper {
 
   @throws[IllegalArgumentException]
   @throws[IllegalAccessException]
-  private def fieldValueForPrimitiveWrappersModified[T](fieldType: Class[_],
+  private def fieldValueForPrimitiveWrappersModified[T](fieldType: Class[?],
       field: Field,
       obj: T,
       value: Any): Boolean =
@@ -243,7 +243,7 @@ private[impl] class PekkoConnectorsResultMapperHelper {
 
   @throws[IllegalArgumentException]
   @throws[IllegalAccessException]
-  private def fieldValueModified[T](fieldType: Class[_],
+  private def fieldValueModified[T](fieldType: Class[?],
       field: Field,
       obj: T,
       value: Any,
diff --git 
a/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqClient.scala
 
b/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqClient.scala
index e2d07e8f0..5f7cd974d 100644
--- 
a/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqClient.scala
+++ 
b/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqClient.scala
@@ -62,7 +62,7 @@ private[ironmq] final class IronMqClient(settings: 
IronMqSettings)(implicit acto
     }
   }
 
-  private val pipeline: Flow[HttpRequest, HttpResponse, _] = Flow[HttpRequest]
+  private val pipeline: Flow[HttpRequest, HttpResponse, ?] = Flow[HttpRequest]
     .map(_.withHeaders(Authorization(GenericHttpCredentials("OAuth", 
settings.token))))
     .map(_ -> NotUsed)
     .via(connectionPoolFlow)
diff --git 
a/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqPushStage.scala
 
b/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqPushStage.scala
index 06543b6ba..dab27b8e1 100644
--- 
a/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqPushStage.scala
+++ 
b/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqPushStage.scala
@@ -46,7 +46,7 @@ private[ironmq] class IronMqPushStage(queueName: String, 
settings: IronMqSetting
 
       implicit def ec: ExecutionContext = materializer.executionContext
 
-      override protected val logSource: Class[_] = classOf[IronMqPushStage]
+      override protected val logSource: Class[?] = classOf[IronMqPushStage]
 
       private var runningFutures: Int = 0
       private var exceptionFromUpstream: Option[Throwable] = None
diff --git 
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/JmsExceptions.scala
 
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/JmsExceptions.scala
index 785addfa7..414b9d367 100644
--- 
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/JmsExceptions.scala
+++ 
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/JmsExceptions.scala
@@ -24,13 +24,13 @@ import scala.util.control.NoStackTrace
  */
 trait NonRetriableJmsException extends Exception
 
-case class UnsupportedMessagePropertyType(propertyName: String, propertyValue: 
Any, message: JmsEnvelope[_])
+case class UnsupportedMessagePropertyType(propertyName: String, propertyValue: 
Any, message: JmsEnvelope[?])
     extends Exception(
       s"Jms property '$propertyName' has unknown type 
'${propertyValue.getClass.getName}'. " +
       "Only primitive types and String are supported as property values.")
     with NonRetriableJmsException
 
-case class UnsupportedMapMessageEntryType(entryName: String, entryValue: Any, 
message: JmsMapMessagePassThrough[_])
+case class UnsupportedMapMessageEntryType(entryName: String, entryValue: Any, 
message: JmsMapMessagePassThrough[?])
     extends Exception(
       s"Jms MapMessage entry '$entryName' has unknown type 
'${entryValue.getClass.getName}'. " +
       "Only primitive types, String, and Byte array are supported as entry 
values.")
diff --git 
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageProducer.scala
 
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageProducer.scala
index 8dd2d5688..079780040 100644
--- 
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageProducer.scala
+++ 
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageProducer.scala
@@ -28,7 +28,7 @@ private class JmsMessageProducer(jmsProducer: 
jms.MessageProducer, jmsSession: J
 
   private val destinationCache = new SoftReferenceCache[Destination, 
jms.Destination]()
 
-  def send(elem: JmsEnvelope[_]): Unit = {
+  def send(elem: JmsEnvelope[?]): Unit = {
     val message: jms.Message = createMessage(elem)
     populateMessageProperties(message, elem)
 
@@ -56,34 +56,34 @@ private class JmsMessageProducer(jmsProducer: 
jms.MessageProducer, jmsSession: J
 
   private def lookup(dest: Destination) = destinationCache.lookup(dest, 
dest.create(jmsSession.session))
 
-  private[jakartams] def createMessage(element: JmsEnvelope[_]): jms.Message =
+  private[jakartams] def createMessage(element: JmsEnvelope[?]): jms.Message =
     element match {
 
-      case textMessage: JmsTextMessagePassThrough[_] => 
jmsSession.session.createTextMessage(textMessage.body)
+      case textMessage: JmsTextMessagePassThrough[?] => 
jmsSession.session.createTextMessage(textMessage.body)
 
-      case byteMessage: JmsByteMessagePassThrough[_] =>
+      case byteMessage: JmsByteMessagePassThrough[?] =>
         val newMessage = jmsSession.session.createBytesMessage()
         newMessage.writeBytes(byteMessage.bytes)
         newMessage
 
-      case byteStringMessage: JmsByteStringMessagePassThrough[_] =>
+      case byteStringMessage: JmsByteStringMessagePassThrough[?] =>
         val newMessage = jmsSession.session.createBytesMessage()
         newMessage.writeBytes(byteStringMessage.bytes.toArray)
         newMessage
 
-      case mapMessage: JmsMapMessagePassThrough[_] =>
+      case mapMessage: JmsMapMessagePassThrough[?] =>
         val newMessage = jmsSession.session.createMapMessage()
         populateMapMessage(newMessage, mapMessage)
         newMessage
 
-      case objectMessage: JmsObjectMessagePassThrough[_] =>
+      case objectMessage: JmsObjectMessagePassThrough[?] =>
         jmsSession.session.createObjectMessage(objectMessage.serializable)
 
-      case pt: JmsPassThrough[_] => throw new IllegalArgumentException("can't 
create message for JmsPassThrough")
+      case pt: JmsPassThrough[?] => throw new IllegalArgumentException("can't 
create message for JmsPassThrough")
 
     }
 
-  private[jakartams] def populateMessageProperties(message: 
jakarta.jms.Message, jmsMessage: JmsEnvelope[_]): Unit =
+  private[jakartams] def populateMessageProperties(message: 
jakarta.jms.Message, jmsMessage: JmsEnvelope[?]): Unit =
     jmsMessage.properties.foreach {
       case (key, v) =>
         v match {
@@ -101,7 +101,7 @@ private class JmsMessageProducer(jmsProducer: 
jms.MessageProducer, jmsSession: J
         }
     }
 
-  private def populateMapMessage(message: jakarta.jms.MapMessage, jmsMessage: 
JmsMapMessagePassThrough[_]): Unit =
+  private def populateMapMessage(message: jakarta.jms.MapMessage, jmsMessage: 
JmsMapMessagePassThrough[?]): Unit =
     jmsMessage.body.foreach {
       case (key, v) =>
         v match {
diff --git 
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageReader.scala
 
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageReader.scala
index 675919d5b..2254a1c5d 100644
--- 
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageReader.scala
+++ 
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageReader.scala
@@ -51,7 +51,7 @@ private[jakartams] object JmsMessageReader {
   def readArray(message: jms.BytesMessage, bufferSize: Int = 4096): 
Array[Byte] =
     readBytes(message, bufferSize).toArray
 
-  private def createMap(keys: java.util.Enumeration[_], accessor: String => 
AnyRef) =
+  private def createMap(keys: java.util.Enumeration[?], accessor: String => 
AnyRef) =
     keys
       .asInstanceOf[java.util.Enumeration[String]]
       .asScala
diff --git 
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsProducerStage.scala
 
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsProducerStage.scala
index a7b38643d..6719bc70a 100644
--- 
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsProducerStage.scala
+++ 
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsProducerStage.scala
@@ -151,12 +151,12 @@ private[jakartams] final class JmsProducerStage[E <: 
JmsEnvelope[PassThrough], P
           override def onPush(): Unit = {
             val elem: E = grab(in)
             elem match {
-              case _: JmsPassThrough[_] =>
+              case _: JmsPassThrough[?] =>
                 val holder = new Holder[E](NotYetThere)
                 inFlightMessages.enqueue(holder)
                 holder(Success(elem))
                 pushNextIfPossible()
-              case m: JmsEnvelope[_] =>
+              case m: JmsEnvelope[?] =>
                 // create a holder object to capture the in-flight message, 
and enqueue it to preserve message order
                 val holder = new Holder[E](NotYetThere)
                 inFlightMessages.enqueue(holder)
@@ -293,7 +293,7 @@ private[jakartams] object JmsProducerStage {
     override def apply(t: Try[A]): Unit = elem = t
   }
 
-  case class SendAttempt[E <: JmsEnvelope[_]](envelope: E,
+  case class SendAttempt[E <: JmsEnvelope[?]](envelope: E,
       holder: Holder[E],
       attempt: Int = 0,
       backoffMaxed: Boolean = false)
diff --git 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/JmsExceptions.scala 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/JmsExceptions.scala
index 320830e71..ebd8e65aa 100644
--- 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/JmsExceptions.scala
+++ 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/JmsExceptions.scala
@@ -24,13 +24,13 @@ import scala.util.control.NoStackTrace
  */
 trait NonRetriableJmsException extends Exception
 
-case class UnsupportedMessagePropertyType(propertyName: String, propertyValue: 
Any, message: JmsEnvelope[_])
+case class UnsupportedMessagePropertyType(propertyName: String, propertyValue: 
Any, message: JmsEnvelope[?])
     extends Exception(
       s"Jms property '$propertyName' has unknown type 
'${propertyValue.getClass.getName}'. " +
       "Only primitive types and String are supported as property values.")
     with NonRetriableJmsException
 
-case class UnsupportedMapMessageEntryType(entryName: String, entryValue: Any, 
message: JmsMapMessagePassThrough[_])
+case class UnsupportedMapMessageEntryType(entryName: String, entryValue: Any, 
message: JmsMapMessagePassThrough[?])
     extends Exception(
       s"Jms MapMessage entry '$entryName' has unknown type 
'${entryValue.getClass.getName}'. " +
       "Only primitive types, String, and Byte array are supported as entry 
values.")
diff --git 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageProducer.scala
 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageProducer.scala
index f427de436..e4247cabb 100644
--- 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageProducer.scala
+++ 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageProducer.scala
@@ -28,7 +28,7 @@ private class JmsMessageProducer(jmsProducer: 
jms.MessageProducer, jmsSession: J
 
   private val destinationCache = new SoftReferenceCache[Destination, 
jms.Destination]()
 
-  def send(elem: JmsEnvelope[_]): Unit = {
+  def send(elem: JmsEnvelope[?]): Unit = {
     val message: jms.Message = createMessage(elem)
     populateMessageProperties(message, elem)
 
@@ -56,34 +56,34 @@ private class JmsMessageProducer(jmsProducer: 
jms.MessageProducer, jmsSession: J
 
   private def lookup(dest: Destination) = destinationCache.lookup(dest, 
dest.create(jmsSession.session))
 
-  private[jms] def createMessage(element: JmsEnvelope[_]): jms.Message =
+  private[jms] def createMessage(element: JmsEnvelope[?]): jms.Message =
     element match {
 
-      case textMessage: JmsTextMessagePassThrough[_] => 
jmsSession.session.createTextMessage(textMessage.body)
+      case textMessage: JmsTextMessagePassThrough[?] => 
jmsSession.session.createTextMessage(textMessage.body)
 
-      case byteMessage: JmsByteMessagePassThrough[_] =>
+      case byteMessage: JmsByteMessagePassThrough[?] =>
         val newMessage = jmsSession.session.createBytesMessage()
         newMessage.writeBytes(byteMessage.bytes)
         newMessage
 
-      case byteStringMessage: JmsByteStringMessagePassThrough[_] =>
+      case byteStringMessage: JmsByteStringMessagePassThrough[?] =>
         val newMessage = jmsSession.session.createBytesMessage()
         newMessage.writeBytes(byteStringMessage.bytes.toArray)
         newMessage
 
-      case mapMessage: JmsMapMessagePassThrough[_] =>
+      case mapMessage: JmsMapMessagePassThrough[?] =>
         val newMessage = jmsSession.session.createMapMessage()
         populateMapMessage(newMessage, mapMessage)
         newMessage
 
-      case objectMessage: JmsObjectMessagePassThrough[_] =>
+      case objectMessage: JmsObjectMessagePassThrough[?] =>
         jmsSession.session.createObjectMessage(objectMessage.serializable)
 
-      case pt: JmsPassThrough[_] => throw new IllegalArgumentException("can't 
create message for JmsPassThrough")
+      case pt: JmsPassThrough[?] => throw new IllegalArgumentException("can't 
create message for JmsPassThrough")
 
     }
 
-  private[jms] def populateMessageProperties(message: javax.jms.Message, 
jmsMessage: JmsEnvelope[_]): Unit =
+  private[jms] def populateMessageProperties(message: javax.jms.Message, 
jmsMessage: JmsEnvelope[?]): Unit =
     jmsMessage.properties.foreach {
       case (key, v) =>
         v match {
@@ -101,7 +101,7 @@ private class JmsMessageProducer(jmsProducer: 
jms.MessageProducer, jmsSession: J
         }
     }
 
-  private def populateMapMessage(message: javax.jms.MapMessage, jmsMessage: 
JmsMapMessagePassThrough[_]): Unit =
+  private def populateMapMessage(message: javax.jms.MapMessage, jmsMessage: 
JmsMapMessagePassThrough[?]): Unit =
     jmsMessage.body.foreach {
       case (key, v) =>
         v match {
diff --git 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageReader.scala
 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageReader.scala
index e0890c9ec..3c9b9c2e2 100644
--- 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageReader.scala
+++ 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageReader.scala
@@ -52,7 +52,7 @@ private[jms] object JmsMessageReader {
   def readArray(message: jms.BytesMessage, bufferSize: Int = 4096): 
Array[Byte] =
     readBytes(message, bufferSize).toArray
 
-  private def createMap(keys: java.util.Enumeration[_], accessor: String => 
AnyRef) =
+  private def createMap(keys: java.util.Enumeration[?], accessor: String => 
AnyRef) =
     keys
       .asInstanceOf[java.util.Enumeration[String]]
       .asScala
diff --git 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala
 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala
index aae31aa04..1bf8c61d6 100644
--- 
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala
+++ 
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala
@@ -150,12 +150,12 @@ private[jms] final class JmsProducerStage[E <: 
JmsEnvelope[PassThrough], PassThr
           override def onPush(): Unit = {
             val elem: E = grab(in)
             elem match {
-              case _: JmsPassThrough[_] =>
+              case _: JmsPassThrough[?] =>
                 val holder = new Holder[E](NotYetThere)
                 inFlightMessages.enqueue(holder)
                 holder(Success(elem))
                 pushNextIfPossible()
-              case m: JmsEnvelope[_] =>
+              case m: JmsEnvelope[?] =>
                 // create a holder object to capture the in-flight message, 
and enqueue it to preserve message order
                 val holder = new Holder[E](NotYetThere)
                 inFlightMessages.enqueue(holder)
@@ -292,7 +292,7 @@ private[jms] object JmsProducerStage {
     override def apply(t: Try[A]): Unit = elem = t
   }
 
-  case class SendAttempt[E <: JmsEnvelope[_]](envelope: E,
+  case class SendAttempt[E <: JmsEnvelope[?]](envelope: E,
       holder: Holder[E],
       attempt: Int = 0,
       backoffMaxed: Boolean = false)
diff --git a/json-streaming/src/test/scala/docs/scaladsl/JsonReaderTest.scala 
b/json-streaming/src/test/scala/docs/scaladsl/JsonReaderTest.scala
index efc2f92e2..d12d1f79f 100644
--- a/json-streaming/src/test/scala/docs/scaladsl/JsonReaderTest.scala
+++ b/json-streaming/src/test/scala/docs/scaladsl/JsonReaderTest.scala
@@ -35,7 +35,7 @@ class JsonReaderTest extends AnyWordSpec with Matchers with 
BeforeAndAfterAll wi
   val timeout: FiniteDuration = 3.seconds
 
   // Runs the stream to a sequence sink and returns that sequence
-  def collect[A](source: Source[A, _]): Seq[A] = 
Await.result(source.runWith(Sink.seq), timeout)
+  def collect[A](source: Source[A, ?]): Seq[A] = 
Await.result(source.runWith(Sink.seq), timeout)
 
   // Basic documents + elements to use in tests
   val expectedElements: Seq[String] = Seq("""{"name":"test1"}""", 
"""{"name":"test2"}""", """{"name":"test3"}""")
diff --git 
a/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala
 
b/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala
index 3ea052a69..3e0022905 100644
--- 
a/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala
+++ 
b/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala
@@ -39,7 +39,7 @@ private[kinesis] object KinesisSchedulerSourceStage {
   final case class NewRecord(cr: CommittableRecord) extends Command
   case object Pump extends Command
   case object Complete extends Command
-  final case class SchedulerShutdown(result: Try[_]) extends Command
+  final case class SchedulerShutdown(result: Try[?]) extends Command
 
 }
 
diff --git 
a/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/MqttPerf.scala
 
b/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/MqttPerf.scala
index 95572de9c..5f462413b 100644
--- 
a/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/MqttPerf.scala
+++ 
b/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/MqttPerf.scala
@@ -87,7 +87,7 @@ class MqttPerf {
               Mqtt
                 .serverSessionFlow(serverSession, 
ByteString(connection.remoteAddress.getAddress.getAddress))
                 .join(connection.flow))
-            .wireTap(Sink.foreach[Either[DecodeError, streaming.Event[_]]] {
+            .wireTap(Sink.foreach[Either[DecodeError, streaming.Event[?]]] {
               case Right(streaming.Event(_: streaming.Connect, _)) =>
                 server.offer(streaming.Command(connAck))
               case Right(streaming.Event(s: streaming.Subscribe, _)) =>
diff --git 
a/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/MqttPerf.scala
 
b/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/MqttPerf.scala
index 69599e5bd..a6b5914cf 100644
--- 
a/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/MqttPerf.scala
+++ 
b/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/MqttPerf.scala
@@ -90,7 +90,7 @@ class MqttPerf {
               Mqtt
                 .serverSessionFlow(serverSession, 
ByteString(connection.remoteAddress.getAddress.getAddress))
                 .join(connection.flow))
-            .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+            .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
               case Right(Event(_: Connect, _)) =>
                 server.offer(Command(connAck))
               case Right(Event(s: Subscribe, _)) =>
@@ -117,7 +117,7 @@ class MqttPerf {
         Mqtt
           .clientSessionFlow(clientSession, ByteString("1"))
           .join(Tcp().outgoingConnection(host, port)))
-      .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+      .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
         case Right(Event(_: SubAck, _)) =>
           subscribed.success(Done)
         case Right(Event(p: Publish, _)) =>
diff --git 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
index 6e819657e..2f977151f 100644
--- 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
+++ 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
@@ -36,7 +36,7 @@ import scala.util.{ Either, Failure, Success }
  */
 @InternalApi private[streaming] object ClientConnector {
 
-  type ConnectData = Option[_]
+  type ConnectData = Option[?]
 
   /*
    * No ACK received - the CONNECT failed
@@ -597,7 +597,7 @@ import scala.util.{ Either, Failure, Success }
  */
 @InternalApi private[streaming] object Subscriber {
 
-  type SubscribeData = Option[_]
+  type SubscribeData = Option[?]
 
   /*
    * No ACK received - the subscription failed
@@ -685,7 +685,7 @@ import scala.util.{ Either, Failure, Success }
    */
   case object UnsubscribeFailed extends Exception with NoStackTrace
 
-  type UnsubscribeData = Option[_]
+  type UnsubscribeData = Option[?]
 
   /*
    * Construct with the starting state
diff --git 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
index 67a7eb7df..4eefd4028 100644
--- 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
+++ 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
@@ -54,7 +54,7 @@ import scala.util.{ Either, Failure, Success }
  */
 @InternalApi private[streaming] object Producer {
 
-  type PublishData = Option[_]
+  type PublishData = Option[?]
 
   /*
    * Construct with the starting state
@@ -383,7 +383,7 @@ import scala.util.{ Either, Failure, Success }
   sealed abstract class Request[A]
   final case class Register[A](registrant: ActorRef[A], reply: 
Promise[Registered]) extends Request[A]
   private final case class Unregister[A](packetId: PacketId) extends Request[A]
-  final case class Route[A](packetId: PacketId, event: A, failureReply: 
Promise[_]) extends Request[A]
+  final case class Route[A](packetId: PacketId, event: A, failureReply: 
Promise[?]) extends Request[A]
 
   // Replies
 
@@ -422,7 +422,7 @@ import scala.util.{ Either, Failure, Success }
       step(PacketId(after.underlying + 1))
   }
 
-  private[streaming] case class Registration[A](registrant: ActorRef[A], 
failureReplies: Seq[Promise[_]])
+  private[streaming] case class Registration[A](registrant: ActorRef[A], 
failureReplies: Seq[Promise[?]])
 }
 
 /*
@@ -529,12 +529,12 @@ import scala.util.{ Either, Failure, Success }
   final case class RegisterConnection[A](connectionId: ByteString, clientId: 
String) extends Request[A]
   private final case class Unregister[A](clientId: Option[String], packetId: 
PacketId) extends Request[A]
   final case class UnregisterConnection[A](connectionId: ByteString) extends 
Request[A]
-  final case class Route[A](clientId: Option[String], packetId: PacketId, 
event: A, failureReply: Promise[_])
+  final case class Route[A](clientId: Option[String], packetId: PacketId, 
event: A, failureReply: Promise[?])
       extends Request[A]
   final case class RouteViaConnection[A](connectionId: ByteString,
       packetId: PacketId,
       event: A,
-      failureReply: Promise[_])
+      failureReply: Promise[?])
       extends Request[A]
 
   // Replies
@@ -548,7 +548,7 @@ import scala.util.{ Either, Failure, Success }
   def apply[A]: Behavior[Request[A]] =
     new RemotePacketRouter[A].main(Map.empty, Map.empty)
 
-  private[streaming] case class Registration[A](registrant: ActorRef[A], 
failureReplies: Seq[Promise[_]])
+  private[streaming] case class Registration[A](registrant: ActorRef[A], 
failureReplies: Seq[Promise[?]])
 }
 
 /*
diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala 
b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
index 0a6385af7..5af82320b 100644
--- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
+++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
@@ -1358,7 +1358,7 @@ class MqttSessionSpec
             Mqtt
               .serverSessionFlow(session, ByteString.empty)
               .join(pipeToClient))
-          .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+          .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
             case Right(Event(`connect`, _)) =>
               connectReceived.success(Done)
             case Right(Event(cp: Subscribe, _)) if cp.topicFilters == 
subscribe.topicFilters =>
@@ -1457,7 +1457,7 @@ class MqttSessionSpec
             Mqtt
               .serverSessionFlow(session, ByteString.empty)
               .join(pipeToClient))
-          .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+          .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
             case Right(Event(`connect`, _)) =>
               connectReceived.success(Done)
             case Right(Event(cp: Subscribe, _))
@@ -1535,7 +1535,7 @@ class MqttSessionSpec
             Mqtt
               .serverSessionFlow(session, ByteString.empty)
               .join(pipeToClient))
-          .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+          .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
             case Right(Event(`connect`, _)) =>
               connectReceived.success(Done)
             case Right(Event(cp: Subscribe, _)) if cp.topicFilters == 
subscribe.topicFilters =>
@@ -1616,7 +1616,7 @@ class MqttSessionSpec
             Mqtt
               .serverSessionFlow(session, ByteString.empty)
               .join(pipeToClient))
-          .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+          .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
             case Right(Event(`connect`, _)) => connectReceived.success(Done)
             case _                          =>
           })
@@ -1677,7 +1677,7 @@ class MqttSessionSpec
             Mqtt
               .serverSessionFlow(session, ByteString.empty)
               .join(pipeToClient))
-          .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+          .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
             case Right(Event(`connect`, _)) => connectReceived.success(Done)
             case _                          =>
           })
@@ -1729,7 +1729,7 @@ class MqttSessionSpec
             Mqtt
               .serverSessionFlow(session, ByteString.empty)
               .join(pipeToClient))
-          .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+          .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
             case Right(Event(`connect`, _))    => connectReceived.success(Done)
             case Right(Event(`disconnect`, _)) => 
disconnectReceived.success(Done)
             case _                             =>
@@ -1797,7 +1797,7 @@ class MqttSessionSpec
             Mqtt
               .serverSessionFlow(session, connectionId)
               .join(pipeToClient))
-          .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+          .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
             case Right(Event(`connect`, _)) if 
!firstConnectReceived.isCompleted =>
               firstConnectReceived.success(Done)
             case Right(Event(`connect`, _)) =>
@@ -1919,7 +1919,7 @@ class MqttSessionSpec
             Mqtt
               .serverSessionFlow(session, ByteString.empty)
               .join(pipeToClient))
-          .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+          .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
             case Right(Event(`connect`, _)) =>
               connectReceived.success(Done)
             case Right(Event(cp: Publish, _)) if 
cp.flags.contains(ControlPacketFlags.DUP) =>
@@ -2012,7 +2012,7 @@ class MqttSessionSpec
             Mqtt
               .serverSessionFlow(serverSession, ByteString("connection 1"))
               .join(pipeToClient1))
-          .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+          .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
             case Right(Event(`connect`, _)) =>
               connect1Received.success(Done)
             case Right(Event(cp: Subscribe, _)) if cp.topicFilters == 
subscribe.topicFilters =>
@@ -2075,7 +2075,7 @@ class MqttSessionSpec
             Mqtt
               .serverSessionFlow(serverSession, ByteString("connection 2"))
               .join(pipeToClient2))
-          .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+          .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
             case Right(Event(`connect`, _)) =>
               connect2Received.success(Done)
             case Right(Event(cp: Subscribe, _)) if cp.topicFilters == 
subscribe.topicFilters =>
diff --git a/project/CopyrightHeader.scala b/project/CopyrightHeader.scala
index e92b8c9b8..112c7856e 100644
--- a/project/CopyrightHeader.scala
+++ b/project/CopyrightHeader.scala
@@ -23,7 +23,7 @@ trait CopyrightHeader extends AutoPlugin {
 
   override def trigger: PluginTrigger = allRequirements
 
-  protected def headerMappingSettings: Seq[Def.Setting[_]] =
+  protected def headerMappingSettings: Seq[Def.Setting[?]] =
     Seq(Compile, Test).flatMap { config =>
       inConfig(config)(
         Seq(
@@ -35,10 +35,10 @@ trait CopyrightHeader extends AutoPlugin {
             HeaderFileType("template") -> cStyleComment)))
     }
 
-  override def projectSettings: Seq[Def.Setting[_]] =
+  override def projectSettings: Seq[Def.Setting[?]] =
     Def.settings(headerMappingSettings, additional)
 
-  def additional: Seq[Def.Setting[_]] =
+  def additional: Seq[Def.Setting[?]] =
     Def.settings(Compile / compile := {
         (Compile / headerCreate).value
         (Compile / compile).value
diff --git a/project/CopyrightHeaderForBuild.scala 
b/project/CopyrightHeaderForBuild.scala
index d26c4fdc1..54c7d048e 100644
--- a/project/CopyrightHeaderForBuild.scala
+++ b/project/CopyrightHeaderForBuild.scala
@@ -18,7 +18,7 @@ import sbt.{ inConfig, Compile, Def, PluginTrigger, Test, _ }
 object CopyrightHeaderForBuild extends CopyrightHeader {
   override def trigger: PluginTrigger = noTrigger
 
-  override def projectSettings: Seq[Def.Setting[_]] = {
+  override def projectSettings: Seq[Def.Setting[?]] = {
     Seq(Compile, Test).flatMap { config =>
       inConfig(config) {
         Seq(
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
index e3d489738..5e60d202f 100644
--- 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
+++ 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
@@ -241,7 +241,7 @@ import scala.xml.NodeSeq
       .withDefaultHeaders(headers)
 
   def uploadRequest(s3Location: S3Location,
-      payload: Source[ByteString, _],
+      payload: Source[ByteString, ?],
       contentLength: Long,
       contentType: ContentType,
       s3Headers: Seq[HttpHeader])(
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index ce3fc9cf2..3d44561ef 100644
--- 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++ 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -654,7 +654,7 @@ import scala.util.{ Failure, Success, Try }
 
   def putObject(s3Location: S3Location,
       contentType: ContentType,
-      data: Source[ByteString, _],
+      data: Source[ByteString, ?],
       contentLength: Long,
       s3Headers: S3Headers): Source[ObjectMetadata, NotUsed] = {
 
@@ -914,7 +914,7 @@ import scala.util.{ Failure, Success, Try }
    */
   def multipartUploadWithContext[C](
       s3Location: S3Location,
-      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
       contentType: ContentType = ContentTypes.`application/octet-stream`,
       s3Headers: S3Headers,
       chunkSize: Int = MinChunkSize,
@@ -951,7 +951,7 @@ import scala.util.{ Failure, Success, Try }
       s3Location: S3Location,
       uploadId: String,
       previousParts: immutable.Iterable[Part],
-      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
       contentType: ContentType = ContentTypes.`application/octet-stream`,
       s3Headers: S3Headers,
       chunkSize: Int = MinChunkSize,
@@ -1218,7 +1218,7 @@ import scala.util.{ Failure, Success, Try }
       contentType: ContentType,
       s3Headers: S3Headers,
       chunkSize: Int,
-      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
       initialUploadState: Option[(String, Int)] = None)(
       parallelism: Int): Flow[(ByteString, C), UploadPartResponse, NotUsed] = {
 
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
index 24dfe8871..5d0506525 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
@@ -290,7 +290,7 @@ object S3 {
    */
   def putObject(bucket: String,
       key: String,
-      data: Source[ByteString, _],
+      data: Source[ByteString, ?],
       contentLength: Long,
       contentType: ContentType,
       s3Headers: S3Headers): Source[ObjectMetadata, NotUsed] =
@@ -314,7 +314,7 @@ object S3 {
    */
   def putObject(bucket: String,
       key: String,
-      data: Source[ByteString, _],
+      data: Source[ByteString, ?],
       contentLength: Long,
       contentType: ContentType): Source[ObjectMetadata, NotUsed] =
     putObject(bucket, key, data, contentLength, contentType, 
S3Headers.empty.withCannedAcl(CannedAcl.Private))
@@ -330,7 +330,7 @@ object S3 {
    */
   def putObject(bucket: String,
       key: String,
-      data: Source[ByteString, _],
+      data: Source[ByteString, ?],
       contentLength: Long): Source[ObjectMetadata, NotUsed] =
     putObject(bucket, key, data, contentLength, 
ContentTypes.APPLICATION_OCTET_STREAM)
 
@@ -828,7 +828,7 @@ object S3 {
   def multipartUploadWithContext[C](
       bucket: String,
       key: String,
-      chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]], 
_],
+      chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]], 
?],
       contentType: ContentType,
       s3Headers: S3Headers): Sink[JPair[ByteString, C], 
CompletionStage[MultipartUploadResult]] =
     S3Stream
@@ -870,7 +870,7 @@ object S3 {
   def multipartUploadWithContext[C](
       bucket: String,
       key: String,
-      chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]], 
_],
+      chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]], 
?],
       contentType: ContentType): Sink[JPair[ByteString, C], 
CompletionStage[MultipartUploadResult]] =
     multipartUploadWithContext[C](bucket,
       key,
@@ -901,7 +901,7 @@ object S3 {
   def multipartUploadWithContext[C](
       bucket: String,
       key: String,
-      chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]], 
_])
+      chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]], 
?])
       : Sink[JPair[ByteString, C], CompletionStage[MultipartUploadResult]] =
     multipartUploadWithContext[C](bucket, key, chunkUploadSink, 
ContentTypes.APPLICATION_OCTET_STREAM)
 
@@ -1000,7 +1000,7 @@ object S3 {
       key: String,
       uploadId: String,
       previousParts: java.lang.Iterable[Part],
-      chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]], 
_],
+      chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]], 
?],
       contentType: ContentType,
       s3Headers: S3Headers): Sink[JPair[ByteString, C], 
CompletionStage[MultipartUploadResult]] = {
     S3Stream
@@ -1050,7 +1050,7 @@ object S3 {
       key: String,
       uploadId: String,
       previousParts: java.lang.Iterable[Part],
-      chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]], 
_],
+      chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]], 
?],
       contentType: ContentType): Sink[JPair[ByteString, C], 
CompletionStage[MultipartUploadResult]] =
     resumeMultipartUploadWithContext[C](bucket,
       key,
@@ -1088,7 +1088,7 @@ object S3 {
       key: String,
       uploadId: String,
       previousParts: java.lang.Iterable[Part],
-      chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]], 
_])
+      chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]], 
?])
       : Sink[JPair[ByteString, C], CompletionStage[MultipartUploadResult]] =
     resumeMultipartUploadWithContext[C](bucket,
       key,
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
index 6e1fcb36b..66ba3b4a9 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
@@ -188,7 +188,7 @@ object S3 {
    */
   def putObject(bucket: String,
       key: String,
-      data: Source[ByteString, _],
+      data: Source[ByteString, ?],
       contentLength: Long,
       contentType: ContentType = ContentTypes.`application/octet-stream`,
       s3Headers: S3Headers): Source[ObjectMetadata, NotUsed] =
@@ -542,7 +542,7 @@ object S3 {
   def multipartUploadWithContext[C](
       bucket: String,
       key: String,
-      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
       contentType: ContentType = ContentTypes.`application/octet-stream`,
       metaHeaders: MetaHeaders = MetaHeaders(Map()),
       cannedAcl: CannedAcl = CannedAcl.Private,
@@ -587,7 +587,7 @@ object S3 {
   def multipartUploadWithHeadersAndContext[C](
       bucket: String,
       key: String,
-      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
       contentType: ContentType = ContentTypes.`application/octet-stream`,
       chunkSize: Int = MinChunkSize,
       chunkingParallelism: Int = 4,
@@ -671,7 +671,7 @@ object S3 {
       key: String,
       uploadId: String,
       previousParts: immutable.Iterable[Part],
-      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
       contentType: ContentType = ContentTypes.`application/octet-stream`,
       metaHeaders: MetaHeaders = MetaHeaders(Map()),
       cannedAcl: CannedAcl = CannedAcl.Private,
@@ -758,7 +758,7 @@ object S3 {
       key: String,
       uploadId: String,
       previousParts: immutable.Iterable[Part],
-      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+      chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
       contentType: ContentType = ContentTypes.`application/octet-stream`,
       chunkSize: Int = MinChunkSize,
       chunkingParallelism: Int = 4,
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
index 14eaa6752..90e6994b2 100644
--- 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
@@ -1186,7 +1186,7 @@ trait S3IntegrationSpec
   }
 
   private def uploadAndAndCheckParts(
-      defaultBucket: String, source: Source[ByteString, _], expectedParts: 
Int): Assertion = {
+      defaultBucket: String, source: Source[ByteString, ?], expectedParts: 
Int): Assertion = {
     val metadata =
       for {
         _ <- source.runWith(
diff --git 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala
 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala
index e77ff00c5..ccbdfe3a6 100644
--- 
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala
+++ 
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala
@@ -834,7 +834,7 @@ abstract class S3WireMockBase(_system: ActorSystem, val 
_wireMockServer: WireMoc
 
 object S3WireMockBase {
 
-  def getCallerName(clazz: Class[_]): String = {
+  def getCallerName(clazz: Class[?]): String = {
     val s = Thread.currentThread.getStackTrace.map(_.getClassName).drop(1)
       .dropWhile(_.matches("(java.lang.Thread|.*WireMockBase.?$)"))
     val reduced = s.lastIndexWhere(_ == clazz.getName) match {
diff --git a/slick/src/test/scala/docs/scaladsl/SlickWithTryResultSpec.scala 
b/slick/src/test/scala/docs/scaladsl/SlickWithTryResultSpec.scala
index 9fdf031c7..a2805b51b 100644
--- a/slick/src/test/scala/docs/scaladsl/SlickWithTryResultSpec.scala
+++ b/slick/src/test/scala/docs/scaladsl/SlickWithTryResultSpec.scala
@@ -230,7 +230,7 @@ class SlickWithTryResultSpec extends AnyWordSpec
         .runWith(Sink.last)
         .futureValue
 
-      inserted mustBe a[Failure[_]]
+      inserted mustBe a[Failure[?]]
 
       getAllUsersFromDb.futureValue mustBe Set(users.head)
     }
@@ -316,7 +316,7 @@ class SlickWithTryResultSpec extends AnyWordSpec
         .runWith(SlickWithTryResult.sinkTry(insertUser))
         .futureValue
 
-      inserted mustBe a[Failure[_]]
+      inserted mustBe a[Failure[?]]
       getAllUsersFromDb.futureValue mustBe Set(users.head)
     }
 
@@ -328,7 +328,7 @@ class SlickWithTryResultSpec extends AnyWordSpec
         .futureValue
 
       records.length mustBe 41
-      inserted mustBe a[Failure[_]]
+      inserted mustBe a[Failure[?]]
       getAllUsersFromDb.futureValue mustBe users
     }
 
diff --git 
a/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
 
b/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
index 53ac2f215..95d6e2676 100644
--- 
a/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
+++ 
b/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
@@ -78,7 +78,7 @@ class CharsetCodingFlowsSpec
       // #encoding
       val targetFile = Paths.get("target/outdata.txt")
       val strings = System.getProperties.asScala.map(p => p._1 + " -> " + 
p._2).toList
-      val stringSource: Source[String, _] = Source(strings)
+      val stringSource: Source[String, ?] = Source(strings)
       val result =
         stringSource
           .via(TextFlow.encoding(StandardCharsets.US_ASCII))
@@ -93,7 +93,7 @@ class CharsetCodingFlowsSpec
       import java.nio.charset.StandardCharsets
 
       val utf16bytes = ByteString("äåûßêëé", StandardCharsets.UTF_16)
-      val byteStringSource: Source[ByteString, _] =
+      val byteStringSource: Source[ByteString, ?] =
         Source
           .single(utf16bytes)
 
@@ -114,7 +114,7 @@ class CharsetCodingFlowsSpec
 
       val utf16bytes = ByteString("äåûßêëé", StandardCharsets.UTF_16)
       val targetFile = Paths.get("target/outdata-transcoding.txt")
-      val byteStringSource: Source[ByteString, _] =
+      val byteStringSource: Source[ByteString, ?] =
         Source
           .single(utf16bytes)
       val result: Future[IOResult] =
diff --git 
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
 
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
index f76048c14..a3b5da2f0 100644
--- 
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
+++ 
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
@@ -373,7 +373,7 @@ private[unixdomainsocket] abstract class 
UnixDomainSocketImpl(system: ExtendedAc
   private val sel = NativeSelectorProvider.getInstance.openSelector
 
   /** Override to customise reported log source */
-  protected def logSource: Class[_] = getClass
+  protected def logSource: Class[?] = getClass
 
   private val ioThread =
     new Thread(() => nioEventLoop(sel, Logging(system, logSource.getName)), 
"unix-domain-socket-io")
diff --git 
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/javadsl/UnixDomainSocket.scala
 
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/javadsl/UnixDomainSocket.scala
index 67138e08f..b553e3e11 100644
--- 
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/javadsl/UnixDomainSocket.scala
+++ 
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/javadsl/UnixDomainSocket.scala
@@ -107,7 +107,7 @@ object UnixDomainSocket extends 
ExtensionId[UnixDomainSocket] with ExtensionIdPr
    */
   override def get(system: ClassicActorSystemProvider): UnixDomainSocket = 
super.apply(system.classicSystem)
 
-  def lookup: ExtensionId[_ <: Extension] =
+  def lookup: ExtensionId[? <: Extension] =
     UnixDomainSocket
 
   def createExtension(system: ExtendedActorSystem): UnixDomainSocket =
diff --git 
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/scaladsl/UnixDomainSocket.scala
 
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/scaladsl/UnixDomainSocket.scala
index 5357bdaf5..e240c84e4 100644
--- 
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/scaladsl/UnixDomainSocket.scala
+++ 
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/scaladsl/UnixDomainSocket.scala
@@ -42,7 +42,7 @@ object UnixDomainSocket extends ExtensionId[UnixDomainSocket] 
with ExtensionIdPr
   override def createExtension(system: ExtendedActorSystem) =
     new UnixDomainSocket(system)
 
-  override def lookup: ExtensionId[_ <: Extension] =
+  override def lookup: ExtensionId[? <: Extension] =
     UnixDomainSocket
 
   /**
@@ -135,7 +135,7 @@ final class UnixDomainSocket(system: ExtendedActorSystem) 
extends UnixDomainSock
    *                  independently whether the client is still attempting to 
write. This setting is recommended
    *                  for servers, and therefore it is the default setting.
    */
-  def bindAndHandle(handler: Flow[ByteString, ByteString, _],
+  def bindAndHandle(handler: Flow[ByteString, ByteString, ?],
       path: Path,
       backlog: Int = 128,
       halfClose: Boolean = false): Future[ServerBinding] =
diff --git 
a/unix-domain-socket/src/test/scala/docs/scaladsl/UnixDomainSocketSpec.scala 
b/unix-domain-socket/src/test/scala/docs/scaladsl/UnixDomainSocketSpec.scala
index d5b35a39b..01efc09e5 100644
--- a/unix-domain-socket/src/test/scala/docs/scaladsl/UnixDomainSocketSpec.scala
+++ b/unix-domain-socket/src/test/scala/docs/scaladsl/UnixDomainSocketSpec.scala
@@ -145,7 +145,7 @@ class UnixDomainSocketSpec
     }
 
     "be able to materialize outgoing connection flow more than once" in {
-      def materialize(flow: Flow[ByteString, ByteString, _]): Future[Done] =
+      def materialize(flow: Flow[ByteString, ByteString, ?]): Future[Done] =
         Source.single(ByteString("Hello")).via(flow).runWith(Sink.ignore)
 
       val path = dir.resolve("sock5")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to