This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new 2181d02 test with pekko2 (#449)
2181d02 is described below
commit 2181d02c740adb8b2fd7a07437dbdfbc70d37eb1
Author: PJ Fanning <[email protected]>
AuthorDate: Sat Apr 25 11:36:33 2026 +0200
test with pekko2 (#449)
* test with pekko2
* more changes
* watchTermination
* Update OffsetSerializationSpec.scala
* Remove Apache Maven Staging Repo from resolvers
---
build.sbt | 1 +
.../scala/org/apache/pekko/projection/ProjectionBehaviorSpec.scala | 4 ++--
.../apache/pekko/projection/internal/OffsetSerializationSpec.scala | 3 +--
.../apache/pekko/projection/internal/InternalProjectionState.scala | 4 ++--
.../pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala | 2 +-
project/Dependencies.scala | 2 +-
project/PekkoConnectorsDependency.scala | 2 +-
project/PekkoConnectorsKafkaDependency.scala | 2 +-
project/PekkoCoreDependency.scala | 2 +-
9 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/build.sbt b/build.sbt
index 6f3f90c..0f8c084 100644
--- a/build.sbt
+++ b/build.sbt
@@ -14,6 +14,7 @@ ThisBuild / versionScheme := Some(VersionScheme.SemVerSpec)
sourceDistName := "apache-pekko-projection"
sourceDistIncubating := false
+ThisBuild / evictionErrorLevel := Level.Info
ThisBuild / resolvers += Resolver.ApacheMavenSnapshotsRepo
ThisBuild / reproducibleBuildsCheckResolver := Resolver.ApacheMavenStagingRepo
diff --git a/core-test/src/test/scala/org/apache/pekko/projection/ProjectionBehaviorSpec.scala b/core-test/src/test/scala/org/apache/pekko/projection/ProjectionBehaviorSpec.scala
index 20a3bff..fc889d7 100644
--- a/core-test/src/test/scala/org/apache/pekko/projection/ProjectionBehaviorSpec.scala
+++ b/core-test/src/test/scala/org/apache/pekko/projection/ProjectionBehaviorSpec.scala
@@ -259,7 +259,7 @@ class ProjectionBehaviorSpec extends ScalaTestWithActorTestKit("""
val testProbe = testKit.createTestProbe[ProbeMessage]()
val streamDoneProbe = testKit.createTestProbe[Done]()
- val src = Source(1 to 2).concat(Source.maybe).watchTermination()(Keep.right).mapMaterializedValue { done =>
+ val src = Source(1 to 2).concat(Source.maybe).watchTermination(Keep.right).mapMaterializedValue { done =>
done.onComplete(_ => streamDoneProbe.ref ! Done)(system.executionContext)
NotUsed
}
@@ -281,7 +281,7 @@ class ProjectionBehaviorSpec extends ScalaTestWithActorTestKit("""
"stop after stopping actor without stop message" in {
val testProbe = testKit.createTestProbe[ProbeMessage]()
val streamDoneProbe = testKit.createTestProbe[Done]()
- val src = Source(1 to 2).concat(Source.maybe).watchTermination()(Keep.right).mapMaterializedValue { done =>
+ val src = Source(1 to 2).concat(Source.maybe).watchTermination(Keep.right).mapMaterializedValue { done =>
done.onComplete(_ => streamDoneProbe.ref ! Done)(system.executionContext)
NotUsed
}
diff --git a/core-test/src/test/scala/org/apache/pekko/projection/internal/OffsetSerializationSpec.scala b/core-test/src/test/scala/org/apache/pekko/projection/internal/OffsetSerializationSpec.scala
index 3368e8e..74c490a 100644
--- a/core-test/src/test/scala/org/apache/pekko/projection/internal/OffsetSerializationSpec.scala
+++ b/core-test/src/test/scala/org/apache/pekko/projection/internal/OffsetSerializationSpec.scala
@@ -27,11 +27,10 @@ import pekko.persistence.query
import pekko.projection.MergeableOffset
import pekko.projection.ProjectionId
import pekko.serialization.SerializerWithStringManifest
-import pekko.util.unused
import org.scalatest.wordspec.AnyWordSpecLike
object OffsetSerializationSpec {
- class TestSerializer(@unused system: ExtendedActorSystem) extends SerializerWithStringManifest {
+ class TestSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
def identifier: Int = 9999
def manifest(o: AnyRef): String =
diff --git a/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala b/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
index bedc44d..904b64b 100644
--- a/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
+++ b/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
@@ -200,7 +200,7 @@ private[projection] abstract class InternalProjectionState[Offset, Envelope](
case f: FlowHandlerStrategy[Envelope @unchecked] =>
val flow =
- f.flowCtx.asFlow.watchTermination() {
+ f.flowCtx.asFlow.watchTermination {
case (_, futDone) =>
futDone.recoverWith {
case t =>
@@ -470,7 +470,7 @@ private[projection] abstract class InternalProjectionState[Offset, Envelope](
src: Source[Done, NotUsed],
handlerLifecycle: HandlerLifecycle): Source[Done, Future[Done]] = {
src
- .watchTermination() { (_, futDone) =>
+ .watchTermination { (_, futDone) =>
handlerStrategy.recreateHandlerOnNextAccess()
futDone
.andThen { case _ => handlerLifecycle.tryStop() }
diff --git a/kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala b/kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala
index a8df929..506c661 100644
--- a/kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala
+++ b/kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala
@@ -105,7 +105,7 @@ import org.apache.kafka.common.record.TimestampType
control = Some(m)
m
}
- .watchTermination()(Keep.right)
+ .watchTermination(Keep.right)
.mapMaterializedValue { terminated =>
terminated.onComplete(_ => metadataClient.stop())
NotUsed
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index bc8beec..4868bea 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -24,7 +24,7 @@ object Dependencies {
object Versions {
val pekko = PekkoCoreDependency.version
- val pekkoPersistenceJdbc = "1.2.0"
+ val pekkoPersistenceJdbc = "2.0.0-M1"
val pekkoPersistenceCassandra = "1.1.0"
val connectors = PekkoConnectorsDependency.version
val connectorsKafka = PekkoConnectorsKafkaDependency.version
diff --git a/project/PekkoConnectorsDependency.scala b/project/PekkoConnectorsDependency.scala
index 448a962..bd64549 100644
--- a/project/PekkoConnectorsDependency.scala
+++ b/project/PekkoConnectorsDependency.scala
@@ -21,5 +21,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoConnectorsDependency extends PekkoDependency {
override val checkProject: String = "pekko-connectors-cassandra"
override val module: Option[String] = Some("connectors")
- override val currentVersion: String = "1.2.0"
+ override val currentVersion: String = "2.0.0-M1"
}
diff --git a/project/PekkoConnectorsKafkaDependency.scala b/project/PekkoConnectorsKafkaDependency.scala
index b147cbe..0947511 100644
--- a/project/PekkoConnectorsKafkaDependency.scala
+++ b/project/PekkoConnectorsKafkaDependency.scala
@@ -21,5 +21,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoConnectorsKafkaDependency extends PekkoDependency {
override val checkProject: String = "pekko-connectors-kafka-testkit"
override val module: Option[String] = Some("connectors.kafka")
- override val currentVersion: String = "1.1.0"
+ override val currentVersion: String = "2.0.0-M1"
}
diff --git a/project/PekkoCoreDependency.scala b/project/PekkoCoreDependency.scala
index d5d0263..f6a3fdf 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -21,5 +21,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoCoreDependency extends PekkoDependency {
override val checkProject: String = "pekko-cluster-sharding-typed"
override val module: Option[String] = None
- override val currentVersion: String = "1.3.0"
+ override val currentVersion: String = "2.0.0-M1"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]