This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a6fe5d  Port akka-projection PR #684: add query plugin instance 
overloads to source providers (#454)
4a6fe5d is described below

commit 4a6fe5d66f48a621b0998e509397b660140c3f06
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 1 13:34:58 2026 +0100

    Port akka-projection PR #684: add query plugin instance overloads to source 
providers (#454)
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-projection/sessions/cac51e5c-2e6d-4f84-99ff-01a1f44ff83a
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../state/javadsl/DurableStateSourceProvider.scala | 15 +++++++-
 .../scaladsl/DurableStateSourceProvider.scala      | 16 +++++++-
 .../javadsl/EventSourcedProvider.scala             | 44 ++++++++--------------
 .../scaladsl/EventSourcedProvider.scala            | 22 ++++++++---
 4 files changed, 60 insertions(+), 37 deletions(-)

diff --git 
a/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
 
b/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
index 955a300..e7dd58a 100644
--- 
a/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
+++ 
b/durable-state/src/main/scala/org/apache/pekko/projection/state/javadsl/DurableStateSourceProvider.scala
@@ -54,7 +54,13 @@ object DurableStateSourceProvider {
     val durableStateStoreQuery =
       DurableStateStoreRegistry(system)
         
.getDurableStateStoreFor[DurableStateStoreQuery[A]](classOf[DurableStateStoreQuery[A]],
 pluginId)
+    changesByTag(system, durableStateStoreQuery, tag)
+  }
 
+  def changesByTag[A](
+      system: ActorSystem[_],
+      durableStateStoreQuery: DurableStateStoreQuery[A],
+      tag: String): SourceProvider[Offset, DurableStateChange[A]] = {
     new DurableStateStoreQuerySourceProvider(durableStateStoreQuery, tag, 
system)
   }
 
@@ -90,11 +96,18 @@ object DurableStateSourceProvider {
       entityType: String,
       minSlice: Int,
       maxSlice: Int): SourceProvider[Offset, DurableStateChange[A]] = {
-
     val durableStateStoreQuery =
       DurableStateStoreRegistry(system)
         .getDurableStateStoreFor(classOf[DurableStateStoreBySliceQuery[A]], 
durableStateStoreQueryPluginId)
+    changesBySlices(system, durableStateStoreQuery, entityType, minSlice, 
maxSlice)
+  }
 
+  def changesBySlices[A](
+      system: ActorSystem[_],
+      durableStateStoreQuery: DurableStateStoreBySliceQuery[A],
+      entityType: String,
+      minSlice: Int,
+      maxSlice: Int): SourceProvider[Offset, DurableStateChange[A]] = {
     new DurableStateBySlicesSourceProvider(durableStateStoreQuery, entityType, 
minSlice, maxSlice, system)
   }
 
diff --git 
a/durable-state/src/main/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProvider.scala
 
b/durable-state/src/main/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProvider.scala
index 813bf72..3770b41 100644
--- 
a/durable-state/src/main/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProvider.scala
+++ 
b/durable-state/src/main/scala/org/apache/pekko/projection/state/scaladsl/DurableStateSourceProvider.scala
@@ -45,10 +45,15 @@ object DurableStateSourceProvider {
       system: ActorSystem[_],
       pluginId: String,
       tag: String): SourceProvider[Offset, DurableStateChange[A]] = {
-
     val durableStateStoreQuery =
       
DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[A]](pluginId)
+    changesByTag(system, durableStateStoreQuery, tag)
+  }
 
+  def changesByTag[A](
+      system: ActorSystem[_],
+      durableStateStoreQuery: DurableStateStoreQuery[A],
+      tag: String): SourceProvider[Offset, DurableStateChange[A]] = {
     new DurableStateStoreQuerySourceProvider(durableStateStoreQuery, tag, 
system)
   }
 
@@ -81,11 +86,18 @@ object DurableStateSourceProvider {
       entityType: String,
       minSlice: Int,
       maxSlice: Int): SourceProvider[Offset, DurableStateChange[A]] = {
-
     val durableStateStoreQuery =
       DurableStateStoreRegistry(system)
         
.durableStateStoreFor[DurableStateStoreBySliceQuery[A]](durableStateStoreQueryPluginId)
+    changesBySlices(system, durableStateStoreQuery, entityType, minSlice, 
maxSlice)
+  }
 
+  def changesBySlices[A](
+      system: ActorSystem[_],
+      durableStateStoreQuery: DurableStateStoreBySliceQuery[A],
+      entityType: String,
+      minSlice: Int,
+      maxSlice: Int): SourceProvider[Offset, DurableStateChange[A]] = {
     new DurableStateBySlicesSourceProvider(durableStateStoreQuery, entityType, 
minSlice, maxSlice, system)
   }
 
diff --git 
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
 
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
index e4a5eb6..77581e4 100644
--- 
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
+++ 
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
@@ -50,11 +50,9 @@ object EventSourcedProvider {
       system: ActorSystem[_],
       readJournalPluginId: String,
       tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
-
     val eventsByTagQuery =
       PersistenceQuery(system).getReadJournalFor(classOf[EventsByTagQuery], 
readJournalPluginId)
-
-    new EventsByTagSourceProvider(system, eventsByTagQuery, tag)
+    eventsByTag(system, eventsByTagQuery, tag)
   }
 
   def eventsByTag[Event](
@@ -62,10 +60,15 @@ object EventSourcedProvider {
       readJournalPluginId: String,
       readJournalConfig: Config,
       tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
-
     val eventsByTagQuery =
       PersistenceQuery(system).getReadJournalFor(classOf[EventsByTagQuery], 
readJournalPluginId, readJournalConfig)
+    eventsByTag(system, eventsByTagQuery, tag)
+  }
 
+  def eventsByTag[Event](
+      system: ActorSystem[_],
+      eventsByTagQuery: EventsByTagQuery,
+      tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
     new EventsByTagSourceProvider(system, eventsByTagQuery, tag)
   }
 
@@ -101,21 +104,9 @@ object EventSourcedProvider {
       entityType: String,
       minSlice: Int,
       maxSlice: Int): SourceProvider[Offset, 
pekko.persistence.query.typed.EventEnvelope[Event]] = {
-
     val eventsBySlicesQuery =
       PersistenceQuery(system).getReadJournalFor(classOf[EventsBySliceQuery], 
readJournalPluginId)
-
-    if (!eventsBySlicesQuery.isInstanceOf[EventTimestampQuery])
-      throw new IllegalArgumentException(
-        s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId " 
+
-        s"[$readJournalPluginId] must implement 
[${classOf[EventTimestampQuery].getName}]")
-
-    if (!eventsBySlicesQuery.isInstanceOf[LoadEventQuery])
-      throw new IllegalArgumentException(
-        s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId " 
+
-        s"[$readJournalPluginId] must implement 
[${classOf[LoadEventQuery].getName}]")
-
-    new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, 
minSlice, maxSlice, system)
+    eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice)
   }
 
   def eventsBySlices[Event](
@@ -125,20 +116,17 @@ object EventSourcedProvider {
       entityType: String,
       minSlice: Int,
       maxSlice: Int): SourceProvider[Offset, 
pekko.persistence.query.typed.EventEnvelope[Event]] = {
-
     val eventsBySlicesQuery =
       PersistenceQuery(system).getReadJournalFor(classOf[EventsBySliceQuery], 
readJournalPluginId, readJournalConfig)
+    eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice)
+  }
 
-    if (!eventsBySlicesQuery.isInstanceOf[EventTimestampQuery])
-      throw new IllegalArgumentException(
-        s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId " 
+
-        s"[$readJournalPluginId] must implement 
[${classOf[EventTimestampQuery].getName}]")
-
-    if (!eventsBySlicesQuery.isInstanceOf[LoadEventQuery])
-      throw new IllegalArgumentException(
-        s"[${eventsBySlicesQuery.getClass.getName}] with readJournalPluginId " 
+
-        s"[$readJournalPluginId] must implement 
[${classOf[LoadEventQuery].getName}]")
-
+  def eventsBySlices[Event](
+      system: ActorSystem[_],
+      eventsBySlicesQuery: EventsBySliceQuery,
+      entityType: String,
+      minSlice: Int,
+      maxSlice: Int): SourceProvider[Offset, 
pekko.persistence.query.typed.EventEnvelope[Event]] = {
     new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, 
minSlice, maxSlice, system)
   }
 
diff --git 
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
 
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
index 42dafa9..5a45e0f 100644
--- 
a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
+++ 
b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
@@ -42,11 +42,9 @@ object EventSourcedProvider {
       system: ActorSystem[_],
       readJournalPluginId: String,
       tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
-
     val eventsByTagQuery =
       
PersistenceQuery(system).readJournalFor[EventsByTagQuery](readJournalPluginId)
-
-    new EventsByTagSourceProvider(eventsByTagQuery, tag, system)
+    eventsByTag(system, eventsByTagQuery, tag)
   }
 
   def eventsByTag[Event](
@@ -54,10 +52,15 @@ object EventSourcedProvider {
       readJournalPluginId: String,
       readJournalConfig: Config,
       tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
-
     val eventsByTagQuery =
       
PersistenceQuery(system).readJournalFor[EventsByTagQuery](readJournalPluginId, 
readJournalConfig)
+    eventsByTag(system, eventsByTagQuery, tag)
+  }
 
+  def eventsByTag[Event](
+      system: ActorSystem[_],
+      eventsByTagQuery: EventsByTagQuery,
+      tag: String): SourceProvider[Offset, EventEnvelope[Event]] = {
     new EventsByTagSourceProvider(eventsByTagQuery, tag, system)
   }
 
@@ -89,8 +92,7 @@ object EventSourcedProvider {
       maxSlice: Int): SourceProvider[Offset, 
pekko.persistence.query.typed.EventEnvelope[Event]] = {
     val eventsBySlicesQuery =
       
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](readJournalPluginId)
-
-    new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, 
minSlice, maxSlice, system)
+    eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice)
   }
 
   def eventsBySlices[Event](
@@ -102,7 +104,15 @@ object EventSourcedProvider {
       maxSlice: Int): SourceProvider[Offset, 
pekko.persistence.query.typed.EventEnvelope[Event]] = {
     val eventsBySlicesQuery =
       
PersistenceQuery(system).readJournalFor[EventsBySliceQuery](readJournalPluginId,
 readJournalConfig)
+    eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice)
+  }
 
+  def eventsBySlices[Event](
+      system: ActorSystem[_],
+      eventsBySlicesQuery: EventsBySliceQuery,
+      entityType: String,
+      minSlice: Int,
+      maxSlice: Int): SourceProvider[Offset, 
pekko.persistence.query.typed.EventEnvelope[Event]] = {
     new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, 
minSlice, maxSlice, system)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to