This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 3b068ea46 chore: Rewrite to scala3 syntax (#1675)
3b068ea46 is described below
commit 3b068ea46661783f9a443116ca07a063893934d4
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Jun 14 20:43:29 2026 +0800
chore: Rewrite to scala3 syntax (#1675)
Motivation:
Let Scala 3 Community build compile pekko-connectors.
Modification:
Update .scalafmt.conf dialect to scala213source3 and enable
scala3 syntax rewrite rules, then reformat all sources.
Result:
Wildcard types use ? syntax (e.g. Class[?] instead of Class[_]).
Tests:
Not run - formatting only
References:
Refs apache/pekko#3048
---
.scalafmt.conf | 12 ++++++++++-
.../docs/scaladsl/AbstractAvroParquet.scala | 11 +++++-----
.../connectors/awsspi/PekkoHttpClientSpec.scala | 6 +++---
.../connectors/cassandra/CqlSessionProvider.scala | 2 +-
.../cassandra/javadsl/CassandraSession.scala | 10 ++++-----
.../cassandra/javadsl/CassandraSource.scala | 4 ++--
.../cassandra/scaladsl/CassandraSession.scala | 12 +++++------
.../cassandra/scaladsl/CassandraSource.scala | 4 ++--
.../cassandra/scaladsl/CassandraLifecycle.scala | 4 ++--
.../couchbase/impl/CouchbaseSessionImpl.scala | 16 +++++++--------
.../impl/CouchbaseSessionJavaAdapter.scala | 16 +++++++--------
.../couchbase/javadsl/CouchbaseFlow.scala | 10 ++++-----
.../couchbase/javadsl/CouchbaseSession.scala | 16 +++++++--------
.../couchbase/javadsl/CouchbaseSink.scala | 4 ++--
.../pekko/stream/connectors/couchbase/model.scala | 6 +++---
.../couchbase/scaladsl/CouchbaseFlow.scala | 10 ++++-----
.../couchbase/scaladsl/CouchbaseSession.scala | 16 +++++++--------
.../couchbase/scaladsl/CouchbaseSink.scala | 2 +-
.../connectors/dynamodb/javadsl/DynamoDb.scala | 2 +-
.../connectors/dynamodb/scaladsl/DynamoDb.scala | 4 ++--
.../connectors/elasticsearch/ReadResult.scala | 2 +-
.../connectors/elasticsearch/WriteMessage.scala | 4 ++--
.../impl/ElasticsearchSimpleFlowStage.scala | 2 +-
.../impl/ElasticsearchSourceStage.scala | 4 ++--
.../elasticsearch/javadsl/ElasticsearchFlow.scala | 16 +++++++--------
.../elasticsearch/javadsl/ElasticsearchSink.scala | 2 +-
.../javadsl/ElasticsearchSource.scala | 12 +++++------
.../elasticsearch/scaladsl/ElasticsearchFlow.scala | 20 +++++++++---------
.../elasticsearch/scaladsl/ElasticsearchSink.scala | 2 +-
.../scaladsl/ElasticsearchSource.scala | 12 +++++------
.../docs/scaladsl/ElasticsearchSpecUtils.scala | 2 +-
.../file/impl/archive/TarArchiveManager.scala | 2 +-
.../stream/connectors/file/scaladsl/Archive.scala | 2 +-
.../pekko/stream/connectors/ftp/impl/FtpLike.scala | 4 ++--
.../connectors/ftp/impl/SftpOperations.scala | 4 ++--
.../stream/connectors/geode/GeodeSettings.scala | 6 +++---
.../geode/impl/pdx/DelegatingPdxSerializer.scala | 6 +++---
.../geode/impl/pdx/ShapelessPdxSerializer.scala | 2 +-
.../scala/docs/scaladsl/PersonPdxSerializer.scala | 2 +-
.../bigquery/scaladsl/schema/ProductSchemas.scala | 4 ++--
.../pubsub/grpc/javadsl/GooglePubSub.scala | 2 +-
.../googlecloud/storage/impl/GCStorageStream.scala | 2 +-
.../googlecloud/storage/javadsl/GCStorage.scala | 2 +-
.../googlecloud/storage/scaladsl/GCStorage.scala | 2 +-
.../storage/scaladsl/GCStorageWiremockBase.scala | 2 +-
.../stream/connectors/hdfs/scaladsl/HdfsFlow.scala | 2 +-
.../impl/PekkoConnectorsResultMapperHelper.scala | 24 +++++++++++-----------
.../connectors/ironmq/impl/IronMqClient.scala | 2 +-
.../connectors/ironmq/impl/IronMqPushStage.scala | 2 +-
.../connectors/jakartams/JmsExceptions.scala | 4 ++--
.../jakartams/impl/JmsMessageProducer.scala | 20 +++++++++---------
.../jakartams/impl/JmsMessageReader.scala | 2 +-
.../jakartams/impl/JmsProducerStage.scala | 6 +++---
.../stream/connectors/jms/JmsExceptions.scala | 4 ++--
.../connectors/jms/impl/JmsMessageProducer.scala | 20 +++++++++---------
.../connectors/jms/impl/JmsMessageReader.scala | 2 +-
.../connectors/jms/impl/JmsProducerStage.scala | 6 +++---
.../test/scala/docs/scaladsl/JsonReaderTest.scala | 2 +-
.../kinesis/impl/KinesisSchedulerSourceStage.scala | 2 +-
.../pekko/stream/connectors/mqtt/MqttPerf.scala | 2 +-
.../connectors/mqtt/streaming/MqttPerf.scala | 4 ++--
.../mqtt/streaming/impl/ClientState.scala | 6 +++---
.../mqtt/streaming/impl/RequestState.scala | 12 +++++------
.../test/scala/docs/scaladsl/MqttSessionSpec.scala | 20 +++++++++---------
project/CopyrightHeader.scala | 6 +++---
project/CopyrightHeaderForBuild.scala | 2 +-
.../stream/connectors/s3/impl/HttpRequests.scala | 2 +-
.../pekko/stream/connectors/s3/impl/S3Stream.scala | 8 ++++----
.../pekko/stream/connectors/s3/javadsl/S3.scala | 18 ++++++++--------
.../pekko/stream/connectors/s3/scaladsl/S3.scala | 10 ++++-----
.../connectors/s3/scaladsl/S3IntegrationSpec.scala | 2 +-
.../connectors/s3/scaladsl/S3WireMockBase.scala | 2 +-
.../docs/scaladsl/SlickWithTryResultSpec.scala | 6 +++---
.../text/scaladsl/CharsetCodingFlowsSpec.scala | 6 +++---
.../impl/UnixDomainSocketImpl.scala | 2 +-
.../javadsl/UnixDomainSocket.scala | 2 +-
.../scaladsl/UnixDomainSocket.scala | 4 ++--
.../scala/docs/scaladsl/UnixDomainSocketSpec.scala | 2 +-
78 files changed, 256 insertions(+), 245 deletions(-)
diff --git a/.scalafmt.conf b/.scalafmt.conf
index 2f63f8c75..e3bb547ae 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -1,5 +1,5 @@
version = 3.11.1
-runner.dialect = scala213
+runner.dialect = scala213source3
project.git = true
style = defaultWithAlign
docstrings.style = Asterisk
@@ -75,3 +75,13 @@ project.excludeFilters = [
"scripts/authors.scala"
]
project.layout = StandardConvention
+
+rewrite.scala3.convertToNewSyntax = true
+runner {
+ dialectOverride {
+ allowSignificantIndentation = false
+ allowAsForImportRename = false
+ allowStarWildcardImport = false
+ allowPostfixStarVarargSplices = false
+ }
+}
diff --git
a/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala
b/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala
index f3358adb1..49093f18a 100644
--- a/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala
+++ b/avroparquet/src/test/scala-3/docs/scaladsl/AbstractAvroParquet.scala
@@ -13,7 +13,7 @@
package docs.scaladsl
-import com.sksamuel.avro4s._
+import com.sksamuel.avro4s.*
import org.apache.pekko.testkit.TestKit
import org.scalatest.{ BeforeAndAfterAll, Suite }
@@ -32,10 +32,11 @@ trait AbstractAvroParquet extends BeforeAndAfterAll with
AbstractAvroParquetBase
}
private def deleteRecursively(f: File): Boolean = {
- if (f.isDirectory) f.listFiles match {
- case null =>
- case xs => xs.foreach(deleteRecursively)
- }
+ if f.isDirectory then
+ f.listFiles match {
+ case null =>
+ case xs => xs.foreach(deleteRecursively)
+ }
f.delete()
}
}
diff --git
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
index b9c35672e..1811285f4 100644
---
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
+++
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
@@ -71,7 +71,7 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers
with OptionValues {
"use sdk content length from headers when publisher returns empty
contentLength" in {
val publisher = new SdkHttpContentPublisher {
override def contentLength(): java.util.Optional[java.lang.Long] =
java.util.Optional.empty()
- override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {}
+ override def subscribe(s: Subscriber[? >: ByteBuffer]): Unit = {}
}
val entity =
PekkoHttpClient.entityForMethodAndContentType(HttpMethods.PUT,
ContentTypes.NoContentType, publisher,
@@ -82,7 +82,7 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers
with OptionValues {
"use publisher contentLength when sdkContentLength is absent" in {
val publisher = new SdkHttpContentPublisher {
override def contentLength(): java.util.Optional[java.lang.Long] =
java.util.Optional.of(99L)
- override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {}
+ override def subscribe(s: Subscriber[? >: ByteBuffer]): Unit = {}
}
val entity =
PekkoHttpClient.entityForMethodAndContentType(HttpMethods.PUT,
ContentTypes.NoContentType, publisher, None)
@@ -92,7 +92,7 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers
with OptionValues {
"prefer sdk content length over publisher contentLength when both are
present" in {
val publisher = new SdkHttpContentPublisher {
override def contentLength(): java.util.Optional[java.lang.Long] =
java.util.Optional.of(55L)
- override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {}
+ override def subscribe(s: Subscriber[? >: ByteBuffer]): Unit = {}
}
val entity =
PekkoHttpClient.entityForMethodAndContentType(HttpMethods.PUT,
ContentTypes.NoContentType, publisher,
diff --git
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala
index 535756340..e4dabdacc 100644
---
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala
+++
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala
@@ -77,7 +77,7 @@ object CqlSessionProvider {
val className = config.getString("session-provider")
val dynamicAccess = system.asInstanceOf[ExtendedActorSystem].dynamicAccess
val clazz = dynamicAccess.getClassFor[CqlSessionProvider](className).get
- def instantiate(args: immutable.Seq[(Class[_], AnyRef)]) =
+ def instantiate(args: immutable.Seq[(Class[?], AnyRef)]) =
dynamicAccess.createInstanceFor[CqlSessionProvider](clazz, args)
val params = List((classOf[ActorSystem], system), (classOf[Config],
config))
diff --git
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSession.scala
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSession.scala
index dde41c43a..50665acf4 100644
---
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSession.scala
+++
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSession.scala
@@ -151,7 +151,7 @@ final class CassandraSession(@InternalApi private[pekko]
val delegate: scaladsl.
* The returned `CompletionStage` is completed when the statement has been
* successfully executed, or if it fails.
*/
- def executeWrite(stmt: Statement[_]): CompletionStage[Done] =
+ def executeWrite(stmt: Statement[?]): CompletionStage[Done] =
delegate.executeWrite(stmt).asJava
/**
@@ -180,7 +180,7 @@ final class CassandraSession(@InternalApi private[pekko]
val delegate: scaladsl.
* Note that you have to connect a `Sink` that consumes the messages from
* this `Source` and then `run` the stream.
*/
- def select(stmt: Statement[_]): Source[Row, NotUsed] =
+ def select(stmt: Statement[?]): Source[Row, NotUsed] =
delegate.select(stmt).asJava
/**
@@ -195,7 +195,7 @@ final class CassandraSession(@InternalApi private[pekko]
val delegate: scaladsl.
* Note that you have to connect a `Sink` that consumes the messages from
* this `Source` and then `run` the stream.
*/
- def select(stmt: CompletionStage[Statement[_]]): Source[Row, NotUsed] =
+ def select(stmt: CompletionStage[Statement[?]]): Source[Row, NotUsed] =
delegate.select(stmt.asScala).asJava
/**
@@ -223,7 +223,7 @@ final class CassandraSession(@InternalApi private[pekko]
val delegate: scaladsl.
*
* The returned `CompletionStage` is completed with the found rows.
*/
- def selectAll(stmt: Statement[_]): CompletionStage[JList[Row]] =
+ def selectAll(stmt: Statement[?]): CompletionStage[JList[Row]] =
delegate.selectAll(stmt).map(_.asJava).asJava
/**
@@ -249,7 +249,7 @@ final class CassandraSession(@InternalApi private[pekko]
val delegate: scaladsl.
* The returned `CompletionStage` is completed with the first row,
* if any.
*/
- def selectOne(stmt: Statement[_]): CompletionStage[Optional[Row]] =
+ def selectOne(stmt: Statement[?]): CompletionStage[Optional[Row]] =
delegate.selectOne(stmt).map(_.toJava).asJava
/**
diff --git
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSource.scala
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSource.scala
index 00a90b339..8d8a2577c 100644
---
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSource.scala
+++
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSource.scala
@@ -41,7 +41,7 @@ object CassandraSource {
*
* See <a
href="https://docs.datastax.com/en/dse/6.7/cql/cql/cql_using/queriesTOC.html">Querying
data</a>.
*/
- def create(session: CassandraSession, stmt: Statement[_]): Source[Row,
NotUsed] =
+ def create(session: CassandraSession, stmt: Statement[?]): Source[Row,
NotUsed] =
session.select(stmt)
/**
@@ -49,7 +49,7 @@ object CassandraSource {
*
* See <a
href="https://docs.datastax.com/en/dse/6.7/cql/cql/cql_using/queriesTOC.html">Querying
data</a>.
*/
- def fromCompletionStage(session: CassandraSession, stmt:
CompletionStage[Statement[_]]): Source[Row, NotUsed] =
+ def fromCompletionStage(session: CassandraSession, stmt:
CompletionStage[Statement[?]]): Source[Row, NotUsed] =
session.select(stmt)
}
diff --git
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSession.scala
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSession.scala
index e5aff3813..c2bba9034 100644
---
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSession.scala
+++
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSession.scala
@@ -171,7 +171,7 @@ final class CassandraSession(system:
pekko.actor.ActorSystem,
* The returned `Future` is completed when the statement has been
* successfully executed, or if it fails.
*/
- def executeWrite(stmt: Statement[_]): Future[Done] = {
+ def executeWrite(stmt: Statement[?]): Future[Done] = {
underlying().flatMap { cqlSession =>
cqlSession.executeAsync(stmt).asScala.map(_ => Done)
}
@@ -194,7 +194,7 @@ final class CassandraSession(system:
pekko.actor.ActorSystem,
/**
* INTERNAL API
*/
- @InternalApi private[pekko] def selectResultSet(stmt: Statement[_]):
Future[AsyncResultSet] = {
+ @InternalApi private[pekko] def selectResultSet(stmt: Statement[?]):
Future[AsyncResultSet] = {
underlying().flatMap { s =>
s.executeAsync(stmt).asScala
}
@@ -212,7 +212,7 @@ final class CassandraSession(system:
pekko.actor.ActorSystem,
* Note that you have to connect a `Sink` that consumes the messages from
* this `Source` and then `run` the stream.
*/
- def select(stmt: Statement[_]): Source[Row, NotUsed] = {
+ def select(stmt: Statement[?]): Source[Row, NotUsed] = {
Source
.futureSource {
underlying().map { cqlSession =>
@@ -233,7 +233,7 @@ final class CassandraSession(system:
pekko.actor.ActorSystem,
* Note that you have to connect a `Sink` that consumes the messages from
* this `Source` and then `run` the stream.
*/
- def select(stmt: Future[Statement[_]]): Source[Row, NotUsed] = {
+ def select(stmt: Future[Statement[?]]): Source[Row, NotUsed] = {
Source
.futureSource {
underlying().flatMap(cqlSession => stmt.map(cqlSession -> _)).map {
@@ -269,7 +269,7 @@ final class CassandraSession(system:
pekko.actor.ActorSystem,
*
* The returned `Future` is completed with the found rows.
*/
- def selectAll(stmt: Statement[_]): Future[immutable.Seq[Row]] = {
+ def selectAll(stmt: Statement[?]): Future[immutable.Seq[Row]] = {
select(stmt)
.runWith(Sink.seq)
}
@@ -297,7 +297,7 @@ final class CassandraSession(system:
pekko.actor.ActorSystem,
* The returned `Future` is completed with the first row,
* if any.
*/
- def selectOne(stmt: Statement[_]): Future[Option[Row]] = {
+ def selectOne(stmt: Statement[?]): Future[Option[Row]] = {
selectResultSet(stmt).map { rs =>
Option(rs.one()) // rs.one returns null if exhausted
}
diff --git
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSource.scala
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSource.scala
index 107ff77ed..3eff63cdc 100644
---
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSource.scala
+++
b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSource.scala
@@ -38,7 +38,7 @@ object CassandraSource {
*
* See <a
href="https://docs.datastax.com/en/dse/6.7/cql/cql/cql_using/queriesTOC.html">Querying
data</a>.
*/
- def apply(stmt: Statement[_])(implicit session: CassandraSession):
Source[Row, NotUsed] =
+ def apply(stmt: Statement[?])(implicit session: CassandraSession):
Source[Row, NotUsed] =
session.select(stmt)
/**
@@ -46,7 +46,7 @@ object CassandraSource {
*
* See <a
href="https://docs.datastax.com/en/dse/6.7/cql/cql/cql_using/queriesTOC.html">Querying
data</a>.
*/
- def fromFuture(stmt: Future[Statement[_]])(implicit session:
CassandraSession): Source[Row, NotUsed] =
+ def fromFuture(stmt: Future[Statement[?]])(implicit session:
CassandraSession): Source[Row, NotUsed] =
session.select(stmt)
}
diff --git
a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraLifecycle.scala
b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraLifecycle.scala
index fb4e7e40b..7f05f8cec 100644
---
a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraLifecycle.scala
+++
b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraLifecycle.scala
@@ -32,7 +32,7 @@ import scala.util.control.NonFatal
trait CassandraLifecycleBase {
def lifecycleSession: CassandraSession
- def execute(session: CassandraSession, statements:
immutable.Seq[BatchableStatement[_]]): Future[Done] = {
+ def execute(session: CassandraSession, statements:
immutable.Seq[BatchableStatement[?]]): Future[Done] = {
val batch = new BatchStatementBuilder(BatchType.LOGGED)
statements.foreach { stmt =>
batch.addStatement(stmt)
@@ -62,7 +62,7 @@ trait CassandraLifecycleBase {
def dropKeyspace(name: String): Future[Done] =
withSchemaMetadataDisabled(dropKeyspace(lifecycleSession, name))
- def execute(statements: immutable.Seq[BatchableStatement[_]]): Future[Done]
= execute(lifecycleSession, statements)
+ def execute(statements: immutable.Seq[BatchableStatement[?]]): Future[Done]
= execute(lifecycleSession, statements)
def executeCql(statements: immutable.Seq[String]): Future[Done] =
executeCql(lifecycleSession, statements)
diff --git
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala
index 5b6481e04..5d00543d4 100644
---
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala
+++
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala
@@ -48,13 +48,13 @@ final private[couchbase] class
CouchbaseSessionImpl(asyncBucket: AsyncBucket, cl
def insert(document: JsonDocument): Future[JsonDocument] =
insertDoc(document)
- def insertDoc[T <: Document[_]](document: T): Future[T] =
+ def insertDoc[T <: Document[?]](document: T): Future[T] =
singleObservableToFuture(asyncBucket.insert(document), document)
def insert(document: JsonDocument, writeSettings: CouchbaseWriteSettings):
Future[JsonDocument] =
insertDoc(document, writeSettings)
- def insertDoc[T <: Document[_]](document: T, writeSettings:
CouchbaseWriteSettings): Future[T] =
+ def insertDoc[T <: Document[?]](document: T, writeSettings:
CouchbaseWriteSettings): Future[T] =
singleObservableToFuture(asyncBucket.insert(document,
writeSettings.persistTo,
writeSettings.replicateTo,
@@ -65,26 +65,26 @@ final private[couchbase] class
CouchbaseSessionImpl(asyncBucket: AsyncBucket, cl
def get(id: String): Future[Option[JsonDocument]] =
zeroOrOneObservableToFuture(asyncBucket.get(id))
- def get[T <: Document[_]](id: String, documentClass: Class[T]):
Future[Option[T]] =
+ def get[T <: Document[?]](id: String, documentClass: Class[T]):
Future[Option[T]] =
zeroOrOneObservableToFuture(asyncBucket.get(id, documentClass))
def get(id: String, timeout: FiniteDuration): Future[Option[JsonDocument]] =
zeroOrOneObservableToFuture(asyncBucket.get(id, timeout.toMillis,
TimeUnit.MILLISECONDS))
- def get[T <: Document[_]](id: String,
+ def get[T <: Document[?]](id: String,
timeout: FiniteDuration,
documentClass: Class[T]): scala.concurrent.Future[Option[T]] =
zeroOrOneObservableToFuture(asyncBucket.get(id, documentClass,
timeout.toMillis, TimeUnit.MILLISECONDS))
def upsert(document: JsonDocument): Future[JsonDocument] =
upsertDoc(document)
- def upsertDoc[T <: Document[_]](document: T): Future[T] =
+ def upsertDoc[T <: Document[?]](document: T): Future[T] =
singleObservableToFuture(asyncBucket.upsert(document), document.id)
def upsert(document: JsonDocument, writeSettings: CouchbaseWriteSettings):
Future[JsonDocument] =
upsertDoc(document, writeSettings)
- def upsertDoc[T <: Document[_]](document: T, writeSettings:
CouchbaseWriteSettings): Future[T] =
+ def upsertDoc[T <: Document[?]](document: T, writeSettings:
CouchbaseWriteSettings): Future[T] =
singleObservableToFuture(asyncBucket.upsert(document,
writeSettings.persistTo,
writeSettings.replicateTo,
@@ -94,13 +94,13 @@ final private[couchbase] class
CouchbaseSessionImpl(asyncBucket: AsyncBucket, cl
def replace(document: JsonDocument): Future[JsonDocument] =
replaceDoc(document)
- def replaceDoc[T <: Document[_]](document: T): Future[T] =
+ def replaceDoc[T <: Document[?]](document: T): Future[T] =
singleObservableToFuture(asyncBucket.replace(document), document.id)
def replace(document: JsonDocument, writeSettings: CouchbaseWriteSettings):
Future[JsonDocument] =
replaceDoc(document, writeSettings)
- def replaceDoc[T <: Document[_]](document: T, writeSettings:
CouchbaseWriteSettings): Future[T] =
+ def replaceDoc[T <: Document[?]](document: T, writeSettings:
CouchbaseWriteSettings): Future[T] =
singleObservableToFuture(asyncBucket.replace(document,
writeSettings.persistTo,
writeSettings.replicateTo,
diff --git
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionJavaAdapter.scala
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionJavaAdapter.scala
index 52d0b19e6..77b62ce62 100644
---
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionJavaAdapter.scala
+++
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionJavaAdapter.scala
@@ -49,47 +49,47 @@ private[couchbase] final class
CouchbaseSessionJavaAdapter(delegate: scaladsl.Co
override def insert(document: JsonDocument): CompletionStage[JsonDocument] =
delegate.insertDoc(document).asJava
- override def insertDoc[T <: Document[_]](document: T): CompletionStage[T] =
delegate.insertDoc(document).asJava
+ override def insertDoc[T <: Document[?]](document: T): CompletionStage[T] =
delegate.insertDoc(document).asJava
override def insert(
document: JsonDocument,
writeSettings: CouchbaseWriteSettings): CompletionStage[JsonDocument] =
delegate.insert(document, writeSettings).asJava
- override def insertDoc[T <: Document[_]](
+ override def insertDoc[T <: Document[?]](
document: T,
writeSettings: CouchbaseWriteSettings): CompletionStage[T] =
delegate.insertDoc(document, writeSettings).asJava
override def get(id: String): CompletionStage[Optional[JsonDocument]] =
futureOptToJava(delegate.get(id))
- override def get[T <: Document[_]](id: String, clazz: Class[T]):
CompletionStage[Optional[T]] =
+ override def get[T <: Document[?]](id: String, clazz: Class[T]):
CompletionStage[Optional[T]] =
futureOptToJava(delegate.get(id, clazz))
override def get(id: String, timeout: Duration):
CompletionStage[Optional[JsonDocument]] =
futureOptToJava(delegate.get(id, FiniteDuration.apply(timeout.toNanos,
duration.NANOSECONDS)))
- def get[T <: Document[_]](id: String, timeout: Duration, documentClass:
Class[T]): CompletionStage[Optional[T]] =
+ def get[T <: Document[?]](id: String, timeout: Duration, documentClass:
Class[T]): CompletionStage[Optional[T]] =
futureOptToJava(delegate.get(id, FiniteDuration.apply(timeout.toNanos,
duration.NANOSECONDS), documentClass))
override def upsert(document: JsonDocument): CompletionStage[JsonDocument] =
delegate.upsert(document).asJava
- override def upsertDoc[T <: Document[_]](document: T): CompletionStage[T] =
delegate.upsertDoc(document).asJava
+ override def upsertDoc[T <: Document[?]](document: T): CompletionStage[T] =
delegate.upsertDoc(document).asJava
override def upsert(document: JsonDocument, writeSettings:
CouchbaseWriteSettings): CompletionStage[JsonDocument] =
delegate.upsert(document, writeSettings).asJava
- override def upsertDoc[T <: Document[_]](document: T, writeSettings:
CouchbaseWriteSettings): CompletionStage[T] =
+ override def upsertDoc[T <: Document[?]](document: T, writeSettings:
CouchbaseWriteSettings): CompletionStage[T] =
delegate.upsertDoc(document, writeSettings).asJava
override def replace(document: JsonDocument): CompletionStage[JsonDocument]
= delegate.replace(document).asJava
- override def replaceDoc[T <: Document[_]](document: T): CompletionStage[T] =
delegate.replaceDoc(document).asJava
+ override def replaceDoc[T <: Document[?]](document: T): CompletionStage[T] =
delegate.replaceDoc(document).asJava
override def replace(document: JsonDocument, writeSettings:
CouchbaseWriteSettings): CompletionStage[JsonDocument] =
delegate.replace(document, writeSettings).asJava
- override def replaceDoc[T <: Document[_]](document: T, writeSettings:
CouchbaseWriteSettings): CompletionStage[T] =
+ override def replaceDoc[T <: Document[?]](document: T, writeSettings:
CouchbaseWriteSettings): CompletionStage[T] =
delegate.replaceDoc(document, writeSettings).asJava
override def remove(id: String): CompletionStage[Done] =
delegate.remove(id).asJava
diff --git
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseFlow.scala
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseFlow.scala
index aa267cdab..f5ffc1a6b 100644
---
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseFlow.scala
+++
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseFlow.scala
@@ -39,7 +39,7 @@ object CouchbaseFlow {
/**
* Create a flow to query Couchbase for by `id` and emit documents of the
given class.
*/
- def fromId[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+ def fromId[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
bucketName: String,
target: Class[T]): Flow[String, T, NotUsed] =
scaladsl.CouchbaseFlow.fromId(sessionSettings, bucketName, target).asJava
@@ -55,7 +55,7 @@ object CouchbaseFlow {
/**
* Create a flow to update or insert a Couchbase document of the given class.
*/
- def upsertDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+ def upsertDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, T, NotUsed] =
scaladsl.CouchbaseFlow.upsertDoc(sessionSettings, writeSettings,
bucketName).asJava
@@ -64,7 +64,7 @@ object CouchbaseFlow {
* Create a flow to update or insert a Couchbase document of the given class
and emit a result so that write failures
* can be handled in-stream.
*/
- def upsertDocWithResult[T <: Document[_]](sessionSettings:
CouchbaseSessionSettings,
+ def upsertDocWithResult[T <: Document[?]](sessionSettings:
CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
scaladsl.CouchbaseFlow.upsertDocWithResult(sessionSettings, writeSettings,
bucketName).asJava
@@ -80,7 +80,7 @@ object CouchbaseFlow {
/**
* Create a flow to replace a Couchbase document of the given class.
*/
- def replaceDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+ def replaceDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, T, NotUsed] =
scaladsl.CouchbaseFlow.replaceDoc(sessionSettings, writeSettings,
bucketName).asJava
@@ -89,7 +89,7 @@ object CouchbaseFlow {
* Create a flow to replace a Couchbase document of the given class and emit
a result so that write failures
* can be handled in-stream.
*/
- def replaceDocWithResult[T <: Document[_]](sessionSettings:
CouchbaseSessionSettings,
+ def replaceDocWithResult[T <: Document[?]](sessionSettings:
CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
scaladsl.CouchbaseFlow.replaceDocWithResult(sessionSettings,
writeSettings, bucketName).asJava
diff --git
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSession.scala
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSession.scala
index a2ec5d80c..b07a23884 100644
---
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSession.scala
+++
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSession.scala
@@ -113,7 +113,7 @@ abstract class CouchbaseSession {
*
* @return A CompletionStage that completes with the written document when
the write completes
*/
- def insertDoc[T <: Document[_]](document: T): CompletionStage[T]
+ def insertDoc[T <: Document[?]](document: T): CompletionStage[T]
/**
* Insert a JSON document using the given write settings.
@@ -126,7 +126,7 @@ abstract class CouchbaseSession {
* Insert a document using the given write settings.
* Separate from `insert` to make the most common case smoother with the
type inference
*/
- def insertDoc[T <: Document[_]](document: T, writeSettings:
CouchbaseWriteSettings): CompletionStage[T]
+ def insertDoc[T <: Document[?]](document: T, writeSettings:
CouchbaseWriteSettings): CompletionStage[T]
/**
* @return A document if found or none if there is no document for the id
@@ -136,7 +136,7 @@ abstract class CouchbaseSession {
/**
* @return A document if found or none if there is no document for the id
*/
- def get[T <: Document[_]](id: String, documentClass: Class[T]):
CompletionStage[Optional[T]]
+ def get[T <: Document[?]](id: String, documentClass: Class[T]):
CompletionStage[Optional[T]]
/**
* @param timeout fail the returned CompletionStage with a TimeoutException
if it takes longer than this
@@ -148,7 +148,7 @@ abstract class CouchbaseSession {
* @param timeout fail the returned CompletionStage with a TimeoutException
if it takes longer than this
* @return A document if found or none if there is no document for the id
*/
- def get[T <: Document[_]](id: String, timeout: Duration, documentClass:
Class[T]): CompletionStage[Optional[T]]
+ def get[T <: Document[?]](id: String, timeout: Duration, documentClass:
Class[T]): CompletionStage[Optional[T]]
/**
* Upsert using the default write settings
@@ -165,7 +165,7 @@ abstract class CouchbaseSession {
*
* @return a CompletionStage that completes when the upsert is done
*/
- def upsertDoc[T <: Document[_]](document: T): CompletionStage[T]
+ def upsertDoc[T <: Document[?]](document: T): CompletionStage[T]
/**
* Upsert using the given write settings.
@@ -183,7 +183,7 @@ abstract class CouchbaseSession {
*
* @return a CompletionStage that completes when the upsert is done
*/
- def upsertDoc[T <: Document[_]](document: T, writeSettings:
CouchbaseWriteSettings): CompletionStage[T]
+ def upsertDoc[T <: Document[?]](document: T, writeSettings:
CouchbaseWriteSettings): CompletionStage[T]
/**
* Replace using the default write settings
@@ -200,7 +200,7 @@ abstract class CouchbaseSession {
*
* @return a CompletionStage that completes when the replace is done
*/
- def replaceDoc[T <: Document[_]](document: T): CompletionStage[T]
+ def replaceDoc[T <: Document[?]](document: T): CompletionStage[T]
/**
* Replace using the given write settings.
@@ -218,7 +218,7 @@ abstract class CouchbaseSession {
*
* @return a CompletionStage that completes when the replace is done
*/
- def replaceDoc[T <: Document[_]](document: T, writeSettings:
CouchbaseWriteSettings): CompletionStage[T]
+ def replaceDoc[T <: Document[?]](document: T, writeSettings:
CouchbaseWriteSettings): CompletionStage[T]
/**
* Remove a document by id using the default write settings.
diff --git
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSink.scala
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSink.scala
index 7b4d385d0..3eabab6df 100644
---
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSink.scala
+++
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/javadsl/CouchbaseSink.scala
@@ -39,7 +39,7 @@ object CouchbaseSink {
/**
* Create a sink to update or insert a Couchbase document of the given class.
*/
- def upsertDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+ def upsertDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Sink[T, CompletionStage[Done]] =
CouchbaseFlow
@@ -59,7 +59,7 @@ object CouchbaseSink {
/**
* Create a sink to replace a Couchbase document of the given class.
*/
- def replaceDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+ def replaceDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Sink[T, CompletionStage[Done]] =
CouchbaseFlow
diff --git
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala
index cc0c2de48..af17b70f8 100644
---
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala
+++
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala
@@ -261,7 +261,7 @@ final class CouchbaseSessionSettings private (
/**
* Wrapper to for handling Couchbase write failures in-stream instead of
failing the stream.
*/
-sealed trait CouchbaseWriteResult[T <: Document[_]] {
+sealed trait CouchbaseWriteResult[T <: Document[?]] {
def isSuccess: Boolean
def isFailure: Boolean
def doc: T
@@ -270,7 +270,7 @@ sealed trait CouchbaseWriteResult[T <: Document[_]] {
/**
* Emitted for a successful Couchbase write operation.
*/
-final case class CouchbaseWriteSuccess[T <: Document[_]] private[couchbase] (
+final case class CouchbaseWriteSuccess[T <: Document[?]] private[couchbase] (
override val doc: T) extends CouchbaseWriteResult[T] {
val isSuccess: Boolean = true
val isFailure: Boolean = false
@@ -279,7 +279,7 @@ final case class CouchbaseWriteSuccess[T <: Document[_]]
private[couchbase] (
/**
* Emitted for a failed Couchbase write operation.
*/
-final case class CouchbaseWriteFailure[T <: Document[_]] private[couchbase]
(override val doc: T, failure: Throwable)
+final case class CouchbaseWriteFailure[T <: Document[?]] private[couchbase]
(override val doc: T, failure: Throwable)
extends CouchbaseWriteResult[T] {
val isSuccess: Boolean = false
val isFailure: Boolean = true
diff --git
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
index ad35b1cee..86deede60 100644
---
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
+++
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala
@@ -52,7 +52,7 @@ object CouchbaseFlow {
/**
* Create a flow to query Couchbase for by `id` and emit documents of the
given class.
*/
- def fromId[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+ def fromId[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
bucketName: String,
target: Class[T]): Flow[String, T, NotUsed] =
Flow
@@ -82,7 +82,7 @@ object CouchbaseFlow {
/**
* Create a flow to update or insert a Couchbase document of the given class.
*/
- def upsertDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+ def upsertDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, T, NotUsed] =
Flow
@@ -98,7 +98,7 @@ object CouchbaseFlow {
* Create a flow to update or insert a Couchbase document of the given class
and emit a result so that write failures
* can be handled in-stream.
*/
- def upsertDocWithResult[T <: Document[_]](sessionSettings:
CouchbaseSessionSettings,
+ def upsertDocWithResult[T <: Document[?]](sessionSettings:
CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = {
val flow: Flow[T, CouchbaseWriteResult[T], Future[NotUsed]] = Flow
@@ -136,7 +136,7 @@ object CouchbaseFlow {
/**
* Create a flow to replace a Couchbase document of the given class.
*/
- def replaceDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+ def replaceDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, T, NotUsed] =
Flow
@@ -152,7 +152,7 @@ object CouchbaseFlow {
* Create a flow to replace a Couchbase document of the given class and emit
a result so that write failures
* can be handled in-stream.
*/
- def replaceDocWithResult[T <: Document[_]](sessionSettings:
CouchbaseSessionSettings,
+ def replaceDocWithResult[T <: Document[?]](sessionSettings:
CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = {
val flow: Flow[T, CouchbaseWriteResult[T], Future[NotUsed]] = Flow
diff --git
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSession.scala
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSession.scala
index 8861a298f..25764beef 100644
---
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSession.scala
+++
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSession.scala
@@ -114,7 +114,7 @@ trait CouchbaseSession {
*
* @return A future that completes with the written document when the write
completes
*/
- def insertDoc[T <: Document[_]](document: T): Future[T]
+ def insertDoc[T <: Document[?]](document: T): Future[T]
/**
* Insert a JSON document using the given write settings.
@@ -129,7 +129,7 @@ trait CouchbaseSession {
*
* @return A future that completes with the written document when the write
completes
*/
- def insertDoc[T <: Document[_]](document: T, writeSettings:
CouchbaseWriteSettings): Future[T]
+ def insertDoc[T <: Document[?]](document: T, writeSettings:
CouchbaseWriteSettings): Future[T]
/**
* @return A document if found or none if there is no document for the id
@@ -139,7 +139,7 @@ trait CouchbaseSession {
/**
* @return A document of the given type if found or none if there is no
document for the id
*/
- def get[T <: Document[_]](id: String, documentClass: Class[T]):
Future[Option[T]]
+ def get[T <: Document[?]](id: String, documentClass: Class[T]):
Future[Option[T]]
/**
* @param timeout fail the returned future with a TimeoutException if it
takes longer than this
@@ -150,7 +150,7 @@ trait CouchbaseSession {
/**
* @return A document of the given type if found or none if there is no
document for the id
*/
- def get[T <: Document[_]](id: String, timeout: FiniteDuration,
documentClass: Class[T]): Future[Option[T]]
+ def get[T <: Document[?]](id: String, timeout: FiniteDuration,
documentClass: Class[T]): Future[Option[T]]
/**
* Upsert using the default write settings.
@@ -168,7 +168,7 @@ trait CouchbaseSession {
*
* @return a future that completes when the upsert is done
*/
- def upsertDoc[T <: Document[_]](document: T): Future[T]
+ def upsertDoc[T <: Document[?]](document: T): Future[T]
/**
* Upsert using the given write settings
@@ -186,7 +186,7 @@ trait CouchbaseSession {
*
* @return a future that completes when the upsert is done
*/
- def upsertDoc[T <: Document[_]](document: T, writeSettings:
CouchbaseWriteSettings): Future[T]
+ def upsertDoc[T <: Document[?]](document: T, writeSettings:
CouchbaseWriteSettings): Future[T]
/**
* Replace using the default write settings.
@@ -204,7 +204,7 @@ trait CouchbaseSession {
*
* @return a future that completes when the replace is done
*/
- def replaceDoc[T <: Document[_]](document: T): Future[T]
+ def replaceDoc[T <: Document[?]](document: T): Future[T]
/**
* Replace using the given write settings
@@ -222,7 +222,7 @@ trait CouchbaseSession {
*
* @return a future that completes when the replace is done
*/
- def replaceDoc[T <: Document[_]](document: T, writeSettings:
CouchbaseWriteSettings): Future[T]
+ def replaceDoc[T <: Document[?]](document: T, writeSettings:
CouchbaseWriteSettings): Future[T]
/**
* Remove a document by id using the default write settings.
diff --git
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSink.scala
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSink.scala
index fbbcb5722..0cc33e1c0 100644
---
a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSink.scala
+++
b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseSink.scala
@@ -37,7 +37,7 @@ object CouchbaseSink {
/**
* Create a sink to update or insert a Couchbase document of the given class.
*/
- def upsertDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
+ def upsertDoc[T <: Document[?]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Sink[T, Future[Done]] =
CouchbaseFlow
diff --git
a/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/javadsl/DynamoDb.scala
b/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/javadsl/DynamoDb.scala
index 3c0f65008..7ab4dfba7 100644
---
a/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/javadsl/DynamoDb.scala
+++
b/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/javadsl/DynamoDb.scala
@@ -75,7 +75,7 @@ object DynamoDb {
*/
def flowPaginated[In <: DynamoDbRequest, Out <: DynamoDbResponse](
client: DynamoDbAsyncClient,
- operation: DynamoDbPaginatedOp[In, Out, _]): Flow[In, Out, NotUsed] =
+ operation: DynamoDbPaginatedOp[In, Out, ?]): Flow[In, Out, NotUsed] =
scaladsl.DynamoDb.flowPaginated()(client, operation).asJava
/**
diff --git
a/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/scaladsl/DynamoDb.scala
b/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/scaladsl/DynamoDb.scala
index c90943018..73004dd3d 100644
---
a/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/scaladsl/DynamoDb.scala
+++
b/dynamodb/src/main/scala/org/apache/pekko/stream/connectors/dynamodb/scaladsl/DynamoDb.scala
@@ -68,7 +68,7 @@ object DynamoDb {
*/
def source[In <: DynamoDbRequest, Out <: DynamoDbResponse](
request: In)(
- implicit client: DynamoDbAsyncClient, operation: DynamoDbPaginatedOp[In,
Out, _]): Source[Out, NotUsed] =
+ implicit client: DynamoDbAsyncClient, operation: DynamoDbPaginatedOp[In,
Out, ?]): Source[Out, NotUsed] =
Source.fromPublisher(operation.publisher(request))
/**
@@ -78,7 +78,7 @@ object DynamoDb {
*/
def flowPaginated[In <: DynamoDbRequest, Out <: DynamoDbResponse]()(
implicit client: DynamoDbAsyncClient,
- operation: DynamoDbPaginatedOp[In, Out, _]): Flow[In, Out, NotUsed] =
Flow[In].flatMapConcat(source(_))
+ operation: DynamoDbPaginatedOp[In, Out, ?]): Flow[In, Out, NotUsed] =
Flow[In].flatMapConcat(source(_))
/**
* Create a Future that will be completed with a response to a given request.
diff --git
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/ReadResult.scala
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/ReadResult.scala
index ab61474b6..5be0196ed 100644
---
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/ReadResult.scala
+++
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/ReadResult.scala
@@ -35,7 +35,7 @@ final class ReadResult[T] @InternalApi private[elasticsearch]
(val id: String,
s"""ReadResult(id=$id,source=$source,version=${version.getOrElse("")})"""
override def equals(other: Any): Boolean = other match {
- case that: ReadResult[_] =>
+ case that: ReadResult[?] =>
java.util.Objects.equals(this.id, that.id) &&
java.util.Objects.equals(this.source, that.source) &&
java.util.Objects.equals(this.version, that.version)
diff --git
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/WriteMessage.scala
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/WriteMessage.scala
index da25a4dca..dc68a0d9e 100644
---
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/WriteMessage.scala
+++
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/WriteMessage.scala
@@ -95,7 +95,7 @@ final class WriteMessage[T, PT] private (val operation:
Operation,
s"""WriteMessage(operation=$operation,id=$id,source=$source,passThrough=$passThrough,version=$version,indexName=$indexName,customMetadata=$customMetadata)"""
override def equals(other: Any): Boolean = other match {
- case that: WriteMessage[_, _] =>
+ case that: WriteMessage[?, ?] =>
java.util.Objects.equals(this.operation, that.operation) &&
java.util.Objects.equals(this.id, that.id) &&
java.util.Objects.equals(this.source, that.source) &&
@@ -168,7 +168,7 @@ final class WriteResult[T2, C2] @InternalApi
private[elasticsearch] (val message
s"""WriteResult(message=$message,error=$error)"""
override def equals(other: Any): Boolean = other match {
- case that: WriteResult[_, _] =>
+ case that: WriteResult[?, ?] =>
java.util.Objects.equals(this.message, that.message) &&
java.util.Objects.equals(this.error, that.error)
case _ => false
diff --git
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala
index 18e1186e1..4a7e502b5 100644
---
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala
+++
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala
@@ -35,7 +35,7 @@ import scala.concurrent.{ ExecutionContext, Future }
@InternalApi
private[elasticsearch] final class ElasticsearchSimpleFlowStage[T, C](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
writer: MessageWriter[T])(implicit http: HttpExt, mat: Materializer, ec:
ExecutionContext)
extends GraphStage[
FlowShape[(immutable.Seq[WriteMessage[T, C]],
immutable.Seq[WriteResult[T, C]]),
diff --git
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala
index 4a4222c2d..75056d867 100644
---
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala
+++
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala
@@ -61,7 +61,7 @@ private[elasticsearch] trait MessageReader[T] {
private[elasticsearch] final class ElasticsearchSourceStage[T](
elasticsearchParams: ElasticsearchParams,
searchParams: Map[String, String],
- settings: SourceSettingsBase[_, _],
+ settings: SourceSettingsBase[?, ?],
reader: MessageReader[T])(implicit http: HttpExt, mat: Materializer, ec:
ExecutionContext)
extends GraphStage[SourceShape[ReadResult[T]]] {
@@ -86,7 +86,7 @@ object ElasticsearchSourceStage {
private[elasticsearch] final class ElasticsearchSourceLogic[T](
elasticsearchParams: ElasticsearchParams,
searchParams: Map[String, String],
- settings: SourceSettingsBase[_, _],
+ settings: SourceSettingsBase[?, ?],
out: Outlet[ReadResult[T]],
shape: SourceShape[ReadResult[T]],
reader: MessageReader[T])(implicit http: HttpExt, mat: Materializer, ec:
ExecutionContext)
diff --git
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchFlow.scala
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchFlow.scala
index bb7d488db..35aa6656b 100644
---
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchFlow.scala
+++
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchFlow.scala
@@ -37,7 +37,7 @@ object ElasticsearchFlow {
*/
def create[T](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
objectMapper: ObjectMapper)
: pekko.stream.javadsl.Flow[WriteMessage[T, NotUsed], WriteResult[T,
NotUsed], NotUsed] =
create(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
@@ -53,7 +53,7 @@ object ElasticsearchFlow {
*/
def create[T](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
messageWriter: MessageWriter[T])
: pekko.stream.javadsl.Flow[WriteMessage[T, NotUsed], WriteResult[T,
NotUsed], NotUsed] =
scaladsl.ElasticsearchFlow
@@ -72,7 +72,7 @@ object ElasticsearchFlow {
*/
def createWithPassThrough[T, C](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
objectMapper: ObjectMapper): pekko.stream.javadsl.Flow[WriteMessage[T,
C], WriteResult[T, C], NotUsed] =
createWithPassThrough(elasticsearchParams, settings, new
JacksonWriter[T](objectMapper))
@@ -88,7 +88,7 @@ object ElasticsearchFlow {
*/
def createWithPassThrough[T, C](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
messageWriter: MessageWriter[T]):
pekko.stream.javadsl.Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] =
scaladsl.ElasticsearchFlow
.createWithPassThrough(elasticsearchParams, settings, messageWriter)
@@ -107,7 +107,7 @@ object ElasticsearchFlow {
*/
def createBulk[T, C](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
objectMapper: ObjectMapper):
pekko.stream.javadsl.Flow[java.util.List[WriteMessage[T, C]],
java.util.List[WriteResult[T, C]], NotUsed] =
createBulk(elasticsearchParams, settings, new
JacksonWriter[T](objectMapper))
@@ -125,7 +125,7 @@ object ElasticsearchFlow {
*/
def createBulk[T, C](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
messageWriter: MessageWriter[T]):
pekko.stream.javadsl.Flow[java.util.List[WriteMessage[T, C]],
java.util.List[WriteResult[T, C]], NotUsed] = pekko.stream.scaladsl
.Flow[java.util.List[WriteMessage[T, C]]]
@@ -148,7 +148,7 @@ object ElasticsearchFlow {
@ApiMayChange
def createWithContext[T, C](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
objectMapper: ObjectMapper)
: pekko.stream.javadsl.FlowWithContext[WriteMessage[T, NotUsed], C,
WriteResult[T, C], C, NotUsed] =
createWithContext(elasticsearchParams, settings, new
JacksonWriter[T](objectMapper))
@@ -165,7 +165,7 @@ object ElasticsearchFlow {
@ApiMayChange
def createWithContext[T, C](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
messageWriter: MessageWriter[T])
: pekko.stream.javadsl.FlowWithContext[WriteMessage[T, NotUsed], C,
WriteResult[T, C], C, NotUsed] =
scaladsl.ElasticsearchFlow
diff --git
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSink.scala
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSink.scala
index 70477ebd7..d92d909de 100644
---
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSink.scala
+++
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSink.scala
@@ -31,7 +31,7 @@ object ElasticsearchSink {
*/
def create[T](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
objectMapper: ObjectMapper): pekko.stream.javadsl.Sink[WriteMessage[T,
NotUsed], CompletionStage[Done]] =
ElasticsearchFlow
.create(elasticsearchParams, settings, objectMapper)
diff --git
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala
index 4a50a4dfc..849dcd0c3 100644
---
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala
+++
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala
@@ -37,7 +37,7 @@ object ElasticsearchSource {
*/
def create(elasticsearchParams: ElasticsearchParams,
query: String,
- settings: SourceSettingsBase[_, _]):
Source[ReadResult[java.util.Map[String, Object]], NotUsed] =
+ settings: SourceSettingsBase[?, ?]):
Source[ReadResult[java.util.Map[String, Object]], NotUsed] =
create(elasticsearchParams, query, settings, new ObjectMapper())
/**
@@ -46,7 +46,7 @@ object ElasticsearchSource {
*/
def create(elasticsearchParams: ElasticsearchParams,
query: String,
- settings: SourceSettingsBase[_, _],
+ settings: SourceSettingsBase[?, ?],
objectMapper: ObjectMapper): Source[ReadResult[java.util.Map[String,
Object]], NotUsed] =
Source
.fromMaterializer { (mat: Materializer, _: Attributes) =>
@@ -78,7 +78,7 @@ object ElasticsearchSource {
*/
def create(elasticsearchParams: ElasticsearchParams,
searchParams: java.util.Map[String, String],
- settings: SourceSettingsBase[_, _],
+ settings: SourceSettingsBase[?, ?],
objectMapper: ObjectMapper): Source[ReadResult[java.util.Map[String,
Object]], NotUsed] =
Source
.fromMaterializer { (mat: Materializer, _: Attributes) =>
@@ -103,7 +103,7 @@ object ElasticsearchSource {
*/
def typed[T](elasticsearchParams: ElasticsearchParams,
query: String,
- settings: SourceSettingsBase[_, _],
+ settings: SourceSettingsBase[?, ?],
clazz: Class[T]): Source[ReadResult[T], NotUsed] =
typed[T](elasticsearchParams, query, settings, clazz, new ObjectMapper())
@@ -113,7 +113,7 @@ object ElasticsearchSource {
*/
def typed[T](elasticsearchParams: ElasticsearchParams,
query: String,
- settings: SourceSettingsBase[_, _],
+ settings: SourceSettingsBase[?, ?],
clazz: Class[T],
objectMapper: ObjectMapper): Source[ReadResult[T], NotUsed] =
Source
@@ -145,7 +145,7 @@ object ElasticsearchSource {
*/
def typed[T](elasticsearchParams: ElasticsearchParams,
searchParams: java.util.Map[String, String],
- settings: SourceSettingsBase[_, _],
+ settings: SourceSettingsBase[?, ?],
clazz: Class[T],
objectMapper: ObjectMapper): Source[ReadResult[T], NotUsed] =
Source
diff --git
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala
index ac40d4595..b7f17bea5 100644
---
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala
+++
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala
@@ -37,7 +37,7 @@ object ElasticsearchFlow {
*
* This factory method requires an implicit Spray JSON writer for `T`.
*/
- def create[T](elasticsearchParams: ElasticsearchParams, settings:
WriteSettingsBase[_, _])(
+ def create[T](elasticsearchParams: ElasticsearchParams, settings:
WriteSettingsBase[?, ?])(
implicit sprayJsonWriter: JsonWriter[T]): Flow[WriteMessage[T, NotUsed],
WriteResult[T, NotUsed], NotUsed] =
create[T](elasticsearchParams, settings, new
SprayJsonWriter[T]()(sprayJsonWriter))
@@ -47,7 +47,7 @@ object ElasticsearchFlow {
* successful execution.
*/
def create[T](elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
writer: MessageWriter[T]): Flow[WriteMessage[T, NotUsed], WriteResult[T,
NotUsed], NotUsed] = {
Flow[WriteMessage[T, NotUsed]]
.batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+
wm }
@@ -63,7 +63,7 @@ object ElasticsearchFlow {
*
* This factory method requires an implicit Spray JSON writer for `T`.
*/
- def createWithPassThrough[T, C](elasticsearchParams: ElasticsearchParams,
settings: WriteSettingsBase[_, _])(
+ def createWithPassThrough[T, C](elasticsearchParams: ElasticsearchParams,
settings: WriteSettingsBase[?, ?])(
implicit sprayJsonWriter: JsonWriter[T]): Flow[WriteMessage[T, C],
WriteResult[T, C], NotUsed] =
createWithPassThrough[T, C](elasticsearchParams, settings, new
SprayJsonWriter[T]()(sprayJsonWriter))
@@ -74,7 +74,7 @@ object ElasticsearchFlow {
* successful execution.
*/
def createWithPassThrough[T, C](elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
writer: MessageWriter[T]): Flow[WriteMessage[T, C], WriteResult[T, C],
NotUsed] = {
Flow[WriteMessage[T, C]]
.batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+
wm }
@@ -91,7 +91,7 @@ object ElasticsearchFlow {
*
* This factory method requires an implicit Spray JSON writer for `T`.
*/
- def createBulk[T, C](elasticsearchParams: ElasticsearchParams, settings:
WriteSettingsBase[_, _])(
+ def createBulk[T, C](elasticsearchParams: ElasticsearchParams, settings:
WriteSettingsBase[?, ?])(
implicit sprayJsonWriter: JsonWriter[T])
: Flow[immutable.Seq[WriteMessage[T, C]], immutable.Seq[WriteResult[T,
C]], NotUsed] =
createBulk[T, C](elasticsearchParams, settings, new
SprayJsonWriter[T]()(sprayJsonWriter))
@@ -105,7 +105,7 @@ object ElasticsearchFlow {
*/
def createBulk[T, C](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
writer: MessageWriter[T]): Flow[immutable.Seq[WriteMessage[T, C]],
immutable.Seq[WriteResult[T, C]], NotUsed] = {
stageFlow(elasticsearchParams, settings, writer)
}
@@ -119,7 +119,7 @@ object ElasticsearchFlow {
* This factory method requires an implicit Spray JSON writer for `T`.
*/
@ApiMayChange
- def createWithContext[T, C](elasticsearchParams: ElasticsearchParams,
settings: WriteSettingsBase[_, _])(
+ def createWithContext[T, C](elasticsearchParams: ElasticsearchParams,
settings: WriteSettingsBase[?, ?])(
implicit sprayJsonWriter: JsonWriter[T])
: FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C,
NotUsed] =
createWithContext[T, C](elasticsearchParams, settings, new
SprayJsonWriter[T]()(sprayJsonWriter))
@@ -133,7 +133,7 @@ object ElasticsearchFlow {
@ApiMayChange
def createWithContext[T, C](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
writer: MessageWriter[T]): FlowWithContext[WriteMessage[T, NotUsed], C,
WriteResult[T, C], C, NotUsed] = {
Flow[WriteMessage[T, C]]
.batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+
wm }
@@ -145,7 +145,7 @@ object ElasticsearchFlow {
@InternalApi
private def stageFlow[T, C](
elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
writer: MessageWriter[T]): Flow[immutable.Seq[WriteMessage[T, C]],
immutable.Seq[WriteResult[T, C]], NotUsed] = {
if (settings.retryLogic == RetryNever) {
val basicFlow = basicStageFlow[T, C](elasticsearchParams, settings,
writer)
@@ -208,7 +208,7 @@ object ElasticsearchFlow {
@InternalApi
private def basicStageFlow[T, C](elasticsearchParams: ElasticsearchParams,
- settings: WriteSettingsBase[_, _],
+ settings: WriteSettingsBase[?, ?],
writer: MessageWriter[T]) = {
Flow
.fromMaterializer { (mat, _) =>
diff --git
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSink.scala
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSink.scala
index 9e9eff823..b9ff8ef88 100644
---
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSink.scala
+++
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSink.scala
@@ -29,7 +29,7 @@ object ElasticsearchSink {
/**
* Create a sink to update Elasticsearch with
[[pekko.stream.connectors.elasticsearch.WriteMessage WriteMessage]]s containing
type `T`.
*/
- def create[T](elasticsearchParams: ElasticsearchParams, settings:
WriteSettingsBase[_, _])(
+ def create[T](elasticsearchParams: ElasticsearchParams, settings:
WriteSettingsBase[?, ?])(
implicit sprayJsonWriter: JsonWriter[T]): Sink[WriteMessage[T, NotUsed],
Future[Done]] =
ElasticsearchFlow.create[T](elasticsearchParams,
settings).toMat(Sink.ignore)(Keep.right)
diff --git
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSource.scala
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSource.scala
index 94e23a22c..317df4d6c 100644
---
a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSource.scala
+++
b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchSource.scala
@@ -36,7 +36,7 @@ object ElasticsearchSource {
def apply(
elasticsearchParams: ElasticsearchParams,
query: String,
- settings: SourceSettingsBase[_, _]): Source[ReadResult[JsObject],
NotUsed] =
+ settings: SourceSettingsBase[?, ?]): Source[ReadResult[JsObject],
NotUsed] =
create(elasticsearchParams, query, settings)
/**
@@ -50,7 +50,7 @@ object ElasticsearchSource {
*/
def apply(elasticsearchParams: ElasticsearchParams,
searchParams: Map[String, String],
- settings: SourceSettingsBase[_, _]): Source[ReadResult[JsObject],
NotUsed] =
+ settings: SourceSettingsBase[?, ?]): Source[ReadResult[JsObject],
NotUsed] =
create(elasticsearchParams, searchParams, settings)
/**
@@ -59,7 +59,7 @@ object ElasticsearchSource {
*/
def create(elasticsearchParams: ElasticsearchParams,
query: String,
- settings: SourceSettingsBase[_, _]): Source[ReadResult[JsObject],
NotUsed] =
+ settings: SourceSettingsBase[?, ?]): Source[ReadResult[JsObject],
NotUsed] =
create(elasticsearchParams, Map("query" -> query), settings)
/**
@@ -72,7 +72,7 @@ object ElasticsearchSource {
*/
def create(elasticsearchParams: ElasticsearchParams,
searchParams: Map[String, String],
- settings: SourceSettingsBase[_, _]): Source[ReadResult[JsObject],
NotUsed] =
+ settings: SourceSettingsBase[?, ?]): Source[ReadResult[JsObject],
NotUsed] =
Source
.fromMaterializer { (mat, _) =>
implicit val system: ActorSystem = mat.system
@@ -93,7 +93,7 @@ object ElasticsearchSource {
* Creates a [[pekko.stream.scaladsl.Source]] from Elasticsearch that
streams [[ReadResult]]s of type `T`
* converted by Spray's [[spray.json.JsonReader]]
*/
- def typed[T](elasticsearchParams: ElasticsearchParams, query: String,
settings: SourceSettingsBase[_, _])(
+ def typed[T](elasticsearchParams: ElasticsearchParams, query: String,
settings: SourceSettingsBase[?, ?])(
implicit sprayJsonReader: JsonReader[T]): Source[ReadResult[T], NotUsed]
=
typed(elasticsearchParams, Map("query" -> query), settings)
@@ -107,7 +107,7 @@ object ElasticsearchSource {
*/
def typed[T](elasticsearchParams: ElasticsearchParams,
searchParams: Map[String, String],
- settings: SourceSettingsBase[_, _])(
+ settings: SourceSettingsBase[?, ?])(
implicit sprayJsonReader: JsonReader[T]): Source[ReadResult[T], NotUsed]
=
Source
.fromMaterializer { (mat, _) =>
diff --git
a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
index 2c67de274..050b2ca17 100644
--- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
+++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
@@ -69,7 +69,7 @@ trait ElasticsearchSpecUtils { this: AnyWordSpec with
ScalaFutures =>
}
def readTitlesFrom(apiVersion: ApiVersionBase,
- sourceSettings: SourceSettingsBase[_, _],
+ sourceSettings: SourceSettingsBase[?, ?],
indexName: String): Future[immutable.Seq[String]] =
ElasticsearchSource
.typed[Book](
diff --git
a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarArchiveManager.scala
b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarArchiveManager.scala
index b008abe50..5484fe36f 100644
---
a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarArchiveManager.scala
+++
b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarArchiveManager.scala
@@ -25,7 +25,7 @@ import pekko.util.ByteString
*/
@InternalApi private[file] object TarArchiveManager {
- def tarFlow(): Flow[(TarArchiveMetadata, Source[ByteString, _]), ByteString,
NotUsed] = {
+ def tarFlow(): Flow[(TarArchiveMetadata, Source[ByteString, ?]), ByteString,
NotUsed] = {
Flow[(TarArchiveMetadata, Source[ByteString, Any])]
.flatMapConcat {
case (metadata, stream) =>
diff --git
a/file/src/main/scala/org/apache/pekko/stream/connectors/file/scaladsl/Archive.scala
b/file/src/main/scala/org/apache/pekko/stream/connectors/file/scaladsl/Archive.scala
index d10e14a63..5cce29ffc 100644
---
a/file/src/main/scala/org/apache/pekko/stream/connectors/file/scaladsl/Archive.scala
+++
b/file/src/main/scala/org/apache/pekko/stream/connectors/file/scaladsl/Archive.scala
@@ -52,7 +52,7 @@ object Archive {
/**
* Flow for packaging multiple files into one TAR file.
*/
- def tar(): Flow[(TarArchiveMetadata, Source[ByteString, _]), ByteString,
NotUsed] =
+ def tar(): Flow[(TarArchiveMetadata, Source[ByteString, ?]), ByteString,
NotUsed] =
TarArchiveManager.tarFlow()
/**
diff --git
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala
index 216d7a597..e4e3211d0 100644
---
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala
+++
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpLike.scala
@@ -54,7 +54,7 @@ protected[ftp] trait FtpLike[FtpClient, S <:
RemoteFileSettings] {
* INTERNAL API
*/
@InternalApi
-protected[ftp] trait RetrieveOffset { self: FtpLike[_, _] =>
+protected[ftp] trait RetrieveOffset { self: FtpLike[?, ?] =>
def retrieveFileInputStream(name: String, handler: Handler, offset: Long):
Try[InputStream]
@@ -64,7 +64,7 @@ protected[ftp] trait RetrieveOffset { self: FtpLike[_, _] =>
* INTERNAL API
*/
@InternalApi
-protected[ftp] trait UnconfirmedReads { self: FtpLike[_, _] =>
+protected[ftp] trait UnconfirmedReads { self: FtpLike[?, ?] =>
def retrieveFileInputStream(name: String, handler: Handler, offset: Long,
maxUnconfirmedReads: Int): Try[InputStream]
diff --git
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala
index bc072e325..e8b75160d 100644
---
a/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala
+++
b/ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/SftpOperations.scala
@@ -60,9 +60,9 @@ private[ftp] trait SftpOperations { self: FtpLike[SSHClient,
SftpSettings] =>
if (credentials.password != "") {
val passwordAuth: AuthPassword = new AuthPassword(new
PasswordFinder() {
- def reqPassword(resource: Resource[_]): Array[Char] =
credentials.password.toCharArray
+ def reqPassword(resource: Resource[?]): Array[Char] =
credentials.password.toCharArray
- def shouldRetry(resource: Resource[_]) = false
+ def shouldRetry(resource: Resource[?]) = false
})
ssh.auth(credentials.username, passwordAuth, keyAuth)
diff --git
a/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/GeodeSettings.scala
b/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/GeodeSettings.scala
index acadca341..360a7884d 100644
---
a/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/GeodeSettings.scala
+++
b/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/GeodeSettings.scala
@@ -21,13 +21,13 @@ import org.apache.geode.cache.client.ClientCacheFactory
final class GeodeSettings private (val hostname: String,
val port: Int = 10334,
val configure: Option[ClientCacheFactory => ClientCacheFactory] = None,
- val pdxCompat: (Class[_], Class[_]) => Boolean = (c1, c2) =>
+ val pdxCompat: (Class[?], Class[?]) => Boolean = (c1, c2) =>
c1.getSimpleName.equals(c2.getSimpleName)) {
private def copy(hostname: String = hostname,
port: Int = port,
configure: Option[ClientCacheFactory => ClientCacheFactory] = configure,
- pdxCompat: (Class[_], Class[_]) => Boolean = pdxCompat) =
+ pdxCompat: (Class[?], Class[?]) => Boolean = pdxCompat) =
new GeodeSettings(hostname, port, configure, pdxCompat)
/**
@@ -39,7 +39,7 @@ final class GeodeSettings private (val hostname: String,
/**
* @param pdxCompat a function that determines if two class are equivalent
(java class / scala case class)
*/
- def withPdxCompat(pdxCompat: (Class[_], Class[_]) => Boolean): GeodeSettings
= copy(pdxCompat = pdxCompat)
+ def withPdxCompat(pdxCompat: (Class[?], Class[?]) => Boolean): GeodeSettings
= copy(pdxCompat = pdxCompat)
override def toString: String =
"GeodeSettings(" +
diff --git
a/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/DelegatingPdxSerializer.scala
b/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/DelegatingPdxSerializer.scala
index e887a7c6b..c338bb51e 100644
---
a/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/DelegatingPdxSerializer.scala
+++
b/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/DelegatingPdxSerializer.scala
@@ -26,10 +26,10 @@ import org.apache.geode.pdx.{ PdxReader, PdxSerializer,
PdxWriter }
*/
@InternalApi
private[geode] class DelegatingPdxSerializer(
- isPdxCompat: (Class[_], Class[_]) => Boolean) extends PdxSerializer
+ isPdxCompat: (Class[?], Class[?]) => Boolean) extends PdxSerializer
with Declarable {
- private var serializers = Map[Class[_], PdxSerializer]()
+ private var serializers = Map[Class[?], PdxSerializer]()
def register[V](serializer: PdxSerializer, clazz: Class[V]): Unit =
synchronized {
if (!serializers.contains(clazz))
@@ -56,7 +56,7 @@ private[geode] class DelegatingPdxSerializer(
*
* @return unmarshalled class or null
*/
- override def fromData(clazz: Class[_], in: PdxReader): AnyRef =
+ override def fromData(clazz: Class[?], in: PdxReader): AnyRef =
serializers
.get(clazz)
.map(_.fromData(clazz, in))
diff --git
a/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/ShapelessPdxSerializer.scala
b/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/ShapelessPdxSerializer.scala
index 0d3b822a3..51fa4a53a 100644
---
a/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/ShapelessPdxSerializer.scala
+++
b/geode/src/main/scala/org/apache/pekko/stream/connectors/geode/impl/pdx/ShapelessPdxSerializer.scala
@@ -32,7 +32,7 @@ private[geode] class ShapelessPdxSerializer[A <: AnyRef](enc:
PdxEncoder[A], dec
tag.runtimeClass.isInstance(o) &&
enc.encode(out, o.asInstanceOf[A])
- override def fromData(clazz: Class[_], in: PdxReader): A =
+ override def fromData(clazz: Class[?], in: PdxReader): A =
dec.decode(in, null) match {
case Success(e) => e
case _ => null.asInstanceOf[A]
diff --git a/geode/src/test/scala/docs/scaladsl/PersonPdxSerializer.scala
b/geode/src/test/scala/docs/scaladsl/PersonPdxSerializer.scala
index 3db69b969..17f9628b2 100644
--- a/geode/src/test/scala/docs/scaladsl/PersonPdxSerializer.scala
+++ b/geode/src/test/scala/docs/scaladsl/PersonPdxSerializer.scala
@@ -32,7 +32,7 @@ object PersonPdxSerializer extends PekkoPdxSerializer[Person]
{
} else
false
- override def fromData(clazz: Class[_], in: PdxReader): AnyRef = {
+ override def fromData(clazz: Class[?], in: PdxReader): AnyRef = {
val id: Int = in.readInt("id")
val name: String = in.readString("name")
val birthDate: Date = in.readDate("birthDate")
diff --git
a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/ProductSchemas.scala
b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/ProductSchemas.scala
index f1ced54e3..01e6c42e6 100644
---
a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/ProductSchemas.scala
+++
b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/scaladsl/schema/ProductSchemas.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
*/
trait ProductSchemas extends ProductSchemasInstances { this: StandardSchemas =>
- protected def extractFieldNames(tag: ClassTag[_]): Array[String] =
+ protected def extractFieldNames(tag: ClassTag[?]): Array[String] =
ProductSchemasSupport.extractFieldNames(tag)
}
@@ -43,5 +43,5 @@ private[schema] final class ProductSchemaWriter[T <:
Product](fieldSchemas: Seq[
}
private object ProductSchemasSupport extends StandardFormats with
ProductFormats with AdditionalFormats {
- override def extractFieldNames(tag: ClassTag[_]): Array[String] =
super.extractFieldNames(tag)
+ override def extractFieldNames(tag: ClassTag[?]): Array[String] =
super.extractFieldNames(tag)
}
diff --git
a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
index 1e717a69f..8c3b6a789 100644
---
a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
+++
b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
@@ -723,7 +723,7 @@ object GooglePubSub {
}
.mapMaterializedValue(flattenCs(_))
- private def flattenCs[T](f: CompletionStage[_ <: CompletionStage[T]]):
CompletionStage[T] =
+ private def flattenCs[T](f: CompletionStage[? <: CompletionStage[T]]):
CompletionStage[T] =
f.thenCompose((t: CompletionStage[T]) => t)
private def publisher(mat: Materializer, attr: Attributes) =
diff --git
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStream.scala
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStream.scala
index b0329bd37..3f00a5a8a 100644
---
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStream.scala
+++
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStream.scala
@@ -111,7 +111,7 @@ import scala.concurrent.ExecutionContext.parasitic
def putObject(bucket: String,
objectName: String,
- data: Source[ByteString, _],
+ data: Source[ByteString, ?],
contentType: ContentType): Source[StorageObject, NotUsed] = sourceGCS {
settings =>
val uri = Uri(settings.endpointUrl)
.withPath(Path("/upload" + settings.basePath) ++ getBucketPath(bucket) /
"o")
diff --git
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/javadsl/GCStorage.scala
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/javadsl/GCStorage.scala
index 883da4ea2..207a816e2 100644
---
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/javadsl/GCStorage.scala
+++
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/javadsl/GCStorage.scala
@@ -242,7 +242,7 @@ object GCStorage {
*/
def simpleUpload(bucket: String,
objectName: String,
- data: Source[ByteString, _],
+ data: Source[ByteString, ?],
contentType: ContentType): Source[StorageObject, NotUsed] =
GCStorageStream.putObject(bucket, objectName, data.asScala,
contentType.asInstanceOf[ScalaContentType]).asJava
diff --git
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorage.scala
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorage.scala
index 080d7ea24..022842e2d 100644
---
a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorage.scala
+++
b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorage.scala
@@ -216,7 +216,7 @@ object GCStorage {
*/
def simpleUpload(bucket: String,
objectName: String,
- data: Source[ByteString, _],
+ data: Source[ByteString, ?],
contentType: ContentType): Source[StorageObject, NotUsed] =
GCStorageStream.putObject(bucket, objectName, data, contentType)
diff --git
a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorageWiremockBase.scala
b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorageWiremockBase.scala
index af10b038d..ceecd79e8 100644
---
a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorageWiremockBase.scala
+++
b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/scaladsl/GCStorageWiremockBase.scala
@@ -827,7 +827,7 @@ abstract class GCStorageWiremockBase(_system: ActorSystem,
_wireMockServer: Hove
object GCStorageWiremockBase {
- def getCallerName(clazz: Class[_]): String = {
+ def getCallerName(clazz: Class[?]): String = {
val s = Thread.currentThread.getStackTrace.map(_.getClassName).drop(1)
.dropWhile(_.matches("(java.lang.Thread|.*WireMockBase.?$)"))
val reduced = s.lastIndexWhere(_ == clazz.getName) match {
diff --git
a/hdfs/src/main/scala/org/apache/pekko/stream/connectors/hdfs/scaladsl/HdfsFlow.scala
b/hdfs/src/main/scala/org/apache/pekko/stream/connectors/hdfs/scaladsl/HdfsFlow.scala
index 71b1139ed..11f07bfb1 100644
---
a/hdfs/src/main/scala/org/apache/pekko/stream/connectors/hdfs/scaladsl/HdfsFlow.scala
+++
b/hdfs/src/main/scala/org/apache/pekko/stream/connectors/hdfs/scaladsl/HdfsFlow.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.io.{ SequenceFile, Writable }
object HdfsFlow {
- private[hdfs] val OnlyRotationMessage: PartialFunction[OutgoingMessage[_],
RotationMessage] = {
+ private[hdfs] val OnlyRotationMessage: PartialFunction[OutgoingMessage[?],
RotationMessage] = {
case m: RotationMessage => m
}
diff --git
a/influxdb/src/main/scala/org/apache/pekko/stream/connectors/influxdb/impl/PekkoConnectorsResultMapperHelper.scala
b/influxdb/src/main/scala/org/apache/pekko/stream/connectors/influxdb/impl/PekkoConnectorsResultMapperHelper.scala
index d10e7a9e7..a64a8ed31 100644
---
a/influxdb/src/main/scala/org/apache/pekko/stream/connectors/influxdb/impl/PekkoConnectorsResultMapperHelper.scala
+++
b/influxdb/src/main/scala/org/apache/pekko/stream/connectors/influxdb/impl/PekkoConnectorsResultMapperHelper.scala
@@ -47,10 +47,10 @@ private[impl] class PekkoConnectorsResultMapperHelper {
.appendZoneOrOffsetId
.toFormatter
- private[impl] def databaseName(point: Class[_]): String =
+ private[impl] def databaseName(point: Class[?]): String =
point.getAnnotation(classOf[Measurement]).database();
- private[impl] def retentionPolicy(point: Class[_]): String =
+ private[impl] def retentionPolicy(point: Class[?]): String =
point.getAnnotation(classOf[Measurement]).retentionPolicy();
private[impl] def convertModelToPoint[T](model: T): Point = {
@@ -70,7 +70,7 @@ private[impl] class PekkoConnectorsResultMapperHelper {
val field = colNameAndFieldMap.get(key)
val column = field.getAnnotation(classOf[Column])
val columnName: String = column.name()
- val fieldType: Class[_] = field.getType()
+ val fieldType: Class[?] = field.getType()
val isAccessible = field.isAccessible() // deprecated in JDK 11+
if (!isAccessible) {
@@ -96,7 +96,7 @@ private[impl] class PekkoConnectorsResultMapperHelper {
}
}
- private[impl] def cacheClassFields(clazz: Class[_]) =
+ private[impl] def cacheClassFields(clazz: Class[?]) =
if (!CLASS_FIELD_CACHE.containsKey(clazz.getName)) {
val initialMap: ConcurrentMap[String, Field] = new ConcurrentHashMap()
var influxColumnAndFieldMap =
CLASS_FIELD_CACHE.putIfAbsent(clazz.getName, initialMap)
@@ -125,20 +125,20 @@ private[impl] class PekkoConnectorsResultMapperHelper {
.toList
}
- private def measurementName(point: Class[_]): String =
+ private def measurementName(point: Class[?]): String =
point.getAnnotation(classOf[Measurement]).name();
- private def timeUnit(point: Class[_]): TimeUnit =
+ private def timeUnit(point: Class[?]): TimeUnit =
point.getAnnotation(classOf[Measurement]).timeUnit()
- private def setTime(pointBuilder: Point.Builder, fieldType: Class[_],
timeUnit: TimeUnit, value: Any): Unit =
+ private def setTime(pointBuilder: Point.Builder, fieldType: Class[?],
timeUnit: TimeUnit, value: Any): Unit =
if (classOf[Instant].isAssignableFrom(fieldType)) {
val instant = value.asInstanceOf[Instant]
val time = timeUnit.convert(instant.toEpochMilli, TimeUnit.MILLISECONDS)
pointBuilder.time(time, timeUnit)
} else throw new InfluxDBMapperException("Unsupported type " + fieldType +
" for time: should be of Instant type")
- private def setField(pointBuilder: Point.Builder, fieldType: Class[_],
columnName: String, value: Any): Unit =
+ private def setField(pointBuilder: Point.Builder, fieldType: Class[?],
columnName: String, value: Any): Unit =
if (classOf[java.lang.Boolean].isAssignableFrom(fieldType) ||
classOf[Boolean].isAssignableFrom(fieldType))
pointBuilder.addField(columnName, value.asInstanceOf[Boolean])
else if (classOf[java.lang.Long].isAssignableFrom(fieldType) ||
classOf[Long].isAssignableFrom(fieldType))
@@ -150,7 +150,7 @@ private[impl] class PekkoConnectorsResultMapperHelper {
else if (classOf[String].isAssignableFrom(fieldType))
pointBuilder.addField(columnName, value.asInstanceOf[String])
else throw new InfluxDBMapperException("Unsupported type " + fieldType + "
for column " + columnName)
- private def throwExceptionIfMissingAnnotation(clazz: Class[_]): Unit =
+ private def throwExceptionIfMissingAnnotation(clazz: Class[?]): Unit =
if (!clazz.isAnnotationPresent(classOf[Measurement]))
throw new IllegalArgumentException(
"Class " + clazz.getName + " is not annotated with @" +
classOf[Measurement].getSimpleName)
@@ -202,7 +202,7 @@ private[impl] class PekkoConnectorsResultMapperHelper {
@throws[IllegalArgumentException]
@throws[IllegalAccessException]
- private def fieldValueForPrimitivesModified[T](fieldType: Class[_], field:
Field, obj: T, value: Any): Boolean =
+ private def fieldValueForPrimitivesModified[T](fieldType: Class[?], field:
Field, obj: T, value: Any): Boolean =
if (classOf[Double].isAssignableFrom(fieldType)) {
field.setDouble(obj, value.asInstanceOf[Double].doubleValue)
true
@@ -221,7 +221,7 @@ private[impl] class PekkoConnectorsResultMapperHelper {
@throws[IllegalArgumentException]
@throws[IllegalAccessException]
- private def fieldValueForPrimitiveWrappersModified[T](fieldType: Class[_],
+ private def fieldValueForPrimitiveWrappersModified[T](fieldType: Class[?],
field: Field,
obj: T,
value: Any): Boolean =
@@ -243,7 +243,7 @@ private[impl] class PekkoConnectorsResultMapperHelper {
@throws[IllegalArgumentException]
@throws[IllegalAccessException]
- private def fieldValueModified[T](fieldType: Class[_],
+ private def fieldValueModified[T](fieldType: Class[?],
field: Field,
obj: T,
value: Any,
diff --git
a/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqClient.scala
b/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqClient.scala
index e2d07e8f0..5f7cd974d 100644
---
a/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqClient.scala
+++
b/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqClient.scala
@@ -62,7 +62,7 @@ private[ironmq] final class IronMqClient(settings:
IronMqSettings)(implicit acto
}
}
- private val pipeline: Flow[HttpRequest, HttpResponse, _] = Flow[HttpRequest]
+ private val pipeline: Flow[HttpRequest, HttpResponse, ?] = Flow[HttpRequest]
.map(_.withHeaders(Authorization(GenericHttpCredentials("OAuth",
settings.token))))
.map(_ -> NotUsed)
.via(connectionPoolFlow)
diff --git
a/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqPushStage.scala
b/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqPushStage.scala
index 06543b6ba..dab27b8e1 100644
---
a/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqPushStage.scala
+++
b/ironmq/src/main/scala/org/apache/pekko/stream/connectors/ironmq/impl/IronMqPushStage.scala
@@ -46,7 +46,7 @@ private[ironmq] class IronMqPushStage(queueName: String,
settings: IronMqSetting
implicit def ec: ExecutionContext = materializer.executionContext
- override protected val logSource: Class[_] = classOf[IronMqPushStage]
+ override protected val logSource: Class[?] = classOf[IronMqPushStage]
private var runningFutures: Int = 0
private var exceptionFromUpstream: Option[Throwable] = None
diff --git
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/JmsExceptions.scala
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/JmsExceptions.scala
index 785addfa7..414b9d367 100644
---
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/JmsExceptions.scala
+++
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/JmsExceptions.scala
@@ -24,13 +24,13 @@ import scala.util.control.NoStackTrace
*/
trait NonRetriableJmsException extends Exception
-case class UnsupportedMessagePropertyType(propertyName: String, propertyValue:
Any, message: JmsEnvelope[_])
+case class UnsupportedMessagePropertyType(propertyName: String, propertyValue:
Any, message: JmsEnvelope[?])
extends Exception(
s"Jms property '$propertyName' has unknown type
'${propertyValue.getClass.getName}'. " +
"Only primitive types and String are supported as property values.")
with NonRetriableJmsException
-case class UnsupportedMapMessageEntryType(entryName: String, entryValue: Any,
message: JmsMapMessagePassThrough[_])
+case class UnsupportedMapMessageEntryType(entryName: String, entryValue: Any,
message: JmsMapMessagePassThrough[?])
extends Exception(
s"Jms MapMessage entry '$entryName' has unknown type
'${entryValue.getClass.getName}'. " +
"Only primitive types, String, and Byte array are supported as entry
values.")
diff --git
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageProducer.scala
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageProducer.scala
index 8dd2d5688..079780040 100644
---
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageProducer.scala
+++
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageProducer.scala
@@ -28,7 +28,7 @@ private class JmsMessageProducer(jmsProducer:
jms.MessageProducer, jmsSession: J
private val destinationCache = new SoftReferenceCache[Destination,
jms.Destination]()
- def send(elem: JmsEnvelope[_]): Unit = {
+ def send(elem: JmsEnvelope[?]): Unit = {
val message: jms.Message = createMessage(elem)
populateMessageProperties(message, elem)
@@ -56,34 +56,34 @@ private class JmsMessageProducer(jmsProducer:
jms.MessageProducer, jmsSession: J
private def lookup(dest: Destination) = destinationCache.lookup(dest,
dest.create(jmsSession.session))
- private[jakartams] def createMessage(element: JmsEnvelope[_]): jms.Message =
+ private[jakartams] def createMessage(element: JmsEnvelope[?]): jms.Message =
element match {
- case textMessage: JmsTextMessagePassThrough[_] =>
jmsSession.session.createTextMessage(textMessage.body)
+ case textMessage: JmsTextMessagePassThrough[?] =>
jmsSession.session.createTextMessage(textMessage.body)
- case byteMessage: JmsByteMessagePassThrough[_] =>
+ case byteMessage: JmsByteMessagePassThrough[?] =>
val newMessage = jmsSession.session.createBytesMessage()
newMessage.writeBytes(byteMessage.bytes)
newMessage
- case byteStringMessage: JmsByteStringMessagePassThrough[_] =>
+ case byteStringMessage: JmsByteStringMessagePassThrough[?] =>
val newMessage = jmsSession.session.createBytesMessage()
newMessage.writeBytes(byteStringMessage.bytes.toArray)
newMessage
- case mapMessage: JmsMapMessagePassThrough[_] =>
+ case mapMessage: JmsMapMessagePassThrough[?] =>
val newMessage = jmsSession.session.createMapMessage()
populateMapMessage(newMessage, mapMessage)
newMessage
- case objectMessage: JmsObjectMessagePassThrough[_] =>
+ case objectMessage: JmsObjectMessagePassThrough[?] =>
jmsSession.session.createObjectMessage(objectMessage.serializable)
- case pt: JmsPassThrough[_] => throw new IllegalArgumentException("can't
create message for JmsPassThrough")
+ case pt: JmsPassThrough[?] => throw new IllegalArgumentException("can't
create message for JmsPassThrough")
}
- private[jakartams] def populateMessageProperties(message:
jakarta.jms.Message, jmsMessage: JmsEnvelope[_]): Unit =
+ private[jakartams] def populateMessageProperties(message:
jakarta.jms.Message, jmsMessage: JmsEnvelope[?]): Unit =
jmsMessage.properties.foreach {
case (key, v) =>
v match {
@@ -101,7 +101,7 @@ private class JmsMessageProducer(jmsProducer:
jms.MessageProducer, jmsSession: J
}
}
- private def populateMapMessage(message: jakarta.jms.MapMessage, jmsMessage:
JmsMapMessagePassThrough[_]): Unit =
+ private def populateMapMessage(message: jakarta.jms.MapMessage, jmsMessage:
JmsMapMessagePassThrough[?]): Unit =
jmsMessage.body.foreach {
case (key, v) =>
v match {
diff --git
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageReader.scala
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageReader.scala
index 675919d5b..2254a1c5d 100644
---
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageReader.scala
+++
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsMessageReader.scala
@@ -51,7 +51,7 @@ private[jakartams] object JmsMessageReader {
def readArray(message: jms.BytesMessage, bufferSize: Int = 4096):
Array[Byte] =
readBytes(message, bufferSize).toArray
- private def createMap(keys: java.util.Enumeration[_], accessor: String =>
AnyRef) =
+ private def createMap(keys: java.util.Enumeration[?], accessor: String =>
AnyRef) =
keys
.asInstanceOf[java.util.Enumeration[String]]
.asScala
diff --git
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsProducerStage.scala
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsProducerStage.scala
index a7b38643d..6719bc70a 100644
---
a/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsProducerStage.scala
+++
b/jakartams/src/main/scala/org/apache/pekko/stream/connectors/jakartams/impl/JmsProducerStage.scala
@@ -151,12 +151,12 @@ private[jakartams] final class JmsProducerStage[E <:
JmsEnvelope[PassThrough], P
override def onPush(): Unit = {
val elem: E = grab(in)
elem match {
- case _: JmsPassThrough[_] =>
+ case _: JmsPassThrough[?] =>
val holder = new Holder[E](NotYetThere)
inFlightMessages.enqueue(holder)
holder(Success(elem))
pushNextIfPossible()
- case m: JmsEnvelope[_] =>
+ case m: JmsEnvelope[?] =>
// create a holder object to capture the in-flight message,
and enqueue it to preserve message order
val holder = new Holder[E](NotYetThere)
inFlightMessages.enqueue(holder)
@@ -293,7 +293,7 @@ private[jakartams] object JmsProducerStage {
override def apply(t: Try[A]): Unit = elem = t
}
- case class SendAttempt[E <: JmsEnvelope[_]](envelope: E,
+ case class SendAttempt[E <: JmsEnvelope[?]](envelope: E,
holder: Holder[E],
attempt: Int = 0,
backoffMaxed: Boolean = false)
diff --git
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/JmsExceptions.scala
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/JmsExceptions.scala
index 320830e71..ebd8e65aa 100644
---
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/JmsExceptions.scala
+++
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/JmsExceptions.scala
@@ -24,13 +24,13 @@ import scala.util.control.NoStackTrace
*/
trait NonRetriableJmsException extends Exception
-case class UnsupportedMessagePropertyType(propertyName: String, propertyValue:
Any, message: JmsEnvelope[_])
+case class UnsupportedMessagePropertyType(propertyName: String, propertyValue:
Any, message: JmsEnvelope[?])
extends Exception(
s"Jms property '$propertyName' has unknown type
'${propertyValue.getClass.getName}'. " +
"Only primitive types and String are supported as property values.")
with NonRetriableJmsException
-case class UnsupportedMapMessageEntryType(entryName: String, entryValue: Any,
message: JmsMapMessagePassThrough[_])
+case class UnsupportedMapMessageEntryType(entryName: String, entryValue: Any,
message: JmsMapMessagePassThrough[?])
extends Exception(
s"Jms MapMessage entry '$entryName' has unknown type
'${entryValue.getClass.getName}'. " +
"Only primitive types, String, and Byte array are supported as entry
values.")
diff --git
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageProducer.scala
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageProducer.scala
index f427de436..e4247cabb 100644
---
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageProducer.scala
+++
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageProducer.scala
@@ -28,7 +28,7 @@ private class JmsMessageProducer(jmsProducer:
jms.MessageProducer, jmsSession: J
private val destinationCache = new SoftReferenceCache[Destination,
jms.Destination]()
- def send(elem: JmsEnvelope[_]): Unit = {
+ def send(elem: JmsEnvelope[?]): Unit = {
val message: jms.Message = createMessage(elem)
populateMessageProperties(message, elem)
@@ -56,34 +56,34 @@ private class JmsMessageProducer(jmsProducer:
jms.MessageProducer, jmsSession: J
private def lookup(dest: Destination) = destinationCache.lookup(dest,
dest.create(jmsSession.session))
- private[jms] def createMessage(element: JmsEnvelope[_]): jms.Message =
+ private[jms] def createMessage(element: JmsEnvelope[?]): jms.Message =
element match {
- case textMessage: JmsTextMessagePassThrough[_] =>
jmsSession.session.createTextMessage(textMessage.body)
+ case textMessage: JmsTextMessagePassThrough[?] =>
jmsSession.session.createTextMessage(textMessage.body)
- case byteMessage: JmsByteMessagePassThrough[_] =>
+ case byteMessage: JmsByteMessagePassThrough[?] =>
val newMessage = jmsSession.session.createBytesMessage()
newMessage.writeBytes(byteMessage.bytes)
newMessage
- case byteStringMessage: JmsByteStringMessagePassThrough[_] =>
+ case byteStringMessage: JmsByteStringMessagePassThrough[?] =>
val newMessage = jmsSession.session.createBytesMessage()
newMessage.writeBytes(byteStringMessage.bytes.toArray)
newMessage
- case mapMessage: JmsMapMessagePassThrough[_] =>
+ case mapMessage: JmsMapMessagePassThrough[?] =>
val newMessage = jmsSession.session.createMapMessage()
populateMapMessage(newMessage, mapMessage)
newMessage
- case objectMessage: JmsObjectMessagePassThrough[_] =>
+ case objectMessage: JmsObjectMessagePassThrough[?] =>
jmsSession.session.createObjectMessage(objectMessage.serializable)
- case pt: JmsPassThrough[_] => throw new IllegalArgumentException("can't
create message for JmsPassThrough")
+ case pt: JmsPassThrough[?] => throw new IllegalArgumentException("can't
create message for JmsPassThrough")
}
- private[jms] def populateMessageProperties(message: javax.jms.Message,
jmsMessage: JmsEnvelope[_]): Unit =
+ private[jms] def populateMessageProperties(message: javax.jms.Message,
jmsMessage: JmsEnvelope[?]): Unit =
jmsMessage.properties.foreach {
case (key, v) =>
v match {
@@ -101,7 +101,7 @@ private class JmsMessageProducer(jmsProducer:
jms.MessageProducer, jmsSession: J
}
}
- private def populateMapMessage(message: javax.jms.MapMessage, jmsMessage:
JmsMapMessagePassThrough[_]): Unit =
+ private def populateMapMessage(message: javax.jms.MapMessage, jmsMessage:
JmsMapMessagePassThrough[?]): Unit =
jmsMessage.body.foreach {
case (key, v) =>
v match {
diff --git
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageReader.scala
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageReader.scala
index e0890c9ec..3c9b9c2e2 100644
---
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageReader.scala
+++
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsMessageReader.scala
@@ -52,7 +52,7 @@ private[jms] object JmsMessageReader {
def readArray(message: jms.BytesMessage, bufferSize: Int = 4096):
Array[Byte] =
readBytes(message, bufferSize).toArray
- private def createMap(keys: java.util.Enumeration[_], accessor: String =>
AnyRef) =
+ private def createMap(keys: java.util.Enumeration[?], accessor: String =>
AnyRef) =
keys
.asInstanceOf[java.util.Enumeration[String]]
.asScala
diff --git
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala
index aae31aa04..1bf8c61d6 100644
---
a/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala
+++
b/jms/src/main/scala/org/apache/pekko/stream/connectors/jms/impl/JmsProducerStage.scala
@@ -150,12 +150,12 @@ private[jms] final class JmsProducerStage[E <:
JmsEnvelope[PassThrough], PassThr
override def onPush(): Unit = {
val elem: E = grab(in)
elem match {
- case _: JmsPassThrough[_] =>
+ case _: JmsPassThrough[?] =>
val holder = new Holder[E](NotYetThere)
inFlightMessages.enqueue(holder)
holder(Success(elem))
pushNextIfPossible()
- case m: JmsEnvelope[_] =>
+ case m: JmsEnvelope[?] =>
// create a holder object to capture the in-flight message,
and enqueue it to preserve message order
val holder = new Holder[E](NotYetThere)
inFlightMessages.enqueue(holder)
@@ -292,7 +292,7 @@ private[jms] object JmsProducerStage {
override def apply(t: Try[A]): Unit = elem = t
}
- case class SendAttempt[E <: JmsEnvelope[_]](envelope: E,
+ case class SendAttempt[E <: JmsEnvelope[?]](envelope: E,
holder: Holder[E],
attempt: Int = 0,
backoffMaxed: Boolean = false)
diff --git a/json-streaming/src/test/scala/docs/scaladsl/JsonReaderTest.scala
b/json-streaming/src/test/scala/docs/scaladsl/JsonReaderTest.scala
index efc2f92e2..d12d1f79f 100644
--- a/json-streaming/src/test/scala/docs/scaladsl/JsonReaderTest.scala
+++ b/json-streaming/src/test/scala/docs/scaladsl/JsonReaderTest.scala
@@ -35,7 +35,7 @@ class JsonReaderTest extends AnyWordSpec with Matchers with
BeforeAndAfterAll wi
val timeout: FiniteDuration = 3.seconds
// Runs the stream to a sequence sink and returns that sequence
- def collect[A](source: Source[A, _]): Seq[A] =
Await.result(source.runWith(Sink.seq), timeout)
+ def collect[A](source: Source[A, ?]): Seq[A] =
Await.result(source.runWith(Sink.seq), timeout)
// Basic documents + elements to use in tests
val expectedElements: Seq[String] = Seq("""{"name":"test1"}""",
"""{"name":"test2"}""", """{"name":"test3"}""")
diff --git
a/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala
b/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala
index 3ea052a69..3e0022905 100644
---
a/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala
+++
b/kinesis/src/main/scala/org/apache/pekko/stream/connectors/kinesis/impl/KinesisSchedulerSourceStage.scala
@@ -39,7 +39,7 @@ private[kinesis] object KinesisSchedulerSourceStage {
final case class NewRecord(cr: CommittableRecord) extends Command
case object Pump extends Command
case object Complete extends Command
- final case class SchedulerShutdown(result: Try[_]) extends Command
+ final case class SchedulerShutdown(result: Try[?]) extends Command
}
diff --git
a/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/MqttPerf.scala
b/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/MqttPerf.scala
index 95572de9c..5f462413b 100644
---
a/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/MqttPerf.scala
+++
b/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/MqttPerf.scala
@@ -87,7 +87,7 @@ class MqttPerf {
Mqtt
.serverSessionFlow(serverSession,
ByteString(connection.remoteAddress.getAddress.getAddress))
.join(connection.flow))
- .wireTap(Sink.foreach[Either[DecodeError, streaming.Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, streaming.Event[?]]] {
case Right(streaming.Event(_: streaming.Connect, _)) =>
server.offer(streaming.Command(connAck))
case Right(streaming.Event(s: streaming.Subscribe, _)) =>
diff --git
a/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/MqttPerf.scala
b/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/MqttPerf.scala
index 69599e5bd..a6b5914cf 100644
---
a/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/MqttPerf.scala
+++
b/mqtt-streaming-bench/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/MqttPerf.scala
@@ -90,7 +90,7 @@ class MqttPerf {
Mqtt
.serverSessionFlow(serverSession,
ByteString(connection.remoteAddress.getAddress.getAddress))
.join(connection.flow))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
case Right(Event(_: Connect, _)) =>
server.offer(Command(connAck))
case Right(Event(s: Subscribe, _)) =>
@@ -117,7 +117,7 @@ class MqttPerf {
Mqtt
.clientSessionFlow(clientSession, ByteString("1"))
.join(Tcp().outgoingConnection(host, port)))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
case Right(Event(_: SubAck, _)) =>
subscribed.success(Done)
case Right(Event(p: Publish, _)) =>
diff --git
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
index 6e819657e..2f977151f 100644
---
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
+++
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala
@@ -36,7 +36,7 @@ import scala.util.{ Either, Failure, Success }
*/
@InternalApi private[streaming] object ClientConnector {
- type ConnectData = Option[_]
+ type ConnectData = Option[?]
/*
* No ACK received - the CONNECT failed
@@ -597,7 +597,7 @@ import scala.util.{ Either, Failure, Success }
*/
@InternalApi private[streaming] object Subscriber {
- type SubscribeData = Option[_]
+ type SubscribeData = Option[?]
/*
* No ACK received - the subscription failed
@@ -685,7 +685,7 @@ import scala.util.{ Either, Failure, Success }
*/
case object UnsubscribeFailed extends Exception with NoStackTrace
- type UnsubscribeData = Option[_]
+ type UnsubscribeData = Option[?]
/*
* Construct with the starting state
diff --git
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
index 67a7eb7df..4eefd4028 100644
---
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
+++
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala
@@ -54,7 +54,7 @@ import scala.util.{ Either, Failure, Success }
*/
@InternalApi private[streaming] object Producer {
- type PublishData = Option[_]
+ type PublishData = Option[?]
/*
* Construct with the starting state
@@ -383,7 +383,7 @@ import scala.util.{ Either, Failure, Success }
sealed abstract class Request[A]
final case class Register[A](registrant: ActorRef[A], reply:
Promise[Registered]) extends Request[A]
private final case class Unregister[A](packetId: PacketId) extends Request[A]
- final case class Route[A](packetId: PacketId, event: A, failureReply:
Promise[_]) extends Request[A]
+ final case class Route[A](packetId: PacketId, event: A, failureReply:
Promise[?]) extends Request[A]
// Replies
@@ -422,7 +422,7 @@ import scala.util.{ Either, Failure, Success }
step(PacketId(after.underlying + 1))
}
- private[streaming] case class Registration[A](registrant: ActorRef[A],
failureReplies: Seq[Promise[_]])
+ private[streaming] case class Registration[A](registrant: ActorRef[A],
failureReplies: Seq[Promise[?]])
}
/*
@@ -529,12 +529,12 @@ import scala.util.{ Either, Failure, Success }
final case class RegisterConnection[A](connectionId: ByteString, clientId:
String) extends Request[A]
private final case class Unregister[A](clientId: Option[String], packetId:
PacketId) extends Request[A]
final case class UnregisterConnection[A](connectionId: ByteString) extends
Request[A]
- final case class Route[A](clientId: Option[String], packetId: PacketId,
event: A, failureReply: Promise[_])
+ final case class Route[A](clientId: Option[String], packetId: PacketId,
event: A, failureReply: Promise[?])
extends Request[A]
final case class RouteViaConnection[A](connectionId: ByteString,
packetId: PacketId,
event: A,
- failureReply: Promise[_])
+ failureReply: Promise[?])
extends Request[A]
// Replies
@@ -548,7 +548,7 @@ import scala.util.{ Either, Failure, Success }
def apply[A]: Behavior[Request[A]] =
new RemotePacketRouter[A].main(Map.empty, Map.empty)
- private[streaming] case class Registration[A](registrant: ActorRef[A],
failureReplies: Seq[Promise[_]])
+ private[streaming] case class Registration[A](registrant: ActorRef[A],
failureReplies: Seq[Promise[?]])
}
/*
diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
index 0a6385af7..5af82320b 100644
--- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
+++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttSessionSpec.scala
@@ -1358,7 +1358,7 @@ class MqttSessionSpec
Mqtt
.serverSessionFlow(session, ByteString.empty)
.join(pipeToClient))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
case Right(Event(`connect`, _)) =>
connectReceived.success(Done)
case Right(Event(cp: Subscribe, _)) if cp.topicFilters ==
subscribe.topicFilters =>
@@ -1457,7 +1457,7 @@ class MqttSessionSpec
Mqtt
.serverSessionFlow(session, ByteString.empty)
.join(pipeToClient))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
case Right(Event(`connect`, _)) =>
connectReceived.success(Done)
case Right(Event(cp: Subscribe, _))
@@ -1535,7 +1535,7 @@ class MqttSessionSpec
Mqtt
.serverSessionFlow(session, ByteString.empty)
.join(pipeToClient))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
case Right(Event(`connect`, _)) =>
connectReceived.success(Done)
case Right(Event(cp: Subscribe, _)) if cp.topicFilters ==
subscribe.topicFilters =>
@@ -1616,7 +1616,7 @@ class MqttSessionSpec
Mqtt
.serverSessionFlow(session, ByteString.empty)
.join(pipeToClient))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
case Right(Event(`connect`, _)) => connectReceived.success(Done)
case _ =>
})
@@ -1677,7 +1677,7 @@ class MqttSessionSpec
Mqtt
.serverSessionFlow(session, ByteString.empty)
.join(pipeToClient))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
case Right(Event(`connect`, _)) => connectReceived.success(Done)
case _ =>
})
@@ -1729,7 +1729,7 @@ class MqttSessionSpec
Mqtt
.serverSessionFlow(session, ByteString.empty)
.join(pipeToClient))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
case Right(Event(`connect`, _)) => connectReceived.success(Done)
case Right(Event(`disconnect`, _)) =>
disconnectReceived.success(Done)
case _ =>
@@ -1797,7 +1797,7 @@ class MqttSessionSpec
Mqtt
.serverSessionFlow(session, connectionId)
.join(pipeToClient))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
case Right(Event(`connect`, _)) if
!firstConnectReceived.isCompleted =>
firstConnectReceived.success(Done)
case Right(Event(`connect`, _)) =>
@@ -1919,7 +1919,7 @@ class MqttSessionSpec
Mqtt
.serverSessionFlow(session, ByteString.empty)
.join(pipeToClient))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
case Right(Event(`connect`, _)) =>
connectReceived.success(Done)
case Right(Event(cp: Publish, _)) if
cp.flags.contains(ControlPacketFlags.DUP) =>
@@ -2012,7 +2012,7 @@ class MqttSessionSpec
Mqtt
.serverSessionFlow(serverSession, ByteString("connection 1"))
.join(pipeToClient1))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
case Right(Event(`connect`, _)) =>
connect1Received.success(Done)
case Right(Event(cp: Subscribe, _)) if cp.topicFilters ==
subscribe.topicFilters =>
@@ -2075,7 +2075,7 @@ class MqttSessionSpec
Mqtt
.serverSessionFlow(serverSession, ByteString("connection 2"))
.join(pipeToClient2))
- .wireTap(Sink.foreach[Either[DecodeError, Event[_]]] {
+ .wireTap(Sink.foreach[Either[DecodeError, Event[?]]] {
case Right(Event(`connect`, _)) =>
connect2Received.success(Done)
case Right(Event(cp: Subscribe, _)) if cp.topicFilters ==
subscribe.topicFilters =>
diff --git a/project/CopyrightHeader.scala b/project/CopyrightHeader.scala
index e92b8c9b8..112c7856e 100644
--- a/project/CopyrightHeader.scala
+++ b/project/CopyrightHeader.scala
@@ -23,7 +23,7 @@ trait CopyrightHeader extends AutoPlugin {
override def trigger: PluginTrigger = allRequirements
- protected def headerMappingSettings: Seq[Def.Setting[_]] =
+ protected def headerMappingSettings: Seq[Def.Setting[?]] =
Seq(Compile, Test).flatMap { config =>
inConfig(config)(
Seq(
@@ -35,10 +35,10 @@ trait CopyrightHeader extends AutoPlugin {
HeaderFileType("template") -> cStyleComment)))
}
- override def projectSettings: Seq[Def.Setting[_]] =
+ override def projectSettings: Seq[Def.Setting[?]] =
Def.settings(headerMappingSettings, additional)
- def additional: Seq[Def.Setting[_]] =
+ def additional: Seq[Def.Setting[?]] =
Def.settings(Compile / compile := {
(Compile / headerCreate).value
(Compile / compile).value
diff --git a/project/CopyrightHeaderForBuild.scala
b/project/CopyrightHeaderForBuild.scala
index d26c4fdc1..54c7d048e 100644
--- a/project/CopyrightHeaderForBuild.scala
+++ b/project/CopyrightHeaderForBuild.scala
@@ -18,7 +18,7 @@ import sbt.{ inConfig, Compile, Def, PluginTrigger, Test, _ }
object CopyrightHeaderForBuild extends CopyrightHeader {
override def trigger: PluginTrigger = noTrigger
- override def projectSettings: Seq[Def.Setting[_]] = {
+ override def projectSettings: Seq[Def.Setting[?]] = {
Seq(Compile, Test).flatMap { config =>
inConfig(config) {
Seq(
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
index e3d489738..5e60d202f 100644
---
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
+++
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequests.scala
@@ -241,7 +241,7 @@ import scala.xml.NodeSeq
.withDefaultHeaders(headers)
def uploadRequest(s3Location: S3Location,
- payload: Source[ByteString, _],
+ payload: Source[ByteString, ?],
contentLength: Long,
contentType: ContentType,
s3Headers: Seq[HttpHeader])(
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index ce3fc9cf2..3d44561ef 100644
---
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -654,7 +654,7 @@ import scala.util.{ Failure, Success, Try }
def putObject(s3Location: S3Location,
contentType: ContentType,
- data: Source[ByteString, _],
+ data: Source[ByteString, ?],
contentLength: Long,
s3Headers: S3Headers): Source[ObjectMetadata, NotUsed] = {
@@ -914,7 +914,7 @@ import scala.util.{ Failure, Success, Try }
*/
def multipartUploadWithContext[C](
s3Location: S3Location,
- chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+ chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
contentType: ContentType = ContentTypes.`application/octet-stream`,
s3Headers: S3Headers,
chunkSize: Int = MinChunkSize,
@@ -951,7 +951,7 @@ import scala.util.{ Failure, Success, Try }
s3Location: S3Location,
uploadId: String,
previousParts: immutable.Iterable[Part],
- chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+ chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
contentType: ContentType = ContentTypes.`application/octet-stream`,
s3Headers: S3Headers,
chunkSize: Int = MinChunkSize,
@@ -1218,7 +1218,7 @@ import scala.util.{ Failure, Success, Try }
contentType: ContentType,
s3Headers: S3Headers,
chunkSize: Int,
- chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+ chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
initialUploadState: Option[(String, Int)] = None)(
parallelism: Int): Flow[(ByteString, C), UploadPartResponse, NotUsed] = {
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
index 24dfe8871..5d0506525 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala
@@ -290,7 +290,7 @@ object S3 {
*/
def putObject(bucket: String,
key: String,
- data: Source[ByteString, _],
+ data: Source[ByteString, ?],
contentLength: Long,
contentType: ContentType,
s3Headers: S3Headers): Source[ObjectMetadata, NotUsed] =
@@ -314,7 +314,7 @@ object S3 {
*/
def putObject(bucket: String,
key: String,
- data: Source[ByteString, _],
+ data: Source[ByteString, ?],
contentLength: Long,
contentType: ContentType): Source[ObjectMetadata, NotUsed] =
putObject(bucket, key, data, contentLength, contentType,
S3Headers.empty.withCannedAcl(CannedAcl.Private))
@@ -330,7 +330,7 @@ object S3 {
*/
def putObject(bucket: String,
key: String,
- data: Source[ByteString, _],
+ data: Source[ByteString, ?],
contentLength: Long): Source[ObjectMetadata, NotUsed] =
putObject(bucket, key, data, contentLength,
ContentTypes.APPLICATION_OCTET_STREAM)
@@ -828,7 +828,7 @@ object S3 {
def multipartUploadWithContext[C](
bucket: String,
key: String,
- chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]],
_],
+ chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]],
?],
contentType: ContentType,
s3Headers: S3Headers): Sink[JPair[ByteString, C],
CompletionStage[MultipartUploadResult]] =
S3Stream
@@ -870,7 +870,7 @@ object S3 {
def multipartUploadWithContext[C](
bucket: String,
key: String,
- chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]],
_],
+ chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]],
?],
contentType: ContentType): Sink[JPair[ByteString, C],
CompletionStage[MultipartUploadResult]] =
multipartUploadWithContext[C](bucket,
key,
@@ -901,7 +901,7 @@ object S3 {
def multipartUploadWithContext[C](
bucket: String,
key: String,
- chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]],
_])
+ chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]],
?])
: Sink[JPair[ByteString, C], CompletionStage[MultipartUploadResult]] =
multipartUploadWithContext[C](bucket, key, chunkUploadSink,
ContentTypes.APPLICATION_OCTET_STREAM)
@@ -1000,7 +1000,7 @@ object S3 {
key: String,
uploadId: String,
previousParts: java.lang.Iterable[Part],
- chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]],
_],
+ chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]],
?],
contentType: ContentType,
s3Headers: S3Headers): Sink[JPair[ByteString, C],
CompletionStage[MultipartUploadResult]] = {
S3Stream
@@ -1050,7 +1050,7 @@ object S3 {
key: String,
uploadId: String,
previousParts: java.lang.Iterable[Part],
- chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]],
_],
+ chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]],
?],
contentType: ContentType): Sink[JPair[ByteString, C],
CompletionStage[MultipartUploadResult]] =
resumeMultipartUploadWithContext[C](bucket,
key,
@@ -1088,7 +1088,7 @@ object S3 {
key: String,
uploadId: String,
previousParts: java.lang.Iterable[Part],
- chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]],
_])
+ chunkUploadSink: Sink[JPair[UploadPartResponse, java.lang.Iterable[C]],
?])
: Sink[JPair[ByteString, C], CompletionStage[MultipartUploadResult]] =
resumeMultipartUploadWithContext[C](bucket,
key,
diff --git
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
index 6e1fcb36b..66ba3b4a9 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3.scala
@@ -188,7 +188,7 @@ object S3 {
*/
def putObject(bucket: String,
key: String,
- data: Source[ByteString, _],
+ data: Source[ByteString, ?],
contentLength: Long,
contentType: ContentType = ContentTypes.`application/octet-stream`,
s3Headers: S3Headers): Source[ObjectMetadata, NotUsed] =
@@ -542,7 +542,7 @@ object S3 {
def multipartUploadWithContext[C](
bucket: String,
key: String,
- chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+ chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
contentType: ContentType = ContentTypes.`application/octet-stream`,
metaHeaders: MetaHeaders = MetaHeaders(Map()),
cannedAcl: CannedAcl = CannedAcl.Private,
@@ -587,7 +587,7 @@ object S3 {
def multipartUploadWithHeadersAndContext[C](
bucket: String,
key: String,
- chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+ chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
contentType: ContentType = ContentTypes.`application/octet-stream`,
chunkSize: Int = MinChunkSize,
chunkingParallelism: Int = 4,
@@ -671,7 +671,7 @@ object S3 {
key: String,
uploadId: String,
previousParts: immutable.Iterable[Part],
- chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+ chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
contentType: ContentType = ContentTypes.`application/octet-stream`,
metaHeaders: MetaHeaders = MetaHeaders(Map()),
cannedAcl: CannedAcl = CannedAcl.Private,
@@ -758,7 +758,7 @@ object S3 {
key: String,
uploadId: String,
previousParts: immutable.Iterable[Part],
- chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), _],
+ chunkUploadSink: Sink[(UploadPartResponse, immutable.Iterable[C]), ?],
contentType: ContentType = ContentTypes.`application/octet-stream`,
chunkSize: Int = MinChunkSize,
chunkingParallelism: Int = 4,
diff --git
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
index 14eaa6752..90e6994b2 100644
---
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
+++
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3IntegrationSpec.scala
@@ -1186,7 +1186,7 @@ trait S3IntegrationSpec
}
private def uploadAndAndCheckParts(
- defaultBucket: String, source: Source[ByteString, _], expectedParts:
Int): Assertion = {
+ defaultBucket: String, source: Source[ByteString, ?], expectedParts:
Int): Assertion = {
val metadata =
for {
_ <- source.runWith(
diff --git
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala
index e77ff00c5..ccbdfe3a6 100644
---
a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala
+++
b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/scaladsl/S3WireMockBase.scala
@@ -834,7 +834,7 @@ abstract class S3WireMockBase(_system: ActorSystem, val
_wireMockServer: WireMoc
object S3WireMockBase {
- def getCallerName(clazz: Class[_]): String = {
+ def getCallerName(clazz: Class[?]): String = {
val s = Thread.currentThread.getStackTrace.map(_.getClassName).drop(1)
.dropWhile(_.matches("(java.lang.Thread|.*WireMockBase.?$)"))
val reduced = s.lastIndexWhere(_ == clazz.getName) match {
diff --git a/slick/src/test/scala/docs/scaladsl/SlickWithTryResultSpec.scala
b/slick/src/test/scala/docs/scaladsl/SlickWithTryResultSpec.scala
index 9fdf031c7..a2805b51b 100644
--- a/slick/src/test/scala/docs/scaladsl/SlickWithTryResultSpec.scala
+++ b/slick/src/test/scala/docs/scaladsl/SlickWithTryResultSpec.scala
@@ -230,7 +230,7 @@ class SlickWithTryResultSpec extends AnyWordSpec
.runWith(Sink.last)
.futureValue
- inserted mustBe a[Failure[_]]
+ inserted mustBe a[Failure[?]]
getAllUsersFromDb.futureValue mustBe Set(users.head)
}
@@ -316,7 +316,7 @@ class SlickWithTryResultSpec extends AnyWordSpec
.runWith(SlickWithTryResult.sinkTry(insertUser))
.futureValue
- inserted mustBe a[Failure[_]]
+ inserted mustBe a[Failure[?]]
getAllUsersFromDb.futureValue mustBe Set(users.head)
}
@@ -328,7 +328,7 @@ class SlickWithTryResultSpec extends AnyWordSpec
.futureValue
records.length mustBe 41
- inserted mustBe a[Failure[_]]
+ inserted mustBe a[Failure[?]]
getAllUsersFromDb.futureValue mustBe users
}
diff --git
a/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
b/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
index 53ac2f215..95d6e2676 100644
---
a/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
+++
b/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
@@ -78,7 +78,7 @@ class CharsetCodingFlowsSpec
// #encoding
val targetFile = Paths.get("target/outdata.txt")
val strings = System.getProperties.asScala.map(p => p._1 + " -> " +
p._2).toList
- val stringSource: Source[String, _] = Source(strings)
+ val stringSource: Source[String, ?] = Source(strings)
val result =
stringSource
.via(TextFlow.encoding(StandardCharsets.US_ASCII))
@@ -93,7 +93,7 @@ class CharsetCodingFlowsSpec
import java.nio.charset.StandardCharsets
val utf16bytes = ByteString("äåûßêëé", StandardCharsets.UTF_16)
- val byteStringSource: Source[ByteString, _] =
+ val byteStringSource: Source[ByteString, ?] =
Source
.single(utf16bytes)
@@ -114,7 +114,7 @@ class CharsetCodingFlowsSpec
val utf16bytes = ByteString("äåûßêëé", StandardCharsets.UTF_16)
val targetFile = Paths.get("target/outdata-transcoding.txt")
- val byteStringSource: Source[ByteString, _] =
+ val byteStringSource: Source[ByteString, ?] =
Source
.single(utf16bytes)
val result: Future[IOResult] =
diff --git
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
index f76048c14..a3b5da2f0 100644
---
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
+++
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
@@ -373,7 +373,7 @@ private[unixdomainsocket] abstract class
UnixDomainSocketImpl(system: ExtendedAc
private val sel = NativeSelectorProvider.getInstance.openSelector
/** Override to customise reported log source */
- protected def logSource: Class[_] = getClass
+ protected def logSource: Class[?] = getClass
private val ioThread =
new Thread(() => nioEventLoop(sel, Logging(system, logSource.getName)),
"unix-domain-socket-io")
diff --git
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/javadsl/UnixDomainSocket.scala
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/javadsl/UnixDomainSocket.scala
index 67138e08f..b553e3e11 100644
---
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/javadsl/UnixDomainSocket.scala
+++
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/javadsl/UnixDomainSocket.scala
@@ -107,7 +107,7 @@ object UnixDomainSocket extends
ExtensionId[UnixDomainSocket] with ExtensionIdPr
*/
override def get(system: ClassicActorSystemProvider): UnixDomainSocket =
super.apply(system.classicSystem)
- def lookup: ExtensionId[_ <: Extension] =
+ def lookup: ExtensionId[? <: Extension] =
UnixDomainSocket
def createExtension(system: ExtendedActorSystem): UnixDomainSocket =
diff --git
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/scaladsl/UnixDomainSocket.scala
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/scaladsl/UnixDomainSocket.scala
index 5357bdaf5..e240c84e4 100644
---
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/scaladsl/UnixDomainSocket.scala
+++
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/scaladsl/UnixDomainSocket.scala
@@ -42,7 +42,7 @@ object UnixDomainSocket extends ExtensionId[UnixDomainSocket]
with ExtensionIdPr
override def createExtension(system: ExtendedActorSystem) =
new UnixDomainSocket(system)
- override def lookup: ExtensionId[_ <: Extension] =
+ override def lookup: ExtensionId[? <: Extension] =
UnixDomainSocket
/**
@@ -135,7 +135,7 @@ final class UnixDomainSocket(system: ExtendedActorSystem)
extends UnixDomainSock
* independently whether the client is still attempting to
write. This setting is recommended
* for servers, and therefore it is the default setting.
*/
- def bindAndHandle(handler: Flow[ByteString, ByteString, _],
+ def bindAndHandle(handler: Flow[ByteString, ByteString, ?],
path: Path,
backlog: Int = 128,
halfClose: Boolean = false): Future[ServerBinding] =
diff --git
a/unix-domain-socket/src/test/scala/docs/scaladsl/UnixDomainSocketSpec.scala
b/unix-domain-socket/src/test/scala/docs/scaladsl/UnixDomainSocketSpec.scala
index d5b35a39b..01efc09e5 100644
--- a/unix-domain-socket/src/test/scala/docs/scaladsl/UnixDomainSocketSpec.scala
+++ b/unix-domain-socket/src/test/scala/docs/scaladsl/UnixDomainSocketSpec.scala
@@ -145,7 +145,7 @@ class UnixDomainSocketSpec
}
"be able to materialize outgoing connection flow more than once" in {
- def materialize(flow: Flow[ByteString, ByteString, _]): Future[Done] =
+ def materialize(flow: Flow[ByteString, ByteString, ?]): Future[Done] =
Source.single(ByteString("Hello")).via(flow).runWith(Sink.ignore)
val path = dir.resolve("sock5")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]