This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new 4a6fe5d Port akka-projection PR #684: add query plugin instance
overloads to source providers (#454)
4a6fe5d is described below
commit 4a6fe5d66f48a621b0998e509397b660140c3f06
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 1 13:34:58 2026 +0100
Port akka-projection PR #684: add query plugin instance overloads to source providers (#454)
Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-projection/sessions/cac51e5c-2e6d-4f84-99ff-01a1f44ff83a
Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../state/javadsl/DurableStateSourceProvider.scala | 15 +++++++-
.../scaladsl/DurableStateSourceProvider.scala | 16 +++++++-
.../javadsl/EventSourcedProvider.scala | 44 ++++++++--------------
.../scaladsl/EventSourcedProvider.scala | 22 ++++++++---
4 files changed, 60 insertions(+), 37 deletions(-)
diff --git
a/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
b/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
index 955a300..e7dd58a 100644
---
a/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
+++
b/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
@@ -54,7 +54,13 @@ object DurableStateSourceProvider {
val durableStateStoreQuery =
DurableStateStoreRegistry(system)
.getDurableStateStoreFor[DurableStateStoreQuery[A]](classOf[DurableStateStoreQuery[A]],
pluginId)
+ changesByTag(system, durableStateStoreQuery, tag)
+ }
+ def changesByTag[A](
+ system: ActorSystem[_],
+ durableStateStoreQuery: DurableStateStoreQuery[A],
+ tag: String): SourceProvider[Offset, DurableStateChange[A]] = {
new DurableStateStoreQuerySourceProvider(durableStateStoreQuery, tag,
system)
}
@@ -90,11 +96,18 @@ object DurableStateSourceProvider {
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset, DurableStateChange[A]] = {
-
val durableStateStoreQuery =
DurableStateStoreRegistry(system)
.getDurableStateStoreFor(classOf[DurableStateStoreBySliceQuery[A]],
durableStateStoreQueryPluginId)
+ changesBySlices(system, durableStateStoreQuery, entityType, minSlice,
maxSlice)
+ }
+ def changesBySlices[A](
+ system: ActorSystem[_],
+ durableStateStoreQuery: DurableStateStoreBySliceQuery[A],
+ entityType: String,
+ minSlice: Int,
+ maxSlice: Int): SourceProvider[Offset, DurableStateChange[A]] = {
new DurableStateBySlicesSourceProvider(durableStateStoreQuery, entityType,
minSlice, maxSlice, system)
}
diff --git
a/durable-state/src/main/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProvider.scala
b/durable-state/src/main/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProvider.scala
index 813bf72..3770b41 100644
---
a/durable-state/src/main/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProvider.scala
+++
b/durable-state/src/main/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProvider.scala
@@ -45,10 +45,15 @@ object DurableStateSourceProvider {
system: ActorSystem[_],
pluginId: String,
tag: String): SourceProvider[Offset, DurableStateChange[A]] = {
-
val durableStateStoreQuery =
DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[A]](pluginId)
+ changesByTag(system, durableStateStoreQuery, tag)
+ }
+ def changesByTag[A](
+ system: ActorSystem[_],
+ durableStateStoreQuery: DurableStateStoreQuery[A],
+ tag: String): SourceProvider[Offset, DurableStateChange[A]] = {
new DurableStateStoreQuerySourceProvider(durableStateStoreQuery, tag,
system)
}
@@ -81,11 +86,18 @@ object DurableStateSourceProvider {
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset, DurableStateChange[A]] = {
-
val durableStateStoreQuery =
DurableStateStoreRegistry(system)
.durableStateStoreFor[DurableStateStoreBySliceQuery[A]](durableStateStoreQueryPluginId)
+ changesBySlices(system, durableStateStoreQuery, entityType, minSlice,
maxSlice)
+ }
+ def changesBySlices[A](
+ system: ActorSystem[_],
+ durableStateStoreQuery: DurableStateStoreBySliceQuery[A],
+ entityType: String,
+ minSlice: Int,
+ maxSlice: Int): SourceProvider[Offset, DurableStateChange[A]] = {
new DurableStateBySlicesSourceProvider(durableStateStoreQuery, entityType,
minSlice, maxSlice, system)
}
diff --git
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
index e4a5eb6..77581e4 100644
---
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
+++
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
@@ -50,11 +50,9 @@ object EventSourcedProvider {
system: ActorSystem[_],
readJournalPluginId: String,
tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
-
val eventsByTagQuery =
PersistenceQuery(system).getReadJournalFor(classOf[EventsByTagQuery],
readJournalPluginId)
-
- new EventsByTagSourceProvider(system, eventsByTagQuery, tag)
+ eventsByTag(system, eventsByTagQuery, tag)
}
def eventsByTag[Event](
@@ -62,10 +60,15 @@ object EventSourcedProvider {
readJournalPluginId: String,
readJournalConfig: Config,
tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
-
val eventsByTagQuery =
PersistenceQuery(system).getReadJournalFor(classOf[EventsByTagQuery],
readJournalPluginId, readJournalConfig)
+ eventsByTag(system, eventsByTagQuery, tag)
+ }
+ def eventsByTag[Event](
+ system: ActorSystem[_],
+ eventsByTagQuery: EventsByTagQuery,
+ tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
new EventsByTagSourceProvider(system, eventsByTagQuery, tag)
}
@@ -101,21 +104,9 @@ object EventSourcedProvider {
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset,
pekko.persistence.query.typed.EventEnvelope[Event]] = {
-
val eventsBySlicesQuery =
PersistenceQuery(system).getReadJournalFor(classOf[EventsBySliceQuery],
readJournalPluginId)
-
- if (!eventsBySlicesQuery.isInstanceOf[EventTimestampQuery])
- throw new IllegalArgumentException(
- s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId "
+
- s"[$readJournalPluginId] must implement
[${classOf[EventTimestampQuery].getName}]")
-
- if (!eventsBySlicesQuery.isInstanceOf[LoadEventQuery])
- throw new IllegalArgumentException(
- s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId "
+
- s"[$readJournalPluginId] must implement
[${classOf[LoadEventQuery].getName}]")
-
- new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType,
minSlice, maxSlice, system)
+ eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice)
}
def eventsBySlices[Event](
@@ -125,20 +116,17 @@ object EventSourcedProvider {
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset,
pekko.persistence.query.typed.EventEnvelope[Event]] = {
-
val eventsBySlicesQuery =
PersistenceQuery(system).getReadJournalFor(classOf[EventsBySliceQuery],
readJournalPluginId, readJournalConfig)
+ eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice)
+ }
- if (!eventsBySlicesQuery.isInstanceOf[EventTimestampQuery])
- throw new IllegalArgumentException(
- s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId "
+
- s"[$readJournalPluginId] must implement
[${classOf[EventTimestampQuery].getName}]")
-
- if (!eventsBySlicesQuery.isInstanceOf[LoadEventQuery])
- throw new IllegalArgumentException(
- s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId "
+
- s"[$readJournalPluginId] must implement
[${classOf[LoadEventQuery].getName}]")
-
+ def eventsBySlices[Event](
+ system: ActorSystem[_],
+ eventsBySlicesQuery: EventsBySliceQuery,
+ entityType: String,
+ minSlice: Int,
+ maxSlice: Int): SourceProvider[Offset,
pekko.persistence.query.typed.EventEnvelope[Event]] = {
new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType,
minSlice, maxSlice, system)
}
diff --git
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
index 42dafa9..5a45e0f 100644
---
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
+++
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
@@ -42,11 +42,9 @@ object EventSourcedProvider {
system: ActorSystem[_],
readJournalPluginId: String,
tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
-
val eventsByTagQuery =
PersistenceQuery(system).readJournalFor[EventsByTagQuery](readJournalPluginId)
-
- new EventsByTagSourceProvider(eventsByTagQuery, tag, system)
+ eventsByTag(system, eventsByTagQuery, tag)
}
def eventsByTag[Event](
@@ -54,10 +52,15 @@ object EventSourcedProvider {
readJournalPluginId: String,
readJournalConfig: Config,
tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
-
val eventsByTagQuery =
PersistenceQuery(system).readJournalFor[EventsByTagQuery](readJournalPluginId,
readJournalConfig)
+ eventsByTag(system, eventsByTagQuery, tag)
+ }
+ def eventsByTag[Event](
+ system: ActorSystem[_],
+ eventsByTagQuery: EventsByTagQuery,
+ tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
new EventsByTagSourceProvider(eventsByTagQuery, tag, system)
}
@@ -89,8 +92,7 @@ object EventSourcedProvider {
maxSlice: Int): SourceProvider[Offset,
pekko.persistence.query.typed.EventEnvelope[Event]] = {
val eventsBySlicesQuery =
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](readJournalPluginId)
-
- new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType,
minSlice, maxSlice, system)
+ eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice)
}
def eventsBySlices[Event](
@@ -102,7 +104,15 @@ object EventSourcedProvider {
maxSlice: Int): SourceProvider[Offset,
pekko.persistence.query.typed.EventEnvelope[Event]] = {
val eventsBySlicesQuery =
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](readJournalPluginId,
readJournalConfig)
+ eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice)
+ }
+ def eventsBySlices[Event](
+ system: ActorSystem[_],
+ eventsBySlicesQuery: EventsBySliceQuery,
+ entityType: String,
+ minSlice: Int,
+ maxSlice: Int): SourceProvider[Offset,
pekko.persistence.query.typed.EventEnvelope[Event]] = {
new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType,
minSlice, maxSlice, system)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]